From f6ff2588cd49b7dc038dc0c400a0733502e8c0a1 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Sun, 3 Mar 2024 19:00:59 +0800 Subject: [PATCH] enhance: Optimize DescribeIndex to reduce lock contention (#30939) issue: #29313 issue: #30443 Signed-off-by: Cai Zhang --- internal/datacoord/index_service.go | 110 ++++++++++++++--------- internal/datacoord/index_service_test.go | 2 +- internal/datacoord/meta.go | 28 ++++++ 3 files changed, 97 insertions(+), 43 deletions(-) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 1f3e69f06e..c27132a54c 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -365,9 +365,12 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe } indexInfo := &indexpb.IndexInfo{} - s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() - }), false, indexes[0].CreateTime) + // The total rows of all indexes should be based on the current perspective + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + }) + + s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) ret.State = indexInfo.State ret.FailReason = indexInfo.IndexStateFailReason @@ -415,35 +418,38 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme return ret, nil } -func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 { +func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 { unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]() - for _, seg := range segments { - segIdx, ok := seg.segmentIndexes[indexInfo.IndexID] - if !ok { - unIndexed.Insert(seg.GetID()) + for segID, seg := range segments { + if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing { continue } - switch segIdx.IndexState { + segIdx, ok := seg.indexStates[indexInfo.IndexID] + if !ok { + unIndexed.Insert(segID) + continue + } + switch segIdx.GetState() { case commonpb.IndexState_Finished: - indexed.Insert(seg.GetID()) + indexed.Insert(segID) default: - unIndexed.Insert(seg.GetID()) + unIndexed.Insert(segID) } } retrieveContinue := len(unIndexed) != 0 for retrieveContinue { for segID := range unIndexed { unIndexed.Remove(segID) - segment := s.meta.GetSegment(segID) - if segment == nil || len(segment.CompactionFrom) == 0 { + segment := segments[segID] + if segment == nil || len(segment.compactionFrom) == 0 { continue } - for _, fromID := range segment.CompactionFrom { - fromSeg := s.meta.GetSegment(fromID) + for _, fromID := range segment.compactionFrom { + fromSeg := segments[fromID] if fromSeg == nil { continue } - if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished { + if segIndex, ok := fromSeg.indexStates[indexInfo.IndexID]; ok && segIndex.GetState() == commonpb.IndexState_Finished { indexed.Insert(fromID) continue } @@ -454,9 +460,9 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm } indexedRows := int64(0) for segID := range indexed { - segment := s.meta.GetSegment(segID) + segment := segments[segID] if segment != nil { - indexedRows += segment.GetNumOfRows() + indexedRows += segment.numRows } } return indexedRows @@ -465,7 +471,7 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm // completeIndexInfo get the index row count and index task state // if realTime, calculate current statistics // if not realTime, which means get info of the prior `CreateIndex` action, skip segments created after index's create time -func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool, ts Timestamp) { +func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments map[int64]*indexStats, realTime bool, ts Timestamp) { var ( cntNone = 0 cntUnissued = 0 @@ -478,31 +484,34 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In pendingIndexRows = int64(0) ) - for _, seg := range segments { - totalRows += seg.NumOfRows - segIdx, ok := seg.segmentIndexes[index.IndexID] - - if !ok { - if seg.GetLastExpireTime() <= ts { - cntUnissued++ - } - pendingIndexRows += seg.GetNumOfRows() + for segID, seg := range segments { + if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing { continue } - if segIdx.IndexState != commonpb.IndexState_Finished { - pendingIndexRows += seg.GetNumOfRows() + totalRows += seg.numRows + segIdx, ok := seg.indexStates[index.IndexID] + + if !ok { + if seg.lastExpireTime <= ts { + cntUnissued++ + } + pendingIndexRows += seg.numRows + continue + } + if segIdx.GetState() != commonpb.IndexState_Finished { + pendingIndexRows += seg.numRows } // if realTime, calculate current statistics // if not realTime, skip segments created after index create - if !realTime && seg.GetLastExpireTime() > ts { + if !realTime && seg.lastExpireTime > ts { continue } - switch segIdx.IndexState { + switch segIdx.GetState() { case commonpb.IndexState_IndexStateNone: // can't to here - log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segIdx.SegmentID)) + log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segID)) cntNone++ case commonpb.IndexState_Unissued: cntUnissued++ @@ -510,10 +519,10 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In cntInProgress++ case commonpb.IndexState_Finished: cntFinished++ - indexedRows += seg.NumOfRows + indexedRows += seg.numRows case commonpb.IndexState_Failed: cntFailed++ - failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason) + failReason += fmt.Sprintf("%d: %s;", segID, segIdx.FailReason) } } @@ -581,9 +590,13 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde PendingIndexRows: 0, State: 0, } - s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() - }), false, indexes[0].CreateTime) + + // The total rows of all indexes should be based on the current perspective + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + }) + + s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()), zap.String("indexName", req.GetIndexName())) return &indexpb.GetIndexBuildProgressResponse{ @@ -594,6 +607,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde }, nil } +// indexStats just for indexing statistics. +// Please use it judiciously. +type indexStats struct { + ID int64 + numRows int64 + compactionFrom []int64 + indexStates map[int64]*indexpb.SegmentIndexState + state commonpb.SegmentState + lastExpireTime uint64 +} + // DescribeIndex describe the index info of the collection. func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) { log := log.Ctx(ctx).With( @@ -621,9 +645,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe } // The total rows of all indexes should be based on the current perspective - segments := s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) + indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { indexInfo := &indexpb.IndexInfo{ @@ -679,9 +704,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt } // The total rows of all indexes should be based on the current perspective - segments := s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && info.CollectionID == req.GetCollectionID() + segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool { + return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) }) + indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { indexInfo := &indexpb.IndexInfo{ diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index de9121a5e7..adbec615dc 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -1371,7 +1371,7 @@ func TestServer_DescribeIndex(t *testing.T) { }, segID - 1: { SegmentInfo: &datapb.SegmentInfo{ - ID: segID, + ID: segID - 1, CollectionID: collID, PartitionID: partID, NumOfRows: 10000, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 707e2e3f8c..6deba8545e 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/pkg/common" @@ -1050,6 +1051,33 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { return ret } +func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats { + m.RLock() + defer m.RUnlock() + ret := make(map[int64]*indexStats) + for _, info := range m.segments.segments { + if selector(info) { + s := &indexStats{ + ID: info.GetID(), + numRows: info.GetNumOfRows(), + compactionFrom: info.GetCompactionFrom(), + indexStates: make(map[int64]*indexpb.SegmentIndexState), + state: info.GetState(), + lastExpireTime: info.GetLastExpireTime(), + } + for indexID, segIndex := range info.segmentIndexes { + s.indexStates[indexID] = &indexpb.SegmentIndexState{ + SegmentID: segIndex.SegmentID, + State: segIndex.IndexState, + FailReason: segIndex.FailReason, + } + } + ret[info.GetID()] = s + } + } + return ret +} + // AddAllocation add allocation in segment func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { log.Debug("meta update: add allocation",