From 8f7019468f9fb2ed82f6dabeabbf85701bc34f08 Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 5 Mar 2024 10:04:59 +0800 Subject: [PATCH] fix: starve lock caused by slow GetCompactionTo method when too much segments (#30963) issue: #30823 Signed-off-by: chyezh --- internal/datacoord/compaction_trigger_test.go | 12 +- internal/datacoord/garbage_collector_test.go | 189 +++++++++- internal/datacoord/index_service_test.go | 355 +++++++++--------- internal/datacoord/meta.go | 12 +- internal/datacoord/meta_test.go | 106 +++--- internal/datacoord/segment_info.go | 52 ++- internal/datacoord/segment_info_test.go | 98 +++++ internal/datacoord/services.go | 7 +- 8 files changed, 591 insertions(+), 240 deletions(-) create mode 100644 internal/datacoord/segment_info_test.go diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index a3be13c657..501f9446a3 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -115,7 +115,7 @@ func Test_compactionTrigger_force(t *testing.T) { &meta{ catalog: catalog, segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: &datapb.SegmentInfo{ ID: 1, @@ -871,7 +871,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { indexMeta: newSegmentIndexMeta(nil), // 4 segment segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: &datapb.SegmentInfo{ ID: 1, @@ -1052,7 +1052,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { &meta{ // 8 small segments segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: genSeg(1, 20), lastFlushTime: time.Now().Add(-100 * time.Minute), @@ -1232,7 +1232,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { &meta{ // 4 small segments segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: 
genSeg(1, 20), lastFlushTime: time.Now().Add(-100 * time.Minute), @@ -1419,7 +1419,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { &meta{ // 4 small segments segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: genSeg(1, 60), lastFlushTime: time.Now().Add(-100 * time.Minute), @@ -2183,7 +2183,7 @@ func (s *CompactionTriggerSuite) SetupTest() { s.channel = "dml_0_100v0" s.meta = &meta{ segments: &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: s.genSeg(1, 60), lastFlushTime: time.Now().Add(-100 * time.Minute), diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 0cd6ea20ec..f09c48bee3 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -798,7 +798,8 @@ func TestGarbageCollector_clearETCD(t *testing.T) { channelCPLocks: lock.NewKeyLock[string](), channelCPs: channelCPs, segments: &SegmentsInfo{ - map[UniqueID]*SegmentInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ segID: { SegmentInfo: &datapb.SegmentInfo{ ID: segID, @@ -1107,6 +1108,192 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, }, } + + for segID, segment := range map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 5000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 0, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "log1", + LogSize: 1024, + }, + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + { + LogPath: "log2", + LogSize: 1024, + }, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log1", + 
LogSize: 1024, + }, + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log2", + LogSize: 1024, + }, + }, + }, + }, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "stats_log1", + LogSize: 1024, + }, + }, + }, + }, + }, + }, + segID + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 5000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 0, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + }, + }, + segID + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 2, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: []int64{segID, segID + 1}, + }, + }, + segID + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 3, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: nil, + }, + }, + segID + 4: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 4, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 12000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: []int64{segID + 2, segID + 3}, + }, + }, + segID + 5: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 5, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: 0, + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 1200, + }, + }, + }, + segID 
+ 6: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 6, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()), + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + Compacted: true, + }, + }, + // compacted and child is GCed, dml pos is big than channel cp + segID + 7: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 7, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: 0, + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 1200, + }, + Compacted: true, + }, + }, + } { + m.segments.SetSegment(segID, segment) + } + cm := &mocks.ChunkManager{} cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil) gc := newGarbageCollector( diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 6c96087c74..ba1dd566bd 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -527,53 +527,56 @@ func TestServer_AlterIndex(t *testing.T) { meta: &meta{ catalog: catalog, indexMeta: indexMeta, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - invalidSegID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: invalidSegID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - // timesamp > index start time, will be filtered out - Timestamp: createTS + 1, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: invalidSegID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: 
commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + CreatedByCompaction: true, + CompactionFrom: []int64{segID - 1}, + }, + }, + segID - 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, }, }, - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - CreatedByCompaction: true, - CompactionFrom: []int64{segID - 1}, - }, - }, - segID - 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - }, - }}, + }, }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -694,28 +697,31 @@ func TestServer_GetIndexState(t *testing.T) { segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, }, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - 
LastExpireTime: createTS - 1, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS - 1, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, }, - currRows: 0, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, }, - }}, + }, } t.Run("index state is unissued", func(t *testing.T) { @@ -768,28 +774,31 @@ func TestServer_GetIndexState(t *testing.T) { }, }, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS - 1, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS - 1, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, }, - currRows: 0, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, }, - }}, + }, } t.Run("index state is none", func(t *testing.T) { @@ -853,7 +862,7 
@@ func TestServer_GetSegmentIndexState(t *testing.T) { meta: &meta{ catalog: indexMeta.catalog, indexMeta: indexMeta, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}}, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -976,7 +985,7 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { meta: &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, indexMeta: newSegmentIndexMeta(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}), - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}}, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -1412,53 +1421,56 @@ func TestServer_DescribeIndex(t *testing.T) { }, }, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - invalidSegID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: invalidSegID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - // timesamp > index start time, will be filtered out - Timestamp: createTS + 1, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: invalidSegID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + CreatedByCompaction: true, + CompactionFrom: []int64{segID - 1}, + }, + }, + segID - 1: { + 
SegmentInfo: &datapb.SegmentInfo{ + ID: segID - 1, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, }, }, - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - CreatedByCompaction: true, - CompactionFrom: []int64{segID - 1}, - }, - }, - segID - 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID - 1, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - }, - }}, + }, }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -1716,37 +1728,40 @@ func TestServer_GetIndexStatistics(t *testing.T) { }, }, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - invalidSegID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - // timesamp > index start time, will be filtered out - Timestamp: createTS + 1, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: 
&datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, }, }, }, - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - }, - }}, + }, }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -1900,19 +1915,22 @@ func TestServer_DropIndex(t *testing.T) { segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, }, - segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, + segments: &SegmentsInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + }, }, }, - }}, + }, }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -2064,7 +2082,8 @@ func TestServer_GetIndexInfos(t *testing.T) { }, segments: &SegmentsInfo{ - map[UniqueID]*SegmentInfo{ + compactionTo: make(map[int64]int64), + segments: map[UniqueID]*SegmentInfo{ segID: { SegmentInfo: &datapb.SegmentInfo{ ID: segID, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 67d1402d67..06112093c9 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1283,18 +1283,12 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) { return true, nil } -func (m 
*meta) GetCompactionTo(segmentID int64) *SegmentInfo { +// GetCompactionTo returns the segment info of the segment to be compacted to. +func (m *meta) GetCompactionTo(segmentID int64) (*SegmentInfo, bool) { m.RLock() defer m.RUnlock() - segments := m.segments.GetSegments() - for _, segment := range segments { - parents := typeutil.NewUniqueSet(segment.GetCompactionFrom()...) - if parents.Contain(segmentID) { - return segment - } - } - return nil + return m.segments.GetCompactionTo(segmentID) } // UpdateChannelCheckpoint updates and saves channel checkpoint. diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 4ff70a5119..0a332d1686 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -165,33 +165,34 @@ func (suite *MetaBasicSuite) TestCollection() { } func (suite *MetaBasicSuite) TestCompleteCompactionMutation() { - latestSegments := &SegmentsInfo{ - map[UniqueID]*SegmentInfo{ - 1: {SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, - // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, - NumOfRows: 2, - }}, - 2: {SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - CollectionID: 100, - PartitionID: 10, - State: commonpb.SegmentState_Flushed, - Level: datapb.SegmentLevel_L1, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, - // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, - NumOfRows: 2, - }}, - }, + 
latestSegments := NewSegmentsInfo() + for segID, segment := range map[UniqueID]*SegmentInfo{ + 1: {SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)}, + NumOfRows: 2, + }}, + 2: {SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 100, + PartitionID: 10, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)}, + // latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)}, + NumOfRows: 2, + }}, + } { + latestSegments.SetSegment(segID, segment) } mockChMgr := mocks2.NewChunkManager(suite.T()) @@ -861,7 +862,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) { fields{ NewMetaMemoryKV(), &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: &datapb.SegmentInfo{ ID: 1, @@ -870,6 +871,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) { isCompacting: false, }, }, + compactionTo: make(map[int64]UniqueID), }, }, args{ @@ -910,7 +912,7 @@ func Test_meta_SetSegmentImporting(t *testing.T) { fields{ NewMetaMemoryKV(), &SegmentsInfo{ - map[int64]*SegmentInfo{ + segments: map[int64]*SegmentInfo{ 1: { SegmentInfo: &datapb.SegmentInfo{ ID: 1, @@ -941,30 +943,32 @@ func Test_meta_SetSegmentImporting(t *testing.T) { } func Test_meta_GetSegmentsOfCollection(t *testing.T) { - storedSegments := &SegmentsInfo{ - 
map[int64]*SegmentInfo{ - 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - CollectionID: 1, - State: commonpb.SegmentState_Flushed, - }, - }, - 2: { - SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - CollectionID: 1, - State: commonpb.SegmentState_Growing, - }, - }, - 3: { - SegmentInfo: &datapb.SegmentInfo{ - ID: 3, - CollectionID: 2, - State: commonpb.SegmentState_Flushed, - }, + storedSegments := NewSegmentsInfo() + + for segID, segment := range map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + State: commonpb.SegmentState_Flushed, }, }, + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 1, + State: commonpb.SegmentState_Growing, + }, + }, + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CollectionID: 2, + State: commonpb.SegmentState_Flushed, + }, + }, + } { + storedSegments.SetSegment(segID, segment) } expectedSeg := map[int64]commonpb.SegmentState{1: commonpb.SegmentState_Flushed, 2: commonpb.SegmentState_Growing} m := &meta{segments: storedSegments} diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index cd7d2cdd95..13cd283b2b 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -21,15 +21,19 @@ import ( "github.com/golang/protobuf/proto" "go.uber.org/atomic" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/log" ) // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation type SegmentsInfo struct { - segments map[UniqueID]*SegmentInfo + segments map[UniqueID]*SegmentInfo + compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key. + // A segment can be compacted to only one segment finally in meta. 
} // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it @@ -62,7 +66,10 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { // NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized // note that no mutex is wrapped so external concurrent control is needed func NewSegmentsInfo() *SegmentsInfo { - return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)} + return &SegmentsInfo{ + segments: make(map[UniqueID]*SegmentInfo), + compactionTo: make(map[UniqueID]UniqueID), + } } // GetSegment returns SegmentInfo @@ -86,17 +93,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo { return segments } +// GetCompactionTo returns the segment that the provided segment is compacted to. +// Return (nil, false) if given segmentID can not found in the meta. +// Return (nil, true) if given segmentID can be found not no compaction to. +// Return (notnil, true) if given segmentID can be found and has compaction to. +func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) { + if _, ok := s.segments[fromSegmentID]; !ok { + return nil, false + } + if toID, ok := s.compactionTo[fromSegmentID]; ok { + if to, ok := s.segments[toID]; ok { + return to, true + } + log.Warn("unreachable code: compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", toID)) + } + return nil, true +} + // DropSegment deletes provided segmentID // no extra method is taken when segmentID not exists func (s *SegmentsInfo) DropSegment(segmentID UniqueID) { - delete(s.segments, segmentID) + if segment, ok := s.segments[segmentID]; ok { + s.deleteCompactTo(segment) + delete(s.segments, segmentID) + } } // SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists // set the logPath of segement in meta empty, to save space // if segment has logPath, make it empty func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) { + if segment, ok := 
s.segments[segmentID]; ok { + // Remove old segment compact to relation first. + s.deleteCompactTo(segment) + } s.segments[segmentID] = segment + s.addCompactTo(segment) } // SetRowCount sets rowCount info for SegmentInfo with provided segmentID @@ -217,6 +249,20 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo { return cloned } +// addCompactTo adds the compact relation to the segment +func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) { + for _, from := range segment.GetCompactionFrom() { + s.compactionTo[from] = segment.GetID() + } +} + +// deleteCompactTo deletes the compact relation to the segment +func (s *SegmentsInfo) deleteCompactTo(segment *SegmentInfo) { + for _, from := range segment.GetCompactionFrom() { + delete(s.compactionTo, from) + } +} + // SegmentInfoOption is the option to set fields in segment info type SegmentInfoOption func(segment *SegmentInfo) diff --git a/internal/datacoord/segment_info_test.go b/internal/datacoord/segment_info_test.go new file mode 100644 index 0000000000..55934e0213 --- /dev/null +++ b/internal/datacoord/segment_info_test.go @@ -0,0 +1,98 @@ +package datacoord + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +func TestCompactionTo(t *testing.T) { + segments := NewSegmentsInfo() + segment := NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + }) + segments.SetSegment(segment.GetID(), segment) + + s, ok := segments.GetCompactionTo(1) + assert.True(t, ok) + assert.Nil(t, s) + + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 2, + }) + segments.SetSegment(segment.GetID(), segment) + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{1, 2}, + }) + segments.SetSegment(segment.GetID(), segment) + + s, ok = segments.GetCompactionTo(3) + assert.Nil(t, s) + assert.True(t, ok) + s, ok = segments.GetCompactionTo(1) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), 
s.GetID()) + s, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), s.GetID()) + + // should be overwrite. + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{2}, + }) + segments.SetSegment(segment.GetID(), segment) + + s, ok = segments.GetCompactionTo(3) + assert.True(t, ok) + assert.Nil(t, s) + s, ok = segments.GetCompactionTo(1) + assert.True(t, ok) + assert.Nil(t, s) + s, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), s.GetID()) + + // should be overwrite back. + segment = NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + CompactionFrom: []int64{1, 2}, + }) + segments.SetSegment(segment.GetID(), segment) + + s, ok = segments.GetCompactionTo(3) + assert.Nil(t, s) + assert.True(t, ok) + s, ok = segments.GetCompactionTo(1) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), s.GetID()) + s, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), s.GetID()) + + // should be droped. + segments.DropSegment(1) + s, ok = segments.GetCompactionTo(1) + assert.False(t, ok) + assert.Nil(t, s) + s, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.NotNil(t, s) + assert.Equal(t, int64(3), s.GetID()) + s, ok = segments.GetCompactionTo(3) + assert.Nil(t, s) + assert.True(t, ok) + + segments.DropSegment(3) + s, ok = segments.GetCompactionTo(2) + assert.True(t, ok) + assert.Nil(t, s) +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index bf21bf7818..8d1cb9189d 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -412,15 +412,18 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR var info *SegmentInfo if req.IncludeUnHealthy { info = s.meta.GetSegment(id) + // TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock. 
+ // Too many modifications need to be applied to SegmentInfo, a refactor is needed. + child, ok := s.meta.GetCompactionTo(id) - if info == nil { + // info may be non-nil, but ok is false when the segment is being dropped concurrently. + if info == nil || !ok { log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) err := merr.WrapErrSegmentNotFound(id) resp.Status = merr.Status(err) return resp, nil } - child := s.meta.GetCompactionTo(id) clonedInfo := info.Clone() if child != nil { // child segment should decompress binlog path