From f65e6b7c6ee10a4cdf506fad9bfa0cfafb02490e Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 25 Mar 2025 13:46:25 +0800 Subject: [PATCH] enhance: Optimize datacoord meta mutex (#40552) Use a separate collection mutex. issue: https://github.com/milvus-io/milvus/issues/40551 --------- Signed-off-by: bigsheeper --- .../compaction_policy_clustering_test.go | 19 +- .../compaction_policy_single_test.go | 3 +- internal/datacoord/compaction_trigger_test.go | 518 +++++++++--------- internal/datacoord/garbage_collector_test.go | 57 +- internal/datacoord/index_meta.go | 4 +- internal/datacoord/index_service_test.go | 39 +- internal/datacoord/job_manager_test.go | 39 +- internal/datacoord/meta.go | 217 ++++---- internal/datacoord/meta_test.go | 28 +- internal/datacoord/server_test.go | 19 +- internal/datacoord/services.go | 4 +- internal/datacoord/services_test.go | 19 +- .../datacoord/sync_segments_scheduler_test.go | 61 ++- internal/datacoord/task_scheduler_test.go | 50 +- 14 files changed, 545 insertions(+), 532 deletions(-) diff --git a/internal/datacoord/compaction_policy_clustering_test.go b/internal/datacoord/compaction_policy_clustering_test.go index f2b612a0f8..3fe4e2ccc6 100644 --- a/internal/datacoord/compaction_policy_clustering_test.go +++ b/internal/datacoord/compaction_policy_clustering_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) func TestClusteringCompactionPolicySuite(t *testing.T) { @@ -67,7 +68,7 @@ func (s *ClusteringCompactionPolicySuite) SetupTest() { meta := &meta{ segments: NewSegmentsInfo(), - collections: make(map[UniqueID]*collectionInfo, 0), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), compactionTaskMeta: compactionTaskMeta, partitionStatsMeta: partitionStatsMeta, indexMeta: indexMeta, @@ -109,22 +110,22 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() { func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() { // valid collection - s.meta.collections[1] = &collectionInfo{ + s.meta.collections.Insert(1, &collectionInfo{ ID: 1, Schema: newTestScalarClusteringKeySchema(), - } + }) // deleted collection - s.meta.collections[2] = &collectionInfo{ + s.meta.collections.Insert(2, &collectionInfo{ ID: 2, Schema: newTestScalarClusteringKeySchema(), - } + }) s.clusteringCompactionPolicy.meta = s.meta s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) { if collectionID == 2 { return nil, errors.New("mock get collection fail error") } - coll, exist := s.meta.collections[collectionID] + coll, exist := s.meta.collections.Get(collectionID) if exist { return coll, nil } @@ -308,10 +309,10 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNormal() { Channel: "ch-1", } - s.meta.collections[testLabel.CollectionID] = &collectionInfo{ + s.meta.collections.Insert(testLabel.CollectionID, &collectionInfo{ ID: testLabel.CollectionID, Schema: newTestScalarClusteringKeySchema(), - } + }) segments := genSegmentsForMeta(testLabel) for id, segment := range segments { @@ -319,7 +320,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNormal() { } s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) { - coll, exist := 
s.meta.collections[collectionID] + coll, exist := s.meta.collections.Get(collectionID) if exist { return coll, nil } diff --git a/internal/datacoord/compaction_policy_single_test.go b/internal/datacoord/compaction_policy_single_test.go index d651ac9f0f..2e247b3c71 100644 --- a/internal/datacoord/compaction_policy_single_test.go +++ b/internal/datacoord/compaction_policy_single_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) func TestSingleCompactionPolicySuite(t *testing.T) { @@ -53,7 +54,7 @@ func (s *SingleCompactionPolicySuite) SetupTest() { } segments := genSegmentsForMeta(s.testLabel) - meta := &meta{segments: NewSegmentsInfo()} + meta := &meta{segments: NewSegmentsInfo(), collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()} for id, segment := range segments { meta.segments.SetSegment(id, segment) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 278c84be58..04d9fea767 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -149,6 +149,11 @@ func Test_compactionTrigger_force_without_index(t *testing.T) { Deltalogs: deltaLogs, IsSorted: true, } + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(collectionID, &collectionInfo{ + ID: collectionID, + Schema: schema, + }) m := &meta{ catalog: catalog, channelCPs: newChannelCps(), @@ -172,12 +177,7 @@ func Test_compactionTrigger_force_without_index(t *testing.T) { segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), indexes: map[UniqueID]map[UniqueID]*model.Index{}, }, - collections: map[int64]*collectionInfo{ - collectionID: { - ID: collectionID, - Schema: schema, - }, - }, + collections: collections, } compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m} @@ -308,6 +308,113 @@ func Test_compactionTrigger_force(t *testing.T) { }, } + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: schema, + Properties: map[string]string{ + common.CollectionTTLConfigKey: "0", + }, + }) + collections.Insert(1111, &collectionInfo{ + ID: 1111, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + Properties: map[string]string{ + common.CollectionTTLConfigKey: "error", + }, + }) + collections.Insert(1000, &collectionInfo{ + ID: 1000, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) + // error (has no vector field) + collections.Insert(2000, &collectionInfo{ + ID: 2000, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_Int16, + }, + }, + }, + }) + // error (has no dim) + collections.Insert(3000, &collectionInfo{ + ID: 3000, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + 
DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {}, + }, + }, + }, + }, + }) + // error (dim parse fail) + collections.Insert(4000, &collectionInfo{ + ID: 4000, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128error", + }, + }, + }, + }, + }, + }) + collections.Insert(10000, &collectionInfo{ + ID: 10000, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) im := &indexMeta{ segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), indexes: map[UniqueID]map[UniqueID]*model.Index{ @@ -443,115 +550,8 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, }, - indexMeta: im, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: schema, - Properties: map[string]string{ - common.CollectionTTLConfigKey: "0", - }, - }, - 1111: { - ID: 1111, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - Properties: map[string]string{ - common.CollectionTTLConfigKey: "error", - }, - }, - 1000: { - ID: 1000, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - // error (has no vector field) - 2000: { - ID: 2000, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_Int16, - }, - }, - }, - }, - // error (has no dim) - 3000: { - ID: 3000, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - {}, - }, - }, - }, - }, - }, - // error (dim parse fail) - 4000: { - ID: 4000, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128error", - }, - }, - }, - }, - }, - }, - 10000: { - ID: 10000, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - }, + indexMeta: im, + collections: collections, }, mock0Allocator, nil, @@ -851,6 +851,25 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { mock0Allocator := newMockAllocator(t) + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) + tests := []struct { name string fields fields @@ -862,28 +881,10 @@ func 
Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { "test many segments", fields{ &meta{ - segments: segmentInfos, - channelCPs: newChannelCps(), - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - }, - indexMeta: indexMeta, + segments: segmentInfos, + channelCPs: newChannelCps(), + collections: collections, + indexMeta: indexMeta, }, mock0Allocator, nil, @@ -1004,6 +1005,25 @@ func Test_compactionTrigger_noplan(t *testing.T) { im := newSegmentIndexMeta(nil) im.indexes[2] = make(map[UniqueID]*model.Index) + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) + tests := []struct { name string fields fields @@ -1070,25 +1090,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { }, }, }, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - }, + collections: collections, }, mock0Allocator, make(chan *compactionSignal, 1), @@ -1217,6 +1219,25 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { return segIdx } + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) + im := &indexMeta{ segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), indexes: map[UniqueID]map[UniqueID]*model.Index{ @@ -1261,27 +1282,9 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { // 8 small segments channelCPs: newChannelCps(), - segments: mockSegmentsInfo(20, 20, 20, 20, 20, 20), - indexMeta: im, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - }, + segments: mockSegmentsInfo(20, 20, 20, 20, 20, 20), + indexMeta: im, + collections: collections, }, mock0Allocator, make(chan *compactionSignal, 1), @@ -1358,6 +1361,20 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { }) return segIdx } + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }) + im := &indexMeta{ segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), indexes: 
map[UniqueID]map[UniqueID]*model.Index{ @@ -1404,21 +1421,9 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { channelCPs: newChannelCps(), // 7 segments with 200MB each, the compaction is expected to be triggered // as the first 5 being merged, and 1 plus being squeezed. - segments: mockSegmentsInfo(200, 200, 200, 200, 200, 200, 200), - indexMeta: im, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, - }, + segments: mockSegmentsInfo(200, 200, 200, 200, 200, 200, 200), + indexMeta: im, + collections: collections, }, mock0Allocator, make(chan *compactionSignal, 1), @@ -1533,6 +1538,20 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { im.segmentIndexes.Insert(5, genSegIndex(5, indexID, 20)) im.segmentIndexes.Insert(6, genSegIndex(6, indexID, 20)) mock0Allocator := newMockAllocator(t) + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }) + tests := []struct { name string fields fields @@ -1544,23 +1563,10 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { "test small segment", fields{ &meta{ - channelCPs: newChannelCps(), - - segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260), - indexMeta: im, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, - }, + channelCPs: newChannelCps(), + segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260), + indexMeta: im, + collections: collections, }, mock0Allocator, make(chan *compactionSignal, 1), @@ -1713,6 +1719,25 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { mock0Allocator := newMockAllocator(t) + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + }) + tests := []struct { name string fields fields @@ -1726,27 +1751,9 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { &meta{ channelCPs: newChannelCps(), - segments: segmentInfos, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - }, - }, - indexMeta: indexMeta, + segments: segmentInfos, + collections: collections, + indexMeta: indexMeta, }, mock0Allocator, make(chan *compactionSignal, 1), @@ -2083,7 +2090,7 @@ func Test_triggerSingleCompaction(t *testing.T) { }() m := &meta{ channelCPs: newChannelCps(), - segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo), + segments: NewSegmentsInfo(), collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(t), &ServerHandler{ @@ -2243,6 
+2250,19 @@ func (s *CompactionTriggerSuite) SetupTest() { lastFlushTime: time.Now(), } + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(s.collectionID, &collectionInfo{ + ID: s.collectionID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: s.vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + }) + im := &indexMeta{ segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), indexes: map[UniqueID]map[UniqueID]*model.Index{ @@ -2309,20 +2329,8 @@ func (s *CompactionTriggerSuite) SetupTest() { }, }, }, - indexMeta: im, - collections: map[int64]*collectionInfo{ - s.collectionID: { - ID: s.collectionID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: s.vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, - }, + indexMeta: im, + collections: collections, } s.meta.UpdateChannelCheckpoint(context.TODO(), s.channel, &msgpb.MsgPosition{ ChannelName: s.channel, @@ -2733,6 +2741,16 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { compactTime *compactTime expectedSize int64 } + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: schema, + Properties: map[string]string{ + common.CollectionTTLConfigKey: "0", + }, + }) + segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]() segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() segIdx0.Insert(indexID, &model.SegmentIndex{ @@ -2823,15 +2841,7 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { }, }, }, - collections: map[int64]*collectionInfo{ - 2: { - ID: 2, - Schema: schema, - Properties: map[string]string{ - common.CollectionTTLConfigKey: "0", - }, - }, - }, + collections: collections, }, mock0Allocator, nil, diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 040396e398..29fd46b469 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -363,7 +363,6 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta indexID = UniqueID(400) ) return &meta{ - RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -517,7 +516,6 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m segIndexes.Insert(segID, segIdx0) segIndexes.Insert(segID+1, segIdx1) meta := &meta{ - RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -682,7 +680,6 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta segIndexes.Insert(segID, segIdx0) segIndexes.Insert(segID+1, segIdx1) meta := &meta{ - RWMutex: lock.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, @@ -1044,6 +1041,33 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, }, } + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(collID, &collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Name: "", + Description: "", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + FieldID: fieldID, + Name: "", + IsPrimaryKey: false, + Description: "", + DataType: schemapb.DataType_FloatVector, + TypeParams: nil, + IndexParams: nil, + AutoID: false, + State: 0, + }, + }, + }, + Partitions: nil, + StartPositions: nil, + Properties: nil, + }) + segIndexes 
:= typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]() segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() segIdx0.Insert(indexID, &model.SegmentIndex{ @@ -1111,32 +1135,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, }, - collections: map[UniqueID]*collectionInfo{ - collID: { - ID: collID, - Schema: &schemapb.CollectionSchema{ - Name: "", - Description: "", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - FieldID: fieldID, - Name: "", - IsPrimaryKey: false, - Description: "", - DataType: schemapb.DataType_FloatVector, - TypeParams: nil, - IndexParams: nil, - AutoID: false, - State: 0, - }, - }, - }, - Partitions: nil, - StartPositions: nil, - Properties: nil, - }, - }, + collections: collections, } m.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{ diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index ff8dadecb2..1da5660af5 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -919,7 +919,7 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex { } // SetStoredIndexFileSizeMetric returns the total index files size of all segment for each collection. -func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*collectionInfo) uint64 { +func (m *indexMeta) SetStoredIndexFileSizeMetric(collections *typeutil.ConcurrentMap[UniqueID, *collectionInfo]) uint64 { m.fieldIndexLock.Lock() defer m.fieldIndexLock.Unlock() @@ -927,7 +927,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle metrics.DataCoordStoredIndexFilesSize.Reset() for _, segmentIdx := range m.segmentBuildInfo.List() { - coll, ok := collections[segmentIdx.CollectionID] + coll, ok := collections.Get(segmentIdx.CollectionID) if ok { metrics.DataCoordStoredIndexFilesSize.WithLabelValues(coll.DatabaseName, coll.Schema.GetName(), fmt.Sprint(segmentIdx.CollectionID)).Add(float64(segmentIdx.IndexSerializedSize)) diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 34c1b57e8b..c7e22067d4 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -91,21 +91,21 @@ func TestServer_CreateIndex(t *testing.T) { mock0Allocator := newMockAllocator(t) + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(collID, &collectionInfo{ + ID: collID, + Partitions: nil, + StartPositions: nil, + Properties: nil, + CreatedAt: 0, + }) + indexMeta := newSegmentIndexMeta(catalog) s := &Server{ meta: &meta{ - catalog: catalog, - collections: map[UniqueID]*collectionInfo{ - collID: { - ID: collID, - - Partitions: nil, - StartPositions: nil, - Properties: nil, - CreatedAt: 0, - }, - }, - indexMeta: indexMeta, + catalog: catalog, + collections: collections, + indexMeta: indexMeta, }, allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), @@ -2664,15 +2664,16 @@ func TestJsonIndex(t *testing.T) { }, }, nil) + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(collID, &collectionInfo{ + ID: collID, + }) + s := &Server{ meta: &meta{ - catalog: catalog, - collections: map[UniqueID]*collectionInfo{ - collID: { - ID: collID, - }, - }, - indexMeta: indexMeta, + catalog: catalog, + collections: collections, + indexMeta: indexMeta, }, allocator: mock0Allocator, notifyIndexChan: make(chan UniqueID, 1), diff --git a/internal/datacoord/job_manager_test.go 
b/internal/datacoord/job_manager_test.go index 9c4b362246..6dfcaae3c8 100644 --- a/internal/datacoord/job_manager_test.go +++ b/internal/datacoord/job_manager_test.go @@ -41,33 +41,34 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { catalog := mocks.NewDataCoordCatalog(s.T()) catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil) - mt := &meta{ - collections: map[UniqueID]*collectionInfo{ - 1: { - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(1, &collectionInfo{ + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "var", + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ { - FieldID: 100, - Name: "pk", - DataType: schemapb.DataType_Int64, + Key: "enable_match", Value: "true", }, { - FieldID: 101, - Name: "var", - DataType: schemapb.DataType_VarChar, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "enable_match", Value: "true", - }, - { - Key: "enable_analyzer", Value: "true", - }, - }, + Key: "enable_analyzer", Value: "true", }, }, }, }, }, + }) + + mt := &meta{ + collections: collections, segments: &SegmentsInfo{ segments: map[UniqueID]*SegmentInfo{ 10: { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ddb5ecb22e..b162dd7206 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -54,6 +54,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/retry" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type CompactionMeta interface { @@ -82,12 +83,15 @@ type CompactionMeta interface { var _ CompactionMeta = (*meta)(nil) type meta struct { - lock.RWMutex - ctx context.Context - catalog metastore.DataCoordCatalog - collections map[UniqueID]*collectionInfo // collection id to collection info - segments *SegmentsInfo // segment id to segment info - channelCPs *channelCPs // vChannel -> channel checkpoint/see position + ctx context.Context + catalog metastore.DataCoordCatalog + + collections *typeutil.ConcurrentMap[UniqueID, *collectionInfo] // collection id to collection info + + segMu lock.RWMutex + segments *SegmentsInfo // segment id to segment info + + channelCPs *channelCPs // vChannel -> channel checkpoint/see position chunkManager storage.ChunkManager indexMeta *indexMeta @@ -178,7 +182,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag mt := &meta{ ctx: ctx, catalog: catalog, - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), segments: NewSegmentsInfo(), channelCPs: newChannelCps(), indexMeta: im, @@ -334,29 +338,24 @@ func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker // Note that collection info is just for caching and will not be set into etcd from datacoord func (m *meta) AddCollection(collection *collectionInfo) { log.Info("meta update: add collection", zap.Int64("collectionID", collection.ID)) - m.Lock() - defer m.Unlock() - m.collections[collection.ID] = collection - metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections))) + m.collections.Insert(collection.ID, collection) + metrics.DataCoordNumCollections.WithLabelValues().Set(float64(m.collections.Len())) 
log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID)) } // DropCollection drop a collection from meta func (m *meta) DropCollection(collectionID int64) { log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID)) - m.Lock() - defer m.Unlock() - delete(m.collections, collectionID) - metrics.CleanupDataCoordWithCollectionID(collectionID) - metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections))) - log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID)) + if _, ok := m.collections.GetAndRemove(collectionID); ok { + metrics.CleanupDataCoordWithCollectionID(collectionID) + metrics.DataCoordNumCollections.WithLabelValues().Set(float64(m.collections.Len())) + log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID)) + } } // GetCollection returns collection info with provided collection id from local cache func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo { - m.RLock() - defer m.RUnlock() - collection, ok := m.collections[collectionID] + collection, ok := m.collections.Get(collectionID) if !ok { return nil } @@ -365,20 +364,11 @@ func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo { // GetCollections returns collections from local cache func (m *meta) GetCollections() []*collectionInfo { - m.RLock() - defer m.RUnlock() - collections := make([]*collectionInfo, 0) - for _, coll := range m.collections { - collections = append(collections, coll) - } - return collections + return m.collections.Values() } func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo { - m.RLock() - defer m.RUnlock() - - coll, ok := m.collections[collectionID] + coll, ok := m.collections.Get(collectionID) if !ok { return nil } @@ -432,24 +422,18 @@ func GetSegmentsChanPart(m *meta, collectionID int64, filters ...SegmentFilter) return result } -func (m *meta) getNumRowsOfCollectionUnsafe(collectionID UniqueID) int64 { +// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection +func (m *meta) GetNumRowsOfCollection(ctx context.Context, collectionID UniqueID) int64 { var ret int64 - segments := m.segments.GetSegments() + segments := m.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(si *SegmentInfo) bool { + return isSegmentHealthy(si) + })) for _, segment := range segments { - if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID { - ret += segment.GetNumOfRows() - } + ret += segment.GetNumOfRows() } return ret } -// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection -func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 { - m.RLock() - defer m.RUnlock() - return m.getNumRowsOfCollectionUnsafe(collectionID) -} - func getBinlogFileCount(s *datapb.SegmentInfo) int { statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int { cnt := 0 @@ -468,8 +452,8 @@ func getBinlogFileCount(s *datapb.SegmentInfo) int { func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { info := &metricsinfo.DataCoordQuotaMetrics{} - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() collectionBinlogSize := make(map[UniqueID]int64) partitionBinlogSize := make(map[UniqueID]map[UniqueID]int64) collectionRowsNum := make(map[UniqueID]map[commonpb.SegmentState]int64) @@ -493,7 +477,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { } 
partBinlogSize[segment.GetPartitionID()] += segmentSize - coll, ok := m.collections[segment.GetCollectionID()] + coll, ok := m.collections.Get(segment.GetCollectionID()) if ok { metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName, fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Add(float64(segmentSize)) @@ -516,7 +500,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { metrics.DataCoordNumStoredRows.Reset() for collectionID, statesRows := range collectionRowsNum { - coll, ok := m.collections[collectionID] + coll, ok := m.collections.Get(collectionID) if ok { for state, rows := range statesRows { metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), coll.Schema.GetName(), state.String()).Set(float64(rows)) @@ -526,7 +510,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { metrics.DataCoordL0DeleteEntriesNum.Reset() for collectionID, entriesNum := range collectionL0RowCounts { - coll, ok := m.collections[collectionID] + coll, ok := m.collections.Get(collectionID) if ok { metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID)).Set(float64(entriesNum)) } @@ -542,15 +526,13 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { // SetStoredIndexFileSizeMetric returns the total index files size of all segment for each collection. func (m *meta) SetStoredIndexFileSizeMetric() uint64 { - m.RLock() - defer m.RUnlock() return m.indexMeta.SetStoredIndexFileSizeMetric(m.collections) } func (m *meta) GetAllCollectionNumRows() map[int64]int64 { - m.RLock() - defer m.RUnlock() - ret := make(map[int64]int64, len(m.collections)) + m.segMu.RLock() + defer m.segMu.RUnlock() + ret := make(map[int64]int64, m.collections.Len()) segments := m.segments.GetSegments() for _, segment := range segments { if isSegmentHealthy(segment) { @@ -564,8 +546,8 @@ func (m *meta) GetAllCollectionNumRows() map[int64]int64 { func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error { log := log.Ctx(ctx).With(zap.String("channel", segment.GetInsertChannel())) log.Info("meta update: adding segment - Start", zap.Int64("segmentID", segment.GetID())) - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil { log.Error("meta update: adding segment failed", zap.Int64("segmentID", segment.GetID()), @@ -583,8 +565,8 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error { func (m *meta) DropSegment(ctx context.Context, segmentID UniqueID) error { log := log.Ctx(ctx) log.Debug("meta update: dropping segment", zap.Int64("segmentID", segmentID)) - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() segment := m.segments.GetSegment(segmentID) if segment == nil { log.Warn("meta update: dropping segment failed - segment not found", @@ -608,8 +590,8 @@ func (m *meta) DropSegment(ctx context.Context, segmentID UniqueID) error { // GetHealthySegment returns segment info with provided id // if not segment is found, nil will be returned func (m *meta) GetHealthySegment(ctx context.Context, segID UniqueID) *SegmentInfo { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() segment := m.segments.GetSegment(segID) if segment != nil && isSegmentHealthy(segment) { return segment @@ -619,8 +601,8 @@ func (m *meta) GetHealthySegment(ctx context.Context, segID UniqueID) *SegmentIn // Get segments By filter function func (m 
*meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() var result []UniqueID for _, id := range segIDs { segment := m.segments.GetSegment(id) @@ -635,21 +617,21 @@ func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) [] // include the unhealthy segment // if not segment is found, nil will be returned func (m *meta) GetSegment(ctx context.Context, segID UniqueID) *SegmentInfo { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() return m.segments.GetSegment(segID) } // GetAllSegmentsUnsafe returns all segments func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() return m.segments.GetSegments() } func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() var sum int64 = 0 for _, segmentID := range segmentIDs { segment := m.segments.GetSegment(segmentID) @@ -663,8 +645,8 @@ func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 { } func (m *meta) GetSegmentsChannels(segmentIDs []UniqueID) (map[int64]string, error) { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() segChannels := make(map[int64]string) for _, segmentID := range segmentIDs { segment := m.segments.GetSegment(segmentID) @@ -682,8 +664,8 @@ func (m *meta) SetState(ctx context.Context, segmentID UniqueID, targetState com log.Debug("meta update: setting segment state", zap.Int64("segmentID", segmentID), zap.Any("target state", targetState)) - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { log.Warn("meta update: setting segment state - segment not found", @@ -722,8 +704,8 @@ func (m *meta) SetState(ctx context.Context, segmentID UniqueID, targetState com } func (m *meta) UpdateSegment(segmentID int64, operators ...SegmentOperator) error { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() log := log.Ctx(context.TODO()) info := m.segments.GetSegment(segmentID) if info == nil { @@ -1121,8 +1103,8 @@ func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator { // updateSegmentsInfo update segment infos // will exec all operators, and update all changed segments func (m *meta) UpdateSegmentsInfo(ctx context.Context, operators ...UpdateOperator) error { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() updatePack := &updateSegmentPack{ meta: m, segments: make(map[int64]*SegmentInfo), @@ -1165,8 +1147,8 @@ func (m *meta) UpdateDropChannelSegmentInfo(ctx context.Context, channel string, log := log.Ctx(ctx) log.Debug("meta update: update drop channel segment info", zap.String("channel", channel)) - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() // Prepare segment metric mutation. metricMutation := &segMetricMutation{ @@ -1391,14 +1373,14 @@ func (m *meta) GetFlushingSegments() []*SegmentInfo { // SelectSegments select segments with selector func (m *meta) SelectSegments(ctx context.Context, filters ...SegmentFilter) []*SegmentInfo { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() return m.segments.GetSegmentsBySelector(filters...) 
} func (m *meta) GetRealSegmentsForChannel(channel string) []*SegmentInfo { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() return m.segments.GetRealSegmentsForChannel(channel) } @@ -1407,8 +1389,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { log.Ctx(m.ctx).Debug("meta update: add allocation", zap.Int64("segmentID", segmentID), zap.Any("allocation", allocation)) - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { // TODO: Error handling. @@ -1423,24 +1405,24 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { } func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetRowCount(segmentID, rowCount) } // SetAllocations set Segment allocations, will overwrite ALL original allocations // Note that allocations is not persisted in KV store func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetAllocations(segmentID, allocations) } // SetLastExpire set lastExpire time for segment // Note that last is not necessary to store in KV meta func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() clonedSegment := m.segments.GetSegment(segmentID).Clone() clonedSegment.LastExpireTime = lastExpire m.segments.SetSegment(segmentID, clonedSegment) @@ -1449,22 +1431,22 @@ func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) { // SetLastFlushTime set LastFlushTime for segment with provided `segmentID` // Note that lastFlushTime is not persisted in KV store func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetFlushTime(segmentID, t) } // SetLastWrittenTime set LastWrittenTime for segment with provided `segmentID` // Note that lastWrittenTime is not persisted in KV store func (m *meta) SetLastWrittenTime(segmentID UniqueID) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetLastWrittenTime(segmentID) } func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() exist = true for _, segmentID := range segmentIDs { seg := m.segments.GetSegment(segmentID) @@ -1481,16 +1463,16 @@ func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) } func (m *meta) SetSegmentStating(segmentID UniqueID, stating bool) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetIsStating(segmentID, stating) } // SetSegmentCompacting sets compaction state for segment func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetIsCompacting(segmentID, compacting) } @@ -1499,8 +1481,8 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { // if true, set them compacting and return true // if false, skip setting and func (m *meta) CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID) (exist, canDo bool) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() var hasCompacting bool exist 
= true for _, segmentID := range segmentIDs { @@ -1524,8 +1506,8 @@ func (m *meta) CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []U } func (m *meta) SetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID, compacting bool) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() for _, segmentID := range segmentIDs { m.segments.SetIsCompacting(segmentID, compacting) } @@ -1533,8 +1515,8 @@ func (m *meta) SetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID, // SetSegmentLevel sets level for segment func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() m.segments.SetLevel(segmentID, level) } @@ -1739,8 +1721,8 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d } func (m *meta) CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() switch t.GetType() { case datapb.CompactionType_MixCompaction: return m.completeMixCompactionMutation(t, result) @@ -1771,8 +1753,8 @@ func isSegmentHealthy(segment *SegmentInfo) bool { } func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() for _, segID := range segIDs { if _, ok := m.segments.segments[segID]; !ok { @@ -1784,8 +1766,8 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) { // GetCompactionTo returns the segment info of the segment to be compacted to. func (m *meta) GetCompactionTo(segmentID int64) ([]*SegmentInfo, bool) { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() return m.segments.GetCompactionTo(segmentID) } @@ -2015,10 +1997,7 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo } func (m *meta) ListCollections() []int64 { - m.RLock() - defer m.RUnlock() - - return lo.Keys(m.collections) + return m.collections.Keys() } func (m *meta) DropCompactionTask(ctx context.Context, task *datapb.CompactionTask) error { @@ -2093,8 +2072,8 @@ func (m *meta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.Partiti } func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.StatsResult) (*segMetricMutation, error) { - m.Lock() - defer m.Unlock() + m.segMu.Lock() + defer m.segMu.Unlock() log := log.Ctx(m.ctx).With(zap.Int64("collectionID", result.GetCollectionID()), zap.Int64("partitionID", result.GetPartitionID()), @@ -2169,8 +2148,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats } func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment { - m.RLock() - defer m.RUnlock() + m.segMu.RLock() + defer m.segMu.RUnlock() segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments)) for _, s := range m.segments.segments { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index f8c0660632..85ee3ba2ad 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -50,6 +50,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/testutils" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // MetaReloadSuite tests meta reload & meta creation related logic @@ -655,7 +656,7 @@ func TestMeta_Basic(t *testing.T) { 
const rowCount1 = 300 // no segment - nums := meta.GetNumRowsOfCollection(collID) + nums := meta.GetNumRowsOfCollection(context.Background(), collID) assert.EqualValues(t, 0, nums) // add seg1 with 100 rows @@ -675,7 +676,7 @@ func TestMeta_Basic(t *testing.T) { // check partition/collection statistics nums = meta.GetNumRowsOfPartition(context.TODO(), collID, partID0) assert.EqualValues(t, (rowCount0 + rowCount1), nums) - nums = meta.GetNumRowsOfCollection(collID) + nums = meta.GetNumRowsOfCollection(context.Background(), collID) assert.EqualValues(t, (rowCount0 + rowCount1), nums) }) @@ -742,7 +743,7 @@ func TestMeta_Basic(t *testing.T) { assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID]) assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize) - meta.collections[collID] = collInfo + meta.collections.Insert(collID, collInfo) quotaInfo = meta.GetQuotaInfo() assert.Len(t, quotaInfo.CollectionBinlogSize, 1) assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID]) @@ -754,12 +755,11 @@ func TestMeta_Basic(t *testing.T) { ret := meta.SetStoredIndexFileSizeMetric() assert.Equal(t, uint64(0), ret) - meta.collections = map[UniqueID]*collectionInfo{ - 100: { - ID: 100, - DatabaseName: "db", - }, - } + meta.collections = typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + meta.collections.Insert(100, &collectionInfo{ + ID: 100, + DatabaseName: "db", + }) ret = meta.SetStoredIndexFileSizeMetric() assert.Equal(t, uint64(11), ret) }) @@ -1347,7 +1347,7 @@ func Test_meta_GcConfirm(t *testing.T) { func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { t.Run("fail to list database", func(t *testing.T) { m := &meta{ - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } mockBroker := broker.NewMockBroker(t) mockBroker.EXPECT().ListDatabases(mock.Anything).Return(nil, errors.New("list database failed, mocked")) @@ -1357,7 +1357,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { t.Run("fail to show collections", func(t *testing.T) { m := &meta{ - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } mockBroker := broker.NewMockBroker(t) @@ -1371,7 +1371,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { t.Run("fail to describe collection", func(t *testing.T) { m := &meta{ - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } mockBroker := broker.NewMockBroker(t) @@ -1389,7 +1389,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { t.Run("fail to show partitions", func(t *testing.T) { m := &meta{ - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } mockBroker := broker.NewMockBroker(t) @@ -1408,7 +1408,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) { t.Run("success", func(t *testing.T) { m := &meta{ - collections: make(map[UniqueID]*collectionInfo), + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), } mockBroker := broker.NewMockBroker(t) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 3d8492c6ff..b012d5e860 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2488,13 +2488,12 @@ func Test_CheckHealth(t *testing.T) { return channelManager } - collections := map[UniqueID]*collectionInfo{ - 449684528748778322: { 
- ID: 449684528748778322, - VChannelNames: []string{"ch1", "ch2"}, - }, - 2: nil, - } + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(449684528748778322, &collectionInfo{ + ID: 449684528748778322, + VChannelNames: []string{"ch1", "ch2"}, + }) + collections.Insert(2, nil) t.Run("not healthy", func(t *testing.T) { ctx := context.Background() @@ -2657,7 +2656,7 @@ func TestLoadCollectionFromRootCoord(t *testing.T) { broker := broker.NewMockBroker(t) s := &Server{ broker: broker, - meta: &meta{collections: make(map[UniqueID]*collectionInfo)}, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, } t.Run("has collection fail with error", func(t *testing.T) { @@ -2698,8 +2697,8 @@ func TestLoadCollectionFromRootCoord(t *testing.T) { t.Run("ok", func(t *testing.T) { err := s.loadCollectionFromRootCoord(context.TODO(), 0) assert.NoError(t, err) - assert.Equal(t, 1, len(s.meta.collections)) - _, ok := s.meta.collections[1] + assert.Equal(t, 1, s.meta.collections.Len()) + _, ok := s.meta.collections.Get(1) assert.True(t, ok) }) } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index f7e629e8a2..25340ce5cc 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -371,7 +371,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol resp := &datapb.GetCollectionStatisticsResponse{ Status: merr.Success(), } - nums := s.meta.GetNumRowsOfCollection(req.CollectionID) + nums := s.meta.GetNumRowsOfCollection(ctx, req.CollectionID) resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)}) log.Info("success to get collection statistics", zap.Any("response", resp)) return resp, nil @@ -395,7 +395,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart } nums := int64(0) if len(req.GetPartitionIDs()) == 0 { - nums = s.meta.GetNumRowsOfCollection(req.CollectionID) + nums = s.meta.GetNumRowsOfCollection(ctx, req.CollectionID) } for _, partID := range req.GetPartitionIDs() { num := s.meta.GetNumRowsOfPartition(ctx, req.CollectionID, partID) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 9ad350bbaa..0acfb52915 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type ServerSuite struct { @@ -727,7 +728,7 @@ func TestBroadcastAlteredCollection(t *testing.T) { }) t.Run("test meta non exist", func(t *testing.T) { - s := &Server{meta: &meta{collections: make(map[UniqueID]*collectionInfo, 1)}} + s := &Server{meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}} s.stateCode.Store(commonpb.StateCode_Healthy) ctx := context.Background() req := &datapb.AlterCollectionRequest{ @@ -738,13 +739,13 @@ func TestBroadcastAlteredCollection(t *testing.T) { resp, err := s.BroadcastAlteredCollection(ctx, req) assert.NotNil(t, resp) assert.NoError(t, err) - assert.Equal(t, 1, len(s.meta.collections)) + assert.Equal(t, 1, s.meta.collections.Len()) }) t.Run("test update meta", func(t *testing.T) { - s := &Server{meta: &meta{collections: map[UniqueID]*collectionInfo{ - 1: {ID: 1}, - }}} + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + 
collections.Insert(1, &collectionInfo{ID: 1}) + s := &Server{meta: &meta{collections: collections}} s.stateCode.Store(commonpb.StateCode_Healthy) ctx := context.Background() req := &datapb.AlterCollectionRequest{ @@ -753,11 +754,15 @@ func TestBroadcastAlteredCollection(t *testing.T) { Properties: []*commonpb.KeyValuePair{{Key: "k", Value: "v"}}, } - assert.Nil(t, s.meta.collections[1].Properties) + coll, ok := s.meta.collections.Get(1) + assert.True(t, ok) + assert.Nil(t, coll.Properties) resp, err := s.BroadcastAlteredCollection(ctx, req) assert.NotNil(t, resp) assert.NoError(t, err) - assert.NotNil(t, s.meta.collections[1].Properties) + coll, ok = s.meta.collections.Get(1) + assert.True(t, ok) + assert.NotNil(t, coll.Properties) }) } diff --git a/internal/datacoord/sync_segments_scheduler_test.go b/internal/datacoord/sync_segments_scheduler_test.go index e6fd509d90..86cd5ccb23 100644 --- a/internal/datacoord/sync_segments_scheduler_test.go +++ b/internal/datacoord/sync_segments_scheduler_test.go @@ -29,7 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" - "github.com/milvus-io/milvus/pkg/v2/util/lock" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type SyncSegmentsSchedulerSuite struct { @@ -45,35 +45,34 @@ func Test_SyncSegmentsSchedulerSuite(t *testing.T) { } func (s *SyncSegmentsSchedulerSuite) initParams() { - s.m = &meta{ - RWMutex: lock.RWMutex{}, - collections: map[UniqueID]*collectionInfo{ - 1: { - ID: 1, - Schema: &schemapb.CollectionSchema{ - Name: "coll1", - Fields: []*schemapb.FieldSchema{ - { - FieldID: 100, - Name: "pk", - IsPrimaryKey: true, - Description: "", - DataType: schemapb.DataType_Int64, - }, - { - FieldID: 101, - Name: "vec", - IsPrimaryKey: false, - Description: "", - DataType: schemapb.DataType_FloatVector, - }, - }, + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(1, &collectionInfo{ + ID: 1, + Schema: &schemapb.CollectionSchema{ + Name: "coll1", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + Description: "", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "vec", + IsPrimaryKey: false, + Description: "", + DataType: schemapb.DataType_FloatVector, }, - Partitions: []int64{2, 3}, - VChannelNames: []string{"channel1", "channel2"}, }, - 2: nil, }, + Partitions: []int64{2, 3}, + VChannelNames: []string{"channel1", "channel2"}, + }) + collections.Insert(2, nil) + s.m = &meta{ + collections: collections, segments: &SegmentsInfo{ secondaryIndexes: segmentInfoIndexes{ channel2Segments: map[string]map[UniqueID]*SegmentInfo{ @@ -356,9 +355,13 @@ func (s *SyncSegmentsSchedulerSuite) Test_SyncSegmentsFail() { ctx := context.Background() s.Run("pk not found", func() { - sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = false + coll, ok := sss.meta.collections.Get(1) + s.True(ok) + coll.Schema.Fields[0].IsPrimaryKey = false sss.SyncSegmentsForCollections(ctx) - sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = true + coll, ok = sss.meta.collections.Get(1) + s.True(ok) + coll.Schema.Fields[0].IsPrimaryKey = true }) s.Run("find watcher failed", func() { diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index 5cb5df05b7..4ef6ee4c3a 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -509,7 +509,8 @@ func 
withStatsTaskMeta(stm *statsTaskMeta) testMetaOption { func createMeta(catalog metastore.DataCoordCatalog, opts ...testMetaOption) *meta { mt := &meta{ - catalog: catalog, + catalog: catalog, + collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), segments: &SegmentsInfo{ segments: map[UniqueID]*SegmentInfo{ 1000: { @@ -1594,6 +1595,16 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { IsPartitionKey: true, }, } + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(collID, &collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Fields: fieldsSchema, + }, + CreatedAt: 0, + }) + segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]() segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() segIdx.Insert(indexID, &model.SegmentIndex{ @@ -1613,17 +1624,10 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { IndexSerializedSize: 0, }) segIndexes.Insert(segID, segIdx) + mt := meta{ - catalog: catalog, - collections: map[int64]*collectionInfo{ - collID: { - ID: collID, - Schema: &schemapb.CollectionSchema{ - Fields: fieldsSchema, - }, - CreatedAt: 0, - }, - }, + catalog: catalog, + collections: collections, analyzeMeta: &analyzeMeta{ ctx: context.Background(), @@ -1754,9 +1758,11 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { mt.indexMeta.fieldIndexLock.Lock() defer mt.indexMeta.fieldIndexLock.Unlock() mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW" - mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector - mt.collections[collID].Schema.Fields[1].IsPartitionKey = true - mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar + coll, ok := mt.collections.Get(collID) + s.True(ok) + coll.Schema.Fields[0].DataType = schemapb.DataType_FloatVector + coll.Schema.Fields[1].IsPartitionKey = true + coll.Schema.Fields[1].DataType = schemapb.DataType_VarChar } in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn( @@ -1815,7 +1821,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { schemapb.DataType_VarChar, schemapb.DataType_String, } { - mt.collections[collID].Schema.Fields[1].DataType = dataType + coll, ok := mt.collections.Get(collID) + s.True(ok) + coll.Schema.Fields[1].DataType = dataType in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") @@ -1863,7 +1871,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { for _, dataType := range []schemapb.DataType{ schemapb.DataType_SparseFloatVector, } { - mt.collections[collID].Schema.Fields[0].DataType = dataType + coll, ok := mt.collections.Get(collID) + s.True(ok) + coll.Schema.Fields[0].DataType = dataType in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set") @@ -1893,7 +1903,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { schemapb.DataType_Array, schemapb.DataType_JSON, } { - mt.collections[collID].Schema.Fields[1].DataType = dataType + coll, ok := 
mt.collections.Get(collID) + s.True(ok) + coll.Schema.Fields[1].DataType = dataType in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") @@ -1916,7 +1928,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { s.Run("Submit returns empty optional field when no partition key", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") - mt.collections[collID].Schema.Fields[1].IsPartitionKey = false + coll, ok := mt.collections.Get(collID) + s.True(ok) + coll.Schema.Fields[1].IsPartitionKey = false in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
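
For reference, the locking pattern this patch moves toward can be summarized as: collection metadata lives in a lock-free typeutil.ConcurrentMap keyed by collection ID, while segment state keeps its own dedicated RWMutex (segMu), so collection lookups no longer contend with segment mutations behind one global meta lock. Below is a minimal sketch of that split, assuming only the ConcurrentMap API already used in the diff (NewConcurrentMap, Insert, Get); the package name and the simplified collInfoSketch/segmentSketch types are placeholders for illustration, not the real datacoord structs.

package metasketch // hypothetical package, for illustration only

import (
	"sync"

	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// metaSketch mirrors the split this patch introduces: a concurrent map for
// collection info plus a separate RWMutex that guards segment state only.
type metaSketch struct {
	// collection id -> collection info; accessed without any global lock.
	collections *typeutil.ConcurrentMap[int64, *collInfoSketch]

	segMu    sync.RWMutex // protects segments only
	segments map[int64]*segmentSketch
}

type collInfoSketch struct{ ID int64 }
type segmentSketch struct{ NumRows int64 }

func newMetaSketch() *metaSketch {
	return &metaSketch{
		collections: typeutil.NewConcurrentMap[int64, *collInfoSketch](),
		segments:    make(map[int64]*segmentSketch),
	}
}

// AddCollection touches only the concurrent map, so it never blocks
// behind in-flight segment updates.
func (m *metaSketch) AddCollection(info *collInfoSketch) {
	m.collections.Insert(info.ID, info)
}

// GetCollection is a lock-free read of the cached collection info.
func (m *metaSketch) GetCollection(id int64) (*collInfoSketch, bool) {
	return m.collections.Get(id)
}

// AddSegment takes segMu only; collection readers are unaffected.
func (m *metaSketch) AddSegment(id int64, seg *segmentSketch) {
	m.segMu.Lock()
	defer m.segMu.Unlock()
	m.segments[id] = seg
}

The payoff, as the diff shows, is that hot-path readers such as GetCollection and GetCollections no longer serialize behind segment write paths like UpdateSegmentsInfo or CompleteCompactionMutation, which now take segMu instead of a shared meta-wide lock.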