mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
fix: compacted segment still buffers delta data (#28816)
Related to #28628 Compacted segment syncing counter is not set correctly in sync task and the bf write buffer shall not use compacted segment as candidate when buffering delta data --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
f9bb8e9648
commit
2cd8daaf0b
@ -87,8 +87,9 @@ func (t *SyncTask) Run() error {
|
|||||||
t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
|
t.segment, has = t.metacache.GetSegmentByID(t.segmentID)
|
||||||
if !has {
|
if !has {
|
||||||
log.Warn("failed to sync data, segment not found in metacache")
|
log.Warn("failed to sync data, segment not found in metacache")
|
||||||
|
err := merr.WrapErrSegmentNotFound(t.segmentID)
|
||||||
t.handleError(err)
|
t.handleError(err)
|
||||||
return merr.WrapErrSegmentNotFound(t.segmentID)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.segment.CompactTo() == metacache.NullSegment {
|
if t.segment.CompactTo() == metacache.NullSegment {
|
||||||
@ -141,7 +142,7 @@ func (t *SyncTask) Run() error {
|
|||||||
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
|
actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
|
||||||
}
|
}
|
||||||
|
|
||||||
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segmentID))
|
t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID()))
|
||||||
|
|
||||||
log.Info("task done")
|
log.Info("task done")
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@ -53,6 +53,9 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
|
|||||||
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
|
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
|
||||||
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
|
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
|
||||||
for _, segment := range segments {
|
for _, segment := range segments {
|
||||||
|
if segment.CompactTo() != 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
var deletePks []storage.PrimaryKey
|
var deletePks []storage.PrimaryKey
|
||||||
var deleteTss []typeutil.Timestamp
|
var deleteTss []typeutil.Timestamp
|
||||||
for idx, pk := range pks {
|
for idx, pk := range pks {
|
||||||
|
|||||||
@ -256,7 +256,10 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
|
|||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
|
|
||||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
|
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
|
||||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
segCompacted := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
|
||||||
|
metacache.CompactTo(2001)(segCompacted)
|
||||||
|
|
||||||
|
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
|
||||||
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
|
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
|
||||||
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
|
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg, true)
|
||||||
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
|
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user