From 2cd8daaf0b9088644b40d4593f5874c3ebe9b9d4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 30 Nov 2023 10:20:28 +0800 Subject: [PATCH] fix: compacted segment still buffers delta data (#28816) Related to #28628 Compacted segment syncing counter is not set correctly in sync task and the bf write buffer shall not use compacted segment as candidate when buffering delta data --------- Signed-off-by: Congqi Xia --- internal/datanode/syncmgr/task.go | 5 +++-- internal/datanode/writebuffer/bf_write_buffer.go | 3 +++ internal/datanode/writebuffer/bf_write_buffer_test.go | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index 077b6fd919..0acf490950 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -87,8 +87,9 @@ func (t *SyncTask) Run() error { t.segment, has = t.metacache.GetSegmentByID(t.segmentID) if !has { log.Warn("failed to sync data, segment not found in metacache") + err := merr.WrapErrSegmentNotFound(t.segmentID) t.handleError(err) - return merr.WrapErrSegmentNotFound(t.segmentID) + return err } if t.segment.CompactTo() == metacache.NullSegment { @@ -141,7 +142,7 @@ func (t *SyncTask) Run() error { actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed)) } - t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segmentID)) + t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID())) log.Info("task done") return nil diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index 988188ff3c..6fe27d7898 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -53,6 +53,9 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) for _, segment := range segments { + if segment.CompactTo() != 0 { + continue + } var deletePks []storage.PrimaryKey var deleteTss []typeutil.Timestamp for idx, pk := range pks { diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 48d69d7a72..66d2d164cf 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -256,7 +256,10 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { s.NoError(err) seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + segCompacted := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet()) + metacache.CompactTo(2001)(segCompacted) + + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted}) s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false) s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true) s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})