diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go
index 5efc8d2203..81bc141abd 100644
--- a/internal/datanode/metacache/actions.go
+++ b/internal/datanode/metacache/actions.go
@@ -64,6 +64,20 @@ func WithLevel(level datapb.SegmentLevel) SegmentFilter {
 	}
 }
 
+// WithCompacted returns a filter matching segments already marked as compacted.
+func WithCompacted() SegmentFilter {
+	return func(info *SegmentInfo) bool {
+		return info.compactTo != 0
+	}
+}
+
+// WithNoSyncingTask returns a filter matching segments with no ongoing sync task.
+func WithNoSyncingTask() SegmentFilter {
+	return func(info *SegmentInfo) bool {
+		return info.syncingTasks == 0
+	}
+}
+
 type SegmentAction func(info *SegmentInfo)
 
 func UpdateState(state commonpb.SegmentState) SegmentAction {
@@ -112,6 +126,7 @@ func StartSyncing(batchSize int64) SegmentAction {
 	return func(info *SegmentInfo) {
 		info.syncingRows += batchSize
 		info.bufferRows -= batchSize
+		info.syncingTasks++
 	}
 }
 
@@ -119,6 +134,7 @@ func FinishSyncing(batchSize int64) SegmentAction {
 	return func(info *SegmentInfo) {
 		info.flushedRows += batchSize
 		info.syncingRows -= batchSize
+		info.syncingTasks--
 	}
 }
 
diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go
index 7963cab22f..4ebbe9e3e4 100644
--- a/internal/datanode/metacache/meta_cache.go
+++ b/internal/datanode/metacache/meta_cache.go
@@ -38,6 +38,8 @@ type MetaCache interface {
 	AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsFactory, actions ...SegmentAction)
 	// UpdateSegments applies action to segment(s) satisfy the provided filters.
 	UpdateSegments(action SegmentAction, filters ...SegmentFilter)
+	// RemoveSegments removes segments matching the provided filters, returning the removed segment IDs.
+	RemoveSegments(filters ...SegmentFilter) []int64
 	// CompactSegments transfers compaction segment results inside the metacache.
 	CompactSegments(newSegmentID, partitionID int64, numRows int64, bfs *BloomFilterSet, oldSegmentIDs ...int64)
 	// GetSegmentsBy returns segments statify the provided filters.
@@ -46,6 +48,7 @@ type MetaCache interface {
 	GetSegmentByID(id int64, filters ...SegmentFilter) (*SegmentInfo, bool)
 	// GetSegmentIDs returns ids of segments which satifiy the provided filters.
 	GetSegmentIDsBy(filters ...SegmentFilter) []int64
+	// PredictSegments returns the segment IDs which may contain the provided primary key.
 	PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool)
 }
 
@@ -107,22 +110,13 @@ func (c *metaCacheImpl) AddSegment(segInfo *datapb.SegmentInfo, factory PkStatsF
 	c.segmentInfos[segInfo.GetID()] = segment
 }
 
-func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRows int64, bfs *BloomFilterSet, dropSegmentIDs ...int64) {
+func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRows int64, bfs *BloomFilterSet, oldSegmentIDs ...int64) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for _, dropSeg := range dropSegmentIDs {
-		if _, ok := c.segmentInfos[dropSeg]; ok {
-			delete(c.segmentInfos, dropSeg)
-		} else {
-			log.Warn("some dropped segment not exist in meta cache",
-				zap.String("channel", c.vChannelName),
-				zap.Int64("collectionID", c.collectionID),
-				zap.Int64("segmentID", dropSeg))
-		}
-	}
-
+	compactTo := NullSegment
 	if numOfRows > 0 {
+		compactTo = newSegmentID
 		if _, ok := c.segmentInfos[newSegmentID]; !ok {
 			c.segmentInfos[newSegmentID] = &SegmentInfo{
 				segmentID:   newSegmentID,
@@ -133,6 +127,40 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
 			}
 		}
 	}
+
+	for _, segID := range oldSegmentIDs {
+		if segmentInfo, ok := c.segmentInfos[segID]; ok {
+			updated := segmentInfo.Clone()
+			updated.compactTo = compactTo
+			c.segmentInfos[segID] = updated
+		} else {
+			log.Warn("some dropped segment not exist in meta cache",
+				zap.String("channel", c.vChannelName),
+				zap.Int64("collectionID", c.collectionID),
+				zap.Int64("segmentID", segID))
+		}
+	}
+}
+
+func (c *metaCacheImpl) RemoveSegments(filters ...SegmentFilter) []int64 {
+	if len(filters) == 0 {
+		log.Warn("remove segment without filters is not allowed", zap.Stack("callstack"))
+		return nil
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	filter := c.mergeFilters(filters...)
+
+	var ids []int64
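+	// evict every cached segment matching the merged filter, collecting the removed IDs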
+	for segID, info := range c.segmentInfos {
+		if filter(info) {
+			ids = append(ids, segID)
+			delete(c.segmentInfos, segID)
+		}
+	}
+	return ids
 }
 
 func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo {
diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go
index 627d721cb3..8b8b006660 100644
--- a/internal/datanode/metacache/meta_cache_test.go
+++ b/internal/datanode/metacache/meta_cache_test.go
@@ -107,10 +107,14 @@ func (s *MetaCacheSuite) TestCompactSegments() {
 	}
 
 	for i, partitionID := range s.partitionIDs {
-		segs := s.cache.GetSegmentIDsBy(WithPartitionID(partitionID))
-		s.Equal(1, len(segs))
+		segs := s.cache.GetSegmentsBy(WithPartitionID(partitionID))
 		for _, seg := range segs {
-			s.Equal(seg, s.newSegments[i])
+			if seg.SegmentID() == s.newSegments[i] {
+				s.Equal(commonpb.SegmentState_Flushed, seg.State())
+			}
+			if seg.SegmentID() == s.flushedSegments[i] {
+				s.Equal(s.newSegments[i], seg.CompactTo())
+			}
 		}
 	}
 }
@@ -156,6 +160,19 @@ func (s *MetaCacheSuite) TestUpdateSegments() {
 	s.Equal(commonpb.SegmentState_Flushed, segment.State())
 }
 
+func (s *MetaCacheSuite) TestRemoveSegments() {
+	ids := s.cache.RemoveSegments()
+	s.Empty(ids, "remove without filter shall not succeed")
+
+	ids = s.cache.RemoveSegments(WithSegmentIDs(s.flushedSegments...))
+	s.ElementsMatch(s.flushedSegments, ids)
+
+	for _, segID := range s.flushedSegments {
+		_, ok := s.cache.GetSegmentByID(segID)
+		s.False(ok)
+	}
+}
+
 func (s *MetaCacheSuite) TestPredictSegments() {
 	pk := storage.NewInt64PrimaryKey(100)
 	predict, ok := s.cache.PredictSegments(pk)
diff --git a/internal/datanode/metacache/mock_meta_cache.go b/internal/datanode/metacache/mock_meta_cache.go
index 076ec05f18..b8c7bd0035 100644
--- a/internal/datanode/metacache/mock_meta_cache.go
+++ b/internal/datanode/metacache/mock_meta_cache.go
@@ -417,6 +417,63 @@ func (_c *MockMetaCache_PredictSegments_Call) RunAndReturn(run func(storage.Prim
 	return _c
 }
 
+// RemoveSegments provides a mock function with given fields: filters
+func (_m *MockMetaCache) RemoveSegments(filters ...SegmentFilter) []int64 {
+	_va := make([]interface{}, len(filters))
+	for _i := range filters {
+		_va[_i] = filters[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	var r0 []int64
+	if rf, ok := ret.Get(0).(func(...SegmentFilter) []int64); ok {
+		r0 = rf(filters...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]int64)
+		}
+	}
+
+	return r0
+}
+
+// MockMetaCache_RemoveSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveSegments'
+type MockMetaCache_RemoveSegments_Call struct {
+	*mock.Call
+}
+
+// RemoveSegments is a helper method to define mock.On call
+//   - filters ...SegmentFilter
+func (_e *MockMetaCache_Expecter) RemoveSegments(filters ...interface{}) *MockMetaCache_RemoveSegments_Call {
+	return &MockMetaCache_RemoveSegments_Call{Call: _e.mock.On("RemoveSegments",
+		append([]interface{}{}, filters...)...)}
+}
+
+func (_c *MockMetaCache_RemoveSegments_Call) Run(run func(filters ...SegmentFilter)) *MockMetaCache_RemoveSegments_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]SegmentFilter, len(args)-0)
+		for i, a := range args[0:] {
+			if a != nil {
+				variadicArgs[i] = a.(SegmentFilter)
+			}
+		}
+		run(variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockMetaCache_RemoveSegments_Call) Return(_a0 []int64) *MockMetaCache_RemoveSegments_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockMetaCache_RemoveSegments_Call) RunAndReturn(run func(...SegmentFilter) []int64) *MockMetaCache_RemoveSegments_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Schema provides a mock function with given fields:
 func (_m *MockMetaCache) Schema() *schemapb.CollectionSchema {
 	ret := _m.Called()
diff --git a/internal/datanode/metacache/segment.go b/internal/datanode/metacache/segment.go
index 8b5bdd447a..85d1676c36 100644
--- a/internal/datanode/metacache/segment.go
+++ b/internal/datanode/metacache/segment.go
@@ -23,6 +23,12 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 )
 
+const (
+	// NullSegment is the segment ID used to mark a segment as discarded;
+	// this happens when a segment is compacted to zero rows and the compaction target is dropped directly.
+	NullSegment = int64(-1)
+)
+
 type SegmentInfo struct {
 	segmentID    int64
 	partitionID  int64
@@ -37,6 +43,7 @@ type SegmentInfo struct {
 	compactTo    int64
 	importing    bool
 	level        datapb.SegmentLevel
+	syncingTasks int32
 }
 
 func (s *SegmentInfo) SegmentID() int64 {
@@ -101,6 +108,7 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
 		compactTo:    s.compactTo,
 		level:        s.level,
 		importing:    s.importing,
+		syncingTasks: s.syncingTasks,
 	}
 }
 
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 794ac26e1f..324dea5337 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -684,8 +684,10 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 	s.Assert().True(ok)
 
 	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 100, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
+	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 101, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
 	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 200, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
-	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 200, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
+	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
+	fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory)
 
 	s.Run("invalid compacted from", func() {
 		req := &datapb.SyncSegmentsRequest{
@@ -719,8 +721,9 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		_, result := fg.metacache.GetSegmentByID(req.GetCompactedTo(), metacache.WithSegmentState(commonpb.SegmentState_Flushed))
 		s.True(result)
 		for _, compactFrom := range req.GetCompactedFrom() {
-			_, result := fg.metacache.GetSegmentByID(compactFrom, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
-			s.False(result)
+			seg, result := fg.metacache.GetSegmentByID(compactFrom, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
+			s.True(result)
+			s.Equal(req.CompactedTo, seg.CompactTo())
 		}
 
 		status, err = s.node.SyncSegments(s.ctx, req)
@@ -748,7 +751,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 
 		req := &datapb.SyncSegmentsRequest{
 			CompactedFrom: []int64{100, 200},
-			CompactedTo:   101,
+			CompactedTo:   301,
 			NumOfRows:     0,
 			ChannelName:   chanName,
 			CollectionId:  1,
@@ -757,11 +760,13 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s.Assert().NoError(err)
 		s.Assert().True(merr.Ok(status))
 
-		_, result := fg.metacache.GetSegmentByID(100, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
-		s.False(result)
-		_, result = fg.metacache.GetSegmentByID(200, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
-		s.False(result)
-		_, result = fg.metacache.GetSegmentByID(101, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
+		seg, result := fg.metacache.GetSegmentByID(100, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
+		s.True(result)
+		s.Equal(metacache.NullSegment, seg.CompactTo())
+		seg, result = fg.metacache.GetSegmentByID(200, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
+		s.True(result)
+		s.Equal(metacache.NullSegment, seg.CompactTo())
+		_, result = fg.metacache.GetSegmentByID(301, metacache.WithSegmentState(commonpb.SegmentState_Flushed))
 		s.False(result)
 	})
 }
diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go
index ac7ea8e4a6..6bfdb5fc46 100644
--- a/internal/datanode/syncmgr/sync_manager_test.go
+++ b/internal/datanode/syncmgr/sync_manager_test.go
@@ -151,6 +151,7 @@ func (s *SyncManagerSuite) TestSubmit() {
 	bfs := metacache.NewBloomFilterSet()
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
 	metacache.UpdateNumOfRows(1000)(seg)
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
@@ -182,6 +183,7 @@ func (s *SyncManagerSuite) TestCompacted() {
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
 	metacache.UpdateNumOfRows(1000)(seg)
 	metacache.CompactTo(1001)(seg)
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
@@ -212,6 +214,7 @@ func (s *SyncManagerSuite) TestBlock() {
 	bfs := metacache.NewBloomFilterSet()
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
 	metacache.UpdateNumOfRows(1000)(seg)
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).
 		RunAndReturn(func(...metacache.SegmentFilter) []*metacache.SegmentInfo {
 			return []*metacache.SegmentInfo{seg}
diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go
index c97238abab..897aead92a 100644
--- a/internal/datanode/syncmgr/task.go
+++ b/internal/datanode/syncmgr/task.go
@@ -82,14 +82,19 @@ func (t *SyncTask) Run() error {
 	log := t.getLogger()
 	var err error
 
-	infos := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
-	if len(infos) == 0 {
+	segment, has := t.metacache.GetSegmentByID(t.segmentID)
+	if !has {
 		log.Warn("failed to sync data, segment not found in metacache")
 		t.handleError(err)
 		return merr.WrapErrSegmentNotFound(t.segmentID)
 	}
 
-	segment := infos[0]
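+	// the segment was compacted into an empty segment: its buffered data is obsolete, so the task is discarded as a no-op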
+	if segment.CompactTo() == metacache.NullSegment {
+		log.Info("segment compacted to zero-length segment, discard sync task")
+		return nil
+	}
+
 	if segment.CompactTo() > 0 {
 		log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", segment.CompactTo()))
 		// update sync task segment id
diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go
index 5e2da28e9b..62bfd52f4e 100644
--- a/internal/datanode/syncmgr/task_test.go
+++ b/internal/datanode/syncmgr/task_test.go
@@ -167,6 +167,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
 	metacache.UpdateNumOfRows(1000)(seg)
 	seg.GetBloomFilterSet().Roll()
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
@@ -238,9 +239,45 @@ func (s *SyncTaskSuite) TestRunNormal() {
 	})
 }
 
+func (s *SyncTaskSuite) TestCompactToNull() {
+	bfs := metacache.NewBloomFilterSet()
+	fd, err := storage.NewFieldData(schemapb.DataType_Int64, &schemapb.FieldSchema{
+		FieldID:      101,
+		Name:         "ID",
+		IsPrimaryKey: true,
+		DataType:     schemapb.DataType_Int64,
+	})
+	s.Require().NoError(err)
+
+	ids := []int64{1, 2, 3, 4, 5, 6, 7}
+	for _, id := range ids {
+		err = fd.AppendRow(id)
+		s.Require().NoError(err)
+	}
+
+	bfs.UpdatePKRange(fd)
+	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
+	metacache.UpdateNumOfRows(1000)(seg)
+	metacache.CompactTo(metacache.NullSegment)(seg)
+	seg.GetBloomFilterSet().Roll()
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
+
+	task := s.getSuiteSyncTask()
+	task.WithMetaWriter(BrokerMetaWriter(s.broker))
+	task.WithTimeRange(50, 100)
+	task.WithCheckpoint(&msgpb.MsgPosition{
+		ChannelName: s.channelName,
+		MsgID:       []byte{1, 2, 3, 4},
+		Timestamp:   100,
+	})
+
+	err = task.Run()
+	s.NoError(err)
+}
+
 func (s *SyncTaskSuite) TestRunError() {
 	s.Run("segment_not_found", func() {
-		s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{})
+		s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false)
 		flag := false
 		handler := func(_ error) { flag = true }
 		task := s.getSuiteSyncTask().WithFailureCallback(handler)
@@ -255,7 +292,7 @@ func (s *SyncTaskSuite) TestRunError() {
 	s.metacache.ExpectedCalls = nil
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, metacache.NewBloomFilterSet())
 	metacache.UpdateNumOfRows(1000)(seg)
-	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
+	s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
 	s.Run("serialize_insert_fail", func() {
 		flag := false
 		handler := func(_ error) { flag = true }
diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go
index 2d56977dd2..988188ff3c 100644
--- a/internal/datanode/writebuffer/bf_write_buffer.go
+++ b/internal/datanode/writebuffer/bf_write_buffer.go
@@ -71,5 +71,7 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 
 	wb.checkpoint = endPos
 	_ = wb.triggerSync()
+
+	wb.cleanupCompactedSegments()
 	return nil
 }
diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go
index 2c59b97e52..9437603aed 100644
--- a/internal/datanode/writebuffer/bf_write_buffer_test.go
+++ b/internal/datanode/writebuffer/bf_write_buffer_test.go
@@ -161,6 +161,7 @@ func (s *BFWriteBufferSuite) TestBufferData() {
 	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
 	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 
 	pks, msg := s.composeInsertMsg(1000, 10, 128)
 	delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
@@ -187,6 +188,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
 	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
 	s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
+	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
@@ -218,6 +220,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
+	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
@@ -257,6 +260,8 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
 	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
 	s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
 	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
+	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
+	s.metacache.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
 	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go
index 36f31bf19c..828c04141c 100644
--- a/internal/datanode/writebuffer/l0_write_buffer.go
+++ b/internal/datanode/writebuffer/l0_write_buffer.go
@@ -73,6 +73,8 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
 			delete(wb.l0Segments, partition)
 		}
 	}
+
+	wb.cleanupCompactedSegments()
 	return nil
 }
 
diff --git a/internal/datanode/writebuffer/l0_write_buffer_test.go b/internal/datanode/writebuffer/l0_write_buffer_test.go
index ac746556f4..bcb21533f1 100644
--- a/internal/datanode/writebuffer/l0_write_buffer_test.go
+++ b/internal/datanode/writebuffer/l0_write_buffer_test.go
@@ -156,6 +156,7 @@ func (s *L0WriteBufferSuite) TestBufferData() {
 	s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
 	s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
+	s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
 
 	err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
 	s.NoError(err)
diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index 9c3976f144..e3d139a686 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -164,6 +164,22 @@ func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
 	return segmentsToSync
 }
 
+func (wb *writeBufferBase) cleanupCompactedSegments() {
+	segmentIDs := wb.metaCache.GetSegmentIDsBy(metacache.WithCompacted(), metacache.WithNoSyncingTask())
+	// only remove a compacted segment once it no longer holds a write buffer
+	targetIDs := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
+		_, ok := wb.buffers[segmentID]
+		return !ok
+	})
+	if len(targetIDs) == 0 {
+		return
+	}
+	removed := wb.metaCache.RemoveSegments(metacache.WithSegmentIDs(targetIDs...))
+	if len(removed) > 0 {
+		log.Info("remove compacted segments", zap.Int64s("removed", removed))
+	}
+}
+
 func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
 	// mark segment flushing if segment was growing
 	wb.metaCache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
@@ -320,9 +336,9 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) syn
 
 	actions := []metacache.SegmentAction{metacache.RollStats()}
 	if insert != nil {
-		actions = append(actions, metacache.StartSyncing(int64(insert.GetRowNum())))
 		batchSize = int64(insert.GetRowNum())
 	}
+	actions = append(actions, metacache.StartSyncing(batchSize))
 	wb.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(segmentID))
 
 	var syncTask syncmgr.Task
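
Note (illustrative sketch, not part of the patch): the change replaces eager deletion of compacted segments with a two-phase scheme. CompactSegments now only marks the old segments (compactTo = new segment ID, or NullSegment when the result is empty), so an in-flight sync task can still resolve its segment and either redirect to the compact-to target or discard itself; the marked segments are evicted later, once no sync task and no write buffer references them. A minimal sketch of that flow against the new MetaCache surface follows; the function and all parameter names (cache, newSegID, bfs, and so on) are hypothetical stand-ins.

package example

import "github.com/milvus-io/milvus/internal/datanode/metacache"

// compactAndCleanup sketches the two-phase flow introduced by this patch.
func compactAndCleanup(
	cache metacache.MetaCache,
	newSegID, partitionID, numRows int64,
	bfs *metacache.BloomFilterSet,
	oldSegIDs ...int64,
) []int64 {
	// Phase 1: mark instead of delete. Old segments get compactTo = newSegID,
	// or compactTo = metacache.NullSegment when numRows == 0 (empty result,
	// target segment dropped); either way they stay in the cache for now.
	cache.CompactSegments(newSegID, partitionID, numRows, bfs, oldSegIDs...)

	// Phase 2 (later, e.g. from BufferData): evict only marked segments whose
	// sync tasks have all finished. The real cleanup pass additionally skips
	// segments that still hold a write buffer.
	ids := cache.GetSegmentIDsBy(metacache.WithCompacted(), metacache.WithNoSyncingTask())
	if len(ids) == 0 {
		return nil
	}
	return cache.RemoveSegments(metacache.WithSegmentIDs(ids...))
}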