enhance: Optimize datacoord meta mutex (#40552)

Use a separate collection mutex.

issue: https://github.com/milvus-io/milvus/issues/40551

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-03-25 13:46:25 +08:00 committed by GitHub
parent 7fdb2e144f
commit f65e6b7c6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 545 additions and 532 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestClusteringCompactionPolicySuite(t *testing.T) {
@ -67,7 +68,7 @@ func (s *ClusteringCompactionPolicySuite) SetupTest() {
meta := &meta{
segments: NewSegmentsInfo(),
collections: make(map[UniqueID]*collectionInfo, 0),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
compactionTaskMeta: compactionTaskMeta,
partitionStatsMeta: partitionStatsMeta,
indexMeta: indexMeta,
@ -109,22 +110,22 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerWithNoCollecitons() {
func (s *ClusteringCompactionPolicySuite) TestTriggerWithCollections() {
// valid collection
s.meta.collections[1] = &collectionInfo{
s.meta.collections.Insert(1, &collectionInfo{
ID: 1,
Schema: newTestScalarClusteringKeySchema(),
}
})
// deleted collection
s.meta.collections[2] = &collectionInfo{
s.meta.collections.Insert(2, &collectionInfo{
ID: 2,
Schema: newTestScalarClusteringKeySchema(),
}
})
s.clusteringCompactionPolicy.meta = s.meta
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) {
if collectionID == 2 {
return nil, errors.New("mock get collection fail error")
}
coll, exist := s.meta.collections[collectionID]
coll, exist := s.meta.collections.Get(collectionID)
if exist {
return coll, nil
}
@ -308,10 +309,10 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNormal() {
Channel: "ch-1",
}
s.meta.collections[testLabel.CollectionID] = &collectionInfo{
s.meta.collections.Insert(testLabel.CollectionID, &collectionInfo{
ID: testLabel.CollectionID,
Schema: newTestScalarClusteringKeySchema(),
}
})
segments := genSegmentsForMeta(testLabel)
for id, segment := range segments {
@ -319,7 +320,7 @@ func (s *ClusteringCompactionPolicySuite) TestTriggerOneCollectionNormal() {
}
s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collectionID int64) (*collectionInfo, error) {
coll, exist := s.meta.collections[collectionID]
coll, exist := s.meta.collections.Get(collectionID)
if exist {
return coll, nil
}

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestSingleCompactionPolicySuite(t *testing.T) {
@ -53,7 +54,7 @@ func (s *SingleCompactionPolicySuite) SetupTest() {
}
segments := genSegmentsForMeta(s.testLabel)
meta := &meta{segments: NewSegmentsInfo()}
meta := &meta{segments: NewSegmentsInfo(), collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}
for id, segment := range segments {
meta.segments.SetSegment(id, segment)
}

View File

@ -149,6 +149,11 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
Deltalogs: deltaLogs,
IsSorted: true,
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collectionID, &collectionInfo{
ID: collectionID,
Schema: schema,
})
m := &meta{
catalog: catalog,
channelCPs: newChannelCps(),
@ -172,12 +177,7 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{},
},
collections: map[int64]*collectionInfo{
collectionID: {
ID: collectionID,
Schema: schema,
},
},
collections: collections,
}
compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m}
@ -308,6 +308,113 @@ func Test_compactionTrigger_force(t *testing.T) {
},
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
})
collections.Insert(1111, &collectionInfo{
ID: 1111,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
Properties: map[string]string{
common.CollectionTTLConfigKey: "error",
},
})
collections.Insert(1000, &collectionInfo{
ID: 1000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
// error (has no vector field)
collections.Insert(2000, &collectionInfo{
ID: 2000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_Int16,
},
},
},
})
// error (has no dim)
collections.Insert(3000, &collectionInfo{
ID: 3000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{},
},
},
},
},
})
// error (dim parse fail)
collections.Insert(4000, &collectionInfo{
ID: 4000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128error",
},
},
},
},
},
})
collections.Insert(10000, &collectionInfo{
ID: 10000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
im := &indexMeta{
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
@ -443,115 +550,8 @@ func Test_compactionTrigger_force(t *testing.T) {
},
},
},
indexMeta: im,
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
},
1111: {
ID: 1111,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
Properties: map[string]string{
common.CollectionTTLConfigKey: "error",
},
},
1000: {
ID: 1000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
// error (has no vector field)
2000: {
ID: 2000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_Int16,
},
},
},
},
// error (has no dim)
3000: {
ID: 3000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{},
},
},
},
},
},
// error (dim parse fail)
4000: {
ID: 4000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128error",
},
},
},
},
},
},
10000: {
ID: 10000,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
},
indexMeta: im,
collections: collections,
},
mock0Allocator,
nil,
@ -851,6 +851,25 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
mock0Allocator := newMockAllocator(t)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
tests := []struct {
name string
fields fields
@ -862,28 +881,10 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
"test many segments",
fields{
&meta{
segments: segmentInfos,
channelCPs: newChannelCps(),
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
},
indexMeta: indexMeta,
segments: segmentInfos,
channelCPs: newChannelCps(),
collections: collections,
indexMeta: indexMeta,
},
mock0Allocator,
nil,
@ -1004,6 +1005,25 @@ func Test_compactionTrigger_noplan(t *testing.T) {
im := newSegmentIndexMeta(nil)
im.indexes[2] = make(map[UniqueID]*model.Index)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
tests := []struct {
name string
fields fields
@ -1070,25 +1090,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
},
},
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
},
collections: collections,
},
mock0Allocator,
make(chan *compactionSignal, 1),
@ -1217,6 +1219,25 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
return segIdx
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
im := &indexMeta{
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
@ -1261,27 +1282,9 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
// 8 small segments
channelCPs: newChannelCps(),
segments: mockSegmentsInfo(20, 20, 20, 20, 20, 20),
indexMeta: im,
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
},
segments: mockSegmentsInfo(20, 20, 20, 20, 20, 20),
indexMeta: im,
collections: collections,
},
mock0Allocator,
make(chan *compactionSignal, 1),
@ -1358,6 +1361,20 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
})
return segIdx
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
})
im := &indexMeta{
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
@ -1404,21 +1421,9 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
channelCPs: newChannelCps(),
// 7 segments with 200MB each, the compaction is expected to be triggered
// as the first 5 being merged, and 1 plus being squeezed.
segments: mockSegmentsInfo(200, 200, 200, 200, 200, 200, 200),
indexMeta: im,
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
segments: mockSegmentsInfo(200, 200, 200, 200, 200, 200, 200),
indexMeta: im,
collections: collections,
},
mock0Allocator,
make(chan *compactionSignal, 1),
@ -1533,6 +1538,20 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
im.segmentIndexes.Insert(5, genSegIndex(5, indexID, 20))
im.segmentIndexes.Insert(6, genSegIndex(6, indexID, 20))
mock0Allocator := newMockAllocator(t)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
})
tests := []struct {
name string
fields fields
@ -1544,23 +1563,10 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
"test small segment",
fields{
&meta{
channelCPs: newChannelCps(),
segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260),
indexMeta: im,
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
channelCPs: newChannelCps(),
segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260),
indexMeta: im,
collections: collections,
},
mock0Allocator,
make(chan *compactionSignal, 1),
@ -1713,6 +1719,25 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
mock0Allocator := newMockAllocator(t)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
})
tests := []struct {
name string
fields fields
@ -1726,27 +1751,9 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
&meta{
channelCPs: newChannelCps(),
segments: segmentInfos,
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
},
},
},
indexMeta: indexMeta,
segments: segmentInfos,
collections: collections,
indexMeta: indexMeta,
},
mock0Allocator,
make(chan *compactionSignal, 1),
@ -2083,7 +2090,7 @@ func Test_triggerSingleCompaction(t *testing.T) {
}()
m := &meta{
channelCPs: newChannelCps(),
segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo),
segments: NewSegmentsInfo(), collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(t),
&ServerHandler{
@ -2243,6 +2250,19 @@ func (s *CompactionTriggerSuite) SetupTest() {
lastFlushTime: time.Now(),
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(s.collectionID, &collectionInfo{
ID: s.collectionID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
})
im := &indexMeta{
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
@ -2309,20 +2329,8 @@ func (s *CompactionTriggerSuite) SetupTest() {
},
},
},
indexMeta: im,
collections: map[int64]*collectionInfo{
s.collectionID: {
ID: s.collectionID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
},
},
indexMeta: im,
collections: collections,
}
s.meta.UpdateChannelCheckpoint(context.TODO(), s.channel, &msgpb.MsgPosition{
ChannelName: s.channel,
@ -2733,6 +2741,16 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
compactTime *compactTime
expectedSize int64
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx0.Insert(indexID, &model.SegmentIndex{
@ -2823,15 +2841,7 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
},
},
},
collections: map[int64]*collectionInfo{
2: {
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
},
},
collections: collections,
},
mock0Allocator,
nil,

View File

@ -363,7 +363,6 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta
indexID = UniqueID(400)
)
return &meta{
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
@ -517,7 +516,6 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
segIndexes.Insert(segID, segIdx0)
segIndexes.Insert(segID+1, segIdx1)
meta := &meta{
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
@ -682,7 +680,6 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
segIndexes.Insert(segID, segIdx0)
segIndexes.Insert(segID+1, segIdx1)
meta := &meta{
RWMutex: lock.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
@ -1044,6 +1041,33 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
},
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collID, &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "",
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
TypeParams: nil,
IndexParams: nil,
AutoID: false,
State: 0,
},
},
},
Partitions: nil,
StartPositions: nil,
Properties: nil,
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx0.Insert(indexID, &model.SegmentIndex{
@ -1111,32 +1135,7 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
},
collections: map[UniqueID]*collectionInfo{
collID: {
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "",
Description: "",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "",
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
TypeParams: nil,
IndexParams: nil,
AutoID: false,
State: 0,
},
},
},
Partitions: nil,
StartPositions: nil,
Properties: nil,
},
},
collections: collections,
}
m.indexMeta.segmentBuildInfo.Add(&model.SegmentIndex{

View File

@ -919,7 +919,7 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex {
}
// SetStoredIndexFileSizeMetric returns the total index files size of all segment for each collection.
func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*collectionInfo) uint64 {
func (m *indexMeta) SetStoredIndexFileSizeMetric(collections *typeutil.ConcurrentMap[UniqueID, *collectionInfo]) uint64 {
m.fieldIndexLock.Lock()
defer m.fieldIndexLock.Unlock()
@ -927,7 +927,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle
metrics.DataCoordStoredIndexFilesSize.Reset()
for _, segmentIdx := range m.segmentBuildInfo.List() {
coll, ok := collections[segmentIdx.CollectionID]
coll, ok := collections.Get(segmentIdx.CollectionID)
if ok {
metrics.DataCoordStoredIndexFilesSize.WithLabelValues(coll.DatabaseName, coll.Schema.GetName(),
fmt.Sprint(segmentIdx.CollectionID)).Add(float64(segmentIdx.IndexSerializedSize))

View File

@ -91,21 +91,21 @@ func TestServer_CreateIndex(t *testing.T) {
mock0Allocator := newMockAllocator(t)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collID, &collectionInfo{
ID: collID,
Partitions: nil,
StartPositions: nil,
Properties: nil,
CreatedAt: 0,
})
indexMeta := newSegmentIndexMeta(catalog)
s := &Server{
meta: &meta{
catalog: catalog,
collections: map[UniqueID]*collectionInfo{
collID: {
ID: collID,
Partitions: nil,
StartPositions: nil,
Properties: nil,
CreatedAt: 0,
},
},
indexMeta: indexMeta,
catalog: catalog,
collections: collections,
indexMeta: indexMeta,
},
allocator: mock0Allocator,
notifyIndexChan: make(chan UniqueID, 1),
@ -2664,15 +2664,16 @@ func TestJsonIndex(t *testing.T) {
},
}, nil)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collID, &collectionInfo{
ID: collID,
})
s := &Server{
meta: &meta{
catalog: catalog,
collections: map[UniqueID]*collectionInfo{
collID: {
ID: collID,
},
},
indexMeta: indexMeta,
catalog: catalog,
collections: collections,
indexMeta: indexMeta,
},
allocator: mock0Allocator,
notifyIndexChan: make(chan UniqueID, 1),

View File

@ -41,33 +41,34 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
mt := &meta{
collections: map[UniqueID]*collectionInfo{
1: {
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(1, &collectionInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "var",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_Int64,
Key: "enable_match", Value: "true",
},
{
FieldID: 101,
Name: "var",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "enable_match", Value: "true",
},
{
Key: "enable_analyzer", Value: "true",
},
},
Key: "enable_analyzer", Value: "true",
},
},
},
},
},
})
mt := &meta{
collections: collections,
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
10: {

View File

@ -54,6 +54,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/retry"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type CompactionMeta interface {
@ -82,12 +83,15 @@ type CompactionMeta interface {
var _ CompactionMeta = (*meta)(nil)
type meta struct {
lock.RWMutex
ctx context.Context
catalog metastore.DataCoordCatalog
collections map[UniqueID]*collectionInfo // collection id to collection info
segments *SegmentsInfo // segment id to segment info
channelCPs *channelCPs // vChannel -> channel checkpoint/see position
ctx context.Context
catalog metastore.DataCoordCatalog
collections *typeutil.ConcurrentMap[UniqueID, *collectionInfo] // collection id to collection info
segMu lock.RWMutex
segments *SegmentsInfo // segment id to segment info
channelCPs *channelCPs // vChannel -> channel checkpoint/see position
chunkManager storage.ChunkManager
indexMeta *indexMeta
@ -178,7 +182,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag
mt := &meta{
ctx: ctx,
catalog: catalog,
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: NewSegmentsInfo(),
channelCPs: newChannelCps(),
indexMeta: im,
@ -334,29 +338,24 @@ func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker
// Note that collection info is just for caching and will not be set into etcd from datacoord
func (m *meta) AddCollection(collection *collectionInfo) {
log.Info("meta update: add collection", zap.Int64("collectionID", collection.ID))
m.Lock()
defer m.Unlock()
m.collections[collection.ID] = collection
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
m.collections.Insert(collection.ID, collection)
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(m.collections.Len()))
log.Info("meta update: add collection - complete", zap.Int64("collectionID", collection.ID))
}
// DropCollection drop a collection from meta
func (m *meta) DropCollection(collectionID int64) {
log.Info("meta update: drop collection", zap.Int64("collectionID", collectionID))
m.Lock()
defer m.Unlock()
delete(m.collections, collectionID)
metrics.CleanupDataCoordWithCollectionID(collectionID)
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(len(m.collections)))
log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID))
if _, ok := m.collections.GetAndRemove(collectionID); ok {
metrics.CleanupDataCoordWithCollectionID(collectionID)
metrics.DataCoordNumCollections.WithLabelValues().Set(float64(m.collections.Len()))
log.Info("meta update: drop collection - complete", zap.Int64("collectionID", collectionID))
}
}
// GetCollection returns collection info with provided collection id from local cache
func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo {
m.RLock()
defer m.RUnlock()
collection, ok := m.collections[collectionID]
collection, ok := m.collections.Get(collectionID)
if !ok {
return nil
}
@ -365,20 +364,11 @@ func (m *meta) GetCollection(collectionID UniqueID) *collectionInfo {
// GetCollections returns collections from local cache
func (m *meta) GetCollections() []*collectionInfo {
m.RLock()
defer m.RUnlock()
collections := make([]*collectionInfo, 0)
for _, coll := range m.collections {
collections = append(collections, coll)
}
return collections
return m.collections.Values()
}
func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
m.RLock()
defer m.RUnlock()
coll, ok := m.collections[collectionID]
coll, ok := m.collections.Get(collectionID)
if !ok {
return nil
}
@ -432,24 +422,18 @@ func GetSegmentsChanPart(m *meta, collectionID int64, filters ...SegmentFilter)
return result
}
func (m *meta) getNumRowsOfCollectionUnsafe(collectionID UniqueID) int64 {
// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
func (m *meta) GetNumRowsOfCollection(ctx context.Context, collectionID UniqueID) int64 {
var ret int64
segments := m.segments.GetSegments()
segments := m.SelectSegments(ctx, WithCollection(collectionID), SegmentFilterFunc(func(si *SegmentInfo) bool {
return isSegmentHealthy(si)
}))
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
ret += segment.GetNumOfRows()
}
ret += segment.GetNumOfRows()
}
return ret
}
// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
m.RLock()
defer m.RUnlock()
return m.getNumRowsOfCollectionUnsafe(collectionID)
}
func getBinlogFileCount(s *datapb.SegmentInfo) int {
statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int {
cnt := 0
@ -468,8 +452,8 @@ func getBinlogFileCount(s *datapb.SegmentInfo) int {
func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
info := &metricsinfo.DataCoordQuotaMetrics{}
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
collectionBinlogSize := make(map[UniqueID]int64)
partitionBinlogSize := make(map[UniqueID]map[UniqueID]int64)
collectionRowsNum := make(map[UniqueID]map[commonpb.SegmentState]int64)
@ -493,7 +477,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
}
partBinlogSize[segment.GetPartitionID()] += segmentSize
coll, ok := m.collections[segment.GetCollectionID()]
coll, ok := m.collections.Get(segment.GetCollectionID())
if ok {
metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName,
fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Add(float64(segmentSize))
@ -516,7 +500,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
metrics.DataCoordNumStoredRows.Reset()
for collectionID, statesRows := range collectionRowsNum {
coll, ok := m.collections[collectionID]
coll, ok := m.collections.Get(collectionID)
if ok {
for state, rows := range statesRows {
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), coll.Schema.GetName(), state.String()).Set(float64(rows))
@ -526,7 +510,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
metrics.DataCoordL0DeleteEntriesNum.Reset()
for collectionID, entriesNum := range collectionL0RowCounts {
coll, ok := m.collections[collectionID]
coll, ok := m.collections.Get(collectionID)
if ok {
metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID)).Set(float64(entriesNum))
}
@ -542,15 +526,13 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
// SetStoredIndexFileSizeMetric returns the total index files size of all segment for each collection.
func (m *meta) SetStoredIndexFileSizeMetric() uint64 {
m.RLock()
defer m.RUnlock()
return m.indexMeta.SetStoredIndexFileSizeMetric(m.collections)
}
func (m *meta) GetAllCollectionNumRows() map[int64]int64 {
m.RLock()
defer m.RUnlock()
ret := make(map[int64]int64, len(m.collections))
m.segMu.RLock()
defer m.segMu.RUnlock()
ret := make(map[int64]int64, m.collections.Len())
segments := m.segments.GetSegments()
for _, segment := range segments {
if isSegmentHealthy(segment) {
@ -564,8 +546,8 @@ func (m *meta) GetAllCollectionNumRows() map[int64]int64 {
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
log := log.Ctx(ctx).With(zap.String("channel", segment.GetInsertChannel()))
log.Info("meta update: adding segment - Start", zap.Int64("segmentID", segment.GetID()))
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil {
log.Error("meta update: adding segment failed",
zap.Int64("segmentID", segment.GetID()),
@ -583,8 +565,8 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
func (m *meta) DropSegment(ctx context.Context, segmentID UniqueID) error {
log := log.Ctx(ctx)
log.Debug("meta update: dropping segment", zap.Int64("segmentID", segmentID))
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
segment := m.segments.GetSegment(segmentID)
if segment == nil {
log.Warn("meta update: dropping segment failed - segment not found",
@ -608,8 +590,8 @@ func (m *meta) DropSegment(ctx context.Context, segmentID UniqueID) error {
// GetHealthySegment returns segment info with provided id
// if not segment is found, nil will be returned
func (m *meta) GetHealthySegment(ctx context.Context, segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
segment := m.segments.GetSegment(segID)
if segment != nil && isSegmentHealthy(segment) {
return segment
@ -619,8 +601,8 @@ func (m *meta) GetHealthySegment(ctx context.Context, segID UniqueID) *SegmentIn
// Get segments By filter function
func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
var result []UniqueID
for _, id := range segIDs {
segment := m.segments.GetSegment(id)
@ -635,21 +617,21 @@ func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []
// include the unhealthy segment
// if not segment is found, nil will be returned
func (m *meta) GetSegment(ctx context.Context, segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
return m.segments.GetSegment(segID)
}
// GetAllSegmentsUnsafe returns all segments
func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
return m.segments.GetSegments()
}
func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
var sum int64 = 0
for _, segmentID := range segmentIDs {
segment := m.segments.GetSegment(segmentID)
@ -663,8 +645,8 @@ func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 {
}
func (m *meta) GetSegmentsChannels(segmentIDs []UniqueID) (map[int64]string, error) {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
segChannels := make(map[int64]string)
for _, segmentID := range segmentIDs {
segment := m.segments.GetSegment(segmentID)
@ -682,8 +664,8 @@ func (m *meta) SetState(ctx context.Context, segmentID UniqueID, targetState com
log.Debug("meta update: setting segment state",
zap.Int64("segmentID", segmentID),
zap.Any("target state", targetState))
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
log.Warn("meta update: setting segment state - segment not found",
@ -722,8 +704,8 @@ func (m *meta) SetState(ctx context.Context, segmentID UniqueID, targetState com
}
func (m *meta) UpdateSegment(segmentID int64, operators ...SegmentOperator) error {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
log := log.Ctx(context.TODO())
info := m.segments.GetSegment(segmentID)
if info == nil {
@ -1121,8 +1103,8 @@ func UpdateAsDroppedIfEmptyWhenFlushing(segmentID int64) UpdateOperator {
// updateSegmentsInfo update segment infos
// will exec all operators, and update all changed segments
func (m *meta) UpdateSegmentsInfo(ctx context.Context, operators ...UpdateOperator) error {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
updatePack := &updateSegmentPack{
meta: m,
segments: make(map[int64]*SegmentInfo),
@ -1165,8 +1147,8 @@ func (m *meta) UpdateDropChannelSegmentInfo(ctx context.Context, channel string,
log := log.Ctx(ctx)
log.Debug("meta update: update drop channel segment info",
zap.String("channel", channel))
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
@ -1391,14 +1373,14 @@ func (m *meta) GetFlushingSegments() []*SegmentInfo {
// SelectSegments select segments with selector
func (m *meta) SelectSegments(ctx context.Context, filters ...SegmentFilter) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
return m.segments.GetSegmentsBySelector(filters...)
}
func (m *meta) GetRealSegmentsForChannel(channel string) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
return m.segments.GetRealSegmentsForChannel(channel)
}
@ -1407,8 +1389,8 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Ctx(m.ctx).Debug("meta update: add allocation",
zap.Int64("segmentID", segmentID),
zap.Any("allocation", allocation))
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
// TODO: Error handling.
@ -1423,24 +1405,24 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
}
func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetRowCount(segmentID, rowCount)
}
// SetAllocations set Segment allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetAllocations(segmentID, allocations)
}
// SetLastExpire set lastExpire time for segment
// Note that last is not necessary to store in KV meta
func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
clonedSegment := m.segments.GetSegment(segmentID).Clone()
clonedSegment.LastExpireTime = lastExpire
m.segments.SetSegment(segmentID, clonedSegment)
@ -1449,22 +1431,22 @@ func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {
// SetLastFlushTime set LastFlushTime for segment with provided `segmentID`
// Note that lastFlushTime is not persisted in KV store
func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetFlushTime(segmentID, t)
}
// SetLastWrittenTime set LastWrittenTime for segment with provided `segmentID`
// Note that lastWrittenTime is not persisted in KV store
func (m *meta) SetLastWrittenTime(segmentID UniqueID) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetLastWrittenTime(segmentID)
}
func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
exist = true
for _, segmentID := range segmentIDs {
seg := m.segments.GetSegment(segmentID)
@ -1481,16 +1463,16 @@ func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID)
}
func (m *meta) SetSegmentStating(segmentID UniqueID, stating bool) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetIsStating(segmentID, stating)
}
// SetSegmentCompacting sets compaction state for segment
func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetIsCompacting(segmentID, compacting)
}
@ -1499,8 +1481,8 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
// if true, set them compacting and return true
// if false, skip setting and
func (m *meta) CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID) (exist, canDo bool) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
var hasCompacting bool
exist = true
for _, segmentID := range segmentIDs {
@ -1524,8 +1506,8 @@ func (m *meta) CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []U
}
func (m *meta) SetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID, compacting bool) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
for _, segmentID := range segmentIDs {
m.segments.SetIsCompacting(segmentID, compacting)
}
@ -1533,8 +1515,8 @@ func (m *meta) SetSegmentsCompacting(ctx context.Context, segmentIDs []UniqueID,
// SetSegmentLevel sets level for segment
func (m *meta) SetSegmentLevel(segmentID UniqueID, level datapb.SegmentLevel) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
m.segments.SetLevel(segmentID, level)
}
@ -1739,8 +1721,8 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
}
func (m *meta) CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
switch t.GetType() {
case datapb.CompactionType_MixCompaction:
return m.completeMixCompactionMutation(t, result)
@ -1771,8 +1753,8 @@ func isSegmentHealthy(segment *SegmentInfo) bool {
}
func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
for _, segID := range segIDs {
if _, ok := m.segments.segments[segID]; !ok {
@ -1784,8 +1766,8 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) {
// GetCompactionTo returns the segment info of the segment to be compacted to.
func (m *meta) GetCompactionTo(segmentID int64) ([]*SegmentInfo, bool) {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
return m.segments.GetCompactionTo(segmentID)
}
@ -2015,10 +1997,7 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
}
func (m *meta) ListCollections() []int64 {
m.RLock()
defer m.RUnlock()
return lo.Keys(m.collections)
return m.collections.Keys()
}
func (m *meta) DropCompactionTask(ctx context.Context, task *datapb.CompactionTask) error {
@ -2093,8 +2072,8 @@ func (m *meta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.Partiti
}
func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.StatsResult) (*segMetricMutation, error) {
m.Lock()
defer m.Unlock()
m.segMu.Lock()
defer m.segMu.Unlock()
log := log.Ctx(m.ctx).With(zap.Int64("collectionID", result.GetCollectionID()),
zap.Int64("partitionID", result.GetPartitionID()),
@ -2169,8 +2148,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
}
func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {
m.RLock()
defer m.RUnlock()
m.segMu.RLock()
defer m.segMu.RUnlock()
segments := make([]*metricsinfo.Segment, 0, len(m.segments.segments))
for _, s := range m.segments.segments {

View File

@ -50,6 +50,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/testutils"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// MetaReloadSuite tests meta reload & meta creation related logic
@ -655,7 +656,7 @@ func TestMeta_Basic(t *testing.T) {
const rowCount1 = 300
// no segment
nums := meta.GetNumRowsOfCollection(collID)
nums := meta.GetNumRowsOfCollection(context.Background(), collID)
assert.EqualValues(t, 0, nums)
// add seg1 with 100 rows
@ -675,7 +676,7 @@ func TestMeta_Basic(t *testing.T) {
// check partition/collection statistics
nums = meta.GetNumRowsOfPartition(context.TODO(), collID, partID0)
assert.EqualValues(t, (rowCount0 + rowCount1), nums)
nums = meta.GetNumRowsOfCollection(collID)
nums = meta.GetNumRowsOfCollection(context.Background(), collID)
assert.EqualValues(t, (rowCount0 + rowCount1), nums)
})
@ -742,7 +743,7 @@ func TestMeta_Basic(t *testing.T) {
assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
assert.Equal(t, int64(size0+size1), quotaInfo.TotalBinlogSize)
meta.collections[collID] = collInfo
meta.collections.Insert(collID, collInfo)
quotaInfo = meta.GetQuotaInfo()
assert.Len(t, quotaInfo.CollectionBinlogSize, 1)
assert.Equal(t, int64(size0+size1), quotaInfo.CollectionBinlogSize[collID])
@ -754,12 +755,11 @@ func TestMeta_Basic(t *testing.T) {
ret := meta.SetStoredIndexFileSizeMetric()
assert.Equal(t, uint64(0), ret)
meta.collections = map[UniqueID]*collectionInfo{
100: {
ID: 100,
DatabaseName: "db",
},
}
meta.collections = typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
meta.collections.Insert(100, &collectionInfo{
ID: 100,
DatabaseName: "db",
})
ret = meta.SetStoredIndexFileSizeMetric()
assert.Equal(t, uint64(11), ret)
})
@ -1347,7 +1347,7 @@ func Test_meta_GcConfirm(t *testing.T) {
func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
t.Run("fail to list database", func(t *testing.T) {
m := &meta{
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
mockBroker := broker.NewMockBroker(t)
mockBroker.EXPECT().ListDatabases(mock.Anything).Return(nil, errors.New("list database failed, mocked"))
@ -1357,7 +1357,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
t.Run("fail to show collections", func(t *testing.T) {
m := &meta{
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
mockBroker := broker.NewMockBroker(t)
@ -1371,7 +1371,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
t.Run("fail to describe collection", func(t *testing.T) {
m := &meta{
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
mockBroker := broker.NewMockBroker(t)
@ -1389,7 +1389,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
t.Run("fail to show partitions", func(t *testing.T) {
m := &meta{
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
mockBroker := broker.NewMockBroker(t)
@ -1408,7 +1408,7 @@ func Test_meta_ReloadCollectionsFromRootcoords(t *testing.T) {
t.Run("success", func(t *testing.T) {
m := &meta{
collections: make(map[UniqueID]*collectionInfo),
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
}
mockBroker := broker.NewMockBroker(t)

View File

@ -2488,13 +2488,12 @@ func Test_CheckHealth(t *testing.T) {
return channelManager
}
collections := map[UniqueID]*collectionInfo{
449684528748778322: {
ID: 449684528748778322,
VChannelNames: []string{"ch1", "ch2"},
},
2: nil,
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(449684528748778322, &collectionInfo{
ID: 449684528748778322,
VChannelNames: []string{"ch1", "ch2"},
})
collections.Insert(2, nil)
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
@ -2657,7 +2656,7 @@ func TestLoadCollectionFromRootCoord(t *testing.T) {
broker := broker.NewMockBroker(t)
s := &Server{
broker: broker,
meta: &meta{collections: make(map[UniqueID]*collectionInfo)},
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
}
t.Run("has collection fail with error", func(t *testing.T) {
@ -2698,8 +2697,8 @@ func TestLoadCollectionFromRootCoord(t *testing.T) {
t.Run("ok", func(t *testing.T) {
err := s.loadCollectionFromRootCoord(context.TODO(), 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.meta.collections))
_, ok := s.meta.collections[1]
assert.Equal(t, 1, s.meta.collections.Len())
_, ok := s.meta.collections.Get(1)
assert.True(t, ok)
})
}

View File

@ -371,7 +371,7 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
resp := &datapb.GetCollectionStatisticsResponse{
Status: merr.Success(),
}
nums := s.meta.GetNumRowsOfCollection(req.CollectionID)
nums := s.meta.GetNumRowsOfCollection(ctx, req.CollectionID)
resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
log.Info("success to get collection statistics", zap.Any("response", resp))
return resp, nil
@ -395,7 +395,7 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart
}
nums := int64(0)
if len(req.GetPartitionIDs()) == 0 {
nums = s.meta.GetNumRowsOfCollection(req.CollectionID)
nums = s.meta.GetNumRowsOfCollection(ctx, req.CollectionID)
}
for _, partID := range req.GetPartitionIDs() {
num := s.meta.GetNumRowsOfPartition(ctx, req.CollectionID, partID)

View File

@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type ServerSuite struct {
@ -727,7 +728,7 @@ func TestBroadcastAlteredCollection(t *testing.T) {
})
t.Run("test meta non exist", func(t *testing.T) {
s := &Server{meta: &meta{collections: make(map[UniqueID]*collectionInfo, 1)}}
s := &Server{meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}}
s.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
req := &datapb.AlterCollectionRequest{
@ -738,13 +739,13 @@ func TestBroadcastAlteredCollection(t *testing.T) {
resp, err := s.BroadcastAlteredCollection(ctx, req)
assert.NotNil(t, resp)
assert.NoError(t, err)
assert.Equal(t, 1, len(s.meta.collections))
assert.Equal(t, 1, s.meta.collections.Len())
})
t.Run("test update meta", func(t *testing.T) {
s := &Server{meta: &meta{collections: map[UniqueID]*collectionInfo{
1: {ID: 1},
}}}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(1, &collectionInfo{ID: 1})
s := &Server{meta: &meta{collections: collections}}
s.stateCode.Store(commonpb.StateCode_Healthy)
ctx := context.Background()
req := &datapb.AlterCollectionRequest{
@ -753,11 +754,15 @@ func TestBroadcastAlteredCollection(t *testing.T) {
Properties: []*commonpb.KeyValuePair{{Key: "k", Value: "v"}},
}
assert.Nil(t, s.meta.collections[1].Properties)
coll, ok := s.meta.collections.Get(1)
assert.True(t, ok)
assert.Nil(t, coll.Properties)
resp, err := s.BroadcastAlteredCollection(ctx, req)
assert.NotNil(t, resp)
assert.NoError(t, err)
assert.NotNil(t, s.meta.collections[1].Properties)
coll, ok = s.meta.collections.Get(1)
assert.True(t, ok)
assert.NotNil(t, coll.Properties)
})
}

View File

@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type SyncSegmentsSchedulerSuite struct {
@ -45,35 +45,34 @@ func Test_SyncSegmentsSchedulerSuite(t *testing.T) {
}
func (s *SyncSegmentsSchedulerSuite) initParams() {
s.m = &meta{
RWMutex: lock.RWMutex{},
collections: map[UniqueID]*collectionInfo{
1: {
ID: 1,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "vec",
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
},
},
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(1, &collectionInfo{
ID: 1,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 101,
Name: "vec",
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
},
Partitions: []int64{2, 3},
VChannelNames: []string{"channel1", "channel2"},
},
2: nil,
},
Partitions: []int64{2, 3},
VChannelNames: []string{"channel1", "channel2"},
})
collections.Insert(2, nil)
s.m = &meta{
collections: collections,
segments: &SegmentsInfo{
secondaryIndexes: segmentInfoIndexes{
channel2Segments: map[string]map[UniqueID]*SegmentInfo{
@ -356,9 +355,13 @@ func (s *SyncSegmentsSchedulerSuite) Test_SyncSegmentsFail() {
ctx := context.Background()
s.Run("pk not found", func() {
sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = false
coll, ok := sss.meta.collections.Get(1)
s.True(ok)
coll.Schema.Fields[0].IsPrimaryKey = false
sss.SyncSegmentsForCollections(ctx)
sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = true
coll, ok = sss.meta.collections.Get(1)
s.True(ok)
coll.Schema.Fields[0].IsPrimaryKey = true
})
s.Run("find watcher failed", func() {

View File

@ -509,7 +509,8 @@ func withStatsTaskMeta(stm *statsTaskMeta) testMetaOption {
func createMeta(catalog metastore.DataCoordCatalog, opts ...testMetaOption) *meta {
mt := &meta{
catalog: catalog,
catalog: catalog,
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
1000: {
@ -1594,6 +1595,16 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
IsPartitionKey: true,
},
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(collID, &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: fieldsSchema,
},
CreatedAt: 0,
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx.Insert(indexID, &model.SegmentIndex{
@ -1613,17 +1624,10 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
IndexSerializedSize: 0,
})
segIndexes.Insert(segID, segIdx)
mt := meta{
catalog: catalog,
collections: map[int64]*collectionInfo{
collID: {
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: fieldsSchema,
},
CreatedAt: 0,
},
},
catalog: catalog,
collections: collections,
analyzeMeta: &analyzeMeta{
ctx: context.Background(),
@ -1754,9 +1758,11 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
mt.indexMeta.fieldIndexLock.Lock()
defer mt.indexMeta.fieldIndexLock.Unlock()
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW"
mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[0].DataType = schemapb.DataType_FloatVector
coll.Schema.Fields[1].IsPartitionKey = true
coll.Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
@ -1815,7 +1821,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
schemapb.DataType_VarChar,
schemapb.DataType_String,
} {
mt.collections[collID].Schema.Fields[1].DataType = dataType
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
@ -1863,7 +1871,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
for _, dataType := range []schemapb.DataType{
schemapb.DataType_SparseFloatVector,
} {
mt.collections[collID].Schema.Fields[0].DataType = dataType
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[0].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
@ -1893,7 +1903,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
schemapb.DataType_Array,
schemapb.DataType_JSON,
} {
mt.collections[collID].Schema.Fields[1].DataType = dataType
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
@ -1916,7 +1928,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
s.Run("Submit returns empty optional field when no partition key", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
mt.collections[collID].Schema.Fields[1].IsPartitionKey = false
coll, ok := mt.collections.Get(collID)
s.True(ok)
coll.Schema.Fields[1].IsPartitionKey = false
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")