Mirror of https://gitee.com/milvus-io/milvus.git, synced 2025-12-06 17:18:35 +08:00
When there are no growing segments in the collection, L0 Compaction tries to choose all L0 segments that hit all L1/L2 segments. However, if a Sealed segment is still being flushed in the DataNode at the moment L0 Compaction selects the qualifying L1/L2 segments, L0 Compaction will ignore that segment because it is not in "FlushState", which is wrong and causes deletes to go missing on the Sealed segment. The quick fix here is to fail the L0 compaction task once a Sealed segment is selected.
See also: #45339
pr: #45340
pr: #45341
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent 93e8c5465a
commit 2d6c736448
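The sketch below illustrates, in isolation, the two pieces of the fix described above: the widened selection predicate (Sealed segments are no longer dropped by the flush-state check) and the fail-fast check when building the plan. It is a minimal stand-alone sketch, not the milvus API: segState, segment, selectForL0, and buildPlan are hypothetical stand-ins for commonpb.SegmentState, SegmentInfo, and the task methods, and the assumed meaning of isFlushState (Flushing or Flushed only) is inferred from the commit message.

// Minimal sketch of the selection and fail-fast logic; all types and helpers
// here are hypothetical stand-ins, not the real milvus datacoord API.
package main

import "fmt"

type segState int

const (
	stateSealed segState = iota
	stateFlushing
	stateFlushed
)

type segment struct {
	id    int64
	state segState
}

// isFlushState mirrors the old filter's assumed semantics: only Flushing or
// Flushed segments pass, so a segment still in Sealed state is silently skipped.
func isFlushState(s segState) bool {
	return s == stateFlushing || s == stateFlushed
}

// selectForL0 mirrors the widened predicate from the patch: Sealed segments
// are now selected too, so the planner can see them instead of missing them.
func selectForL0(segs []segment) []segment {
	var out []segment
	for _, s := range segs {
		if s.state == stateSealed || isFlushState(s.state) {
			out = append(out, s)
		}
	}
	return out
}

// buildPlan mirrors the fail-fast check: if any selected segment is still
// Sealed (its flush has not finished), the whole plan is rejected so the
// L0 compaction can be retried later instead of losing deletes.
func buildPlan(selected []segment) ([]segment, error) {
	for _, s := range selected {
		if s.state == stateSealed {
			return nil, fmt.Errorf("invalid L0 compaction plan: segment %d is still sealed", s.id)
		}
	}
	return selected, nil
}

func main() {
	segs := []segment{{id: 1, state: stateFlushed}, {id: 2, state: stateSealed}}
	if _, err := buildPlan(selectForL0(segs)); err != nil {
		fmt.Println("plan rejected:", err) // expected: segment 2 is still sealed
	}
}

The design choice here, per the commit message, is to reject the whole plan rather than silently drop the Sealed segment, since dropping it would lose the deletes that the L0 compaction is supposed to apply to it.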
@@ -275,12 +275,12 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac
 func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs) {
 	taskProto := t.taskProto.Load().(*datapb.CompactionTask)
-	// Select sealed L1 segments for LevelZero compaction that meets the condition:
+	// Select sealed L1/L2 segments for LevelZero compaction that meets the condition:
 	// dmlPos < triggerInfo.pos
 	sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
 		return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
 			info.GetInsertChannel() == taskProto.GetChannel() &&
-			isFlushState(info.GetState()) &&
+			(info.GetState() == commonpb.SegmentState_Sealed || isFlushState(info.GetState())) &&
 			!info.GetIsImporting() &&
 			info.GetLevel() != datapb.SegmentLevel_L0 &&
 			info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
@@ -359,7 +359,18 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
 		return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
 	}

-	segments = append(segments, sealedSegments...)
+	for _, seg := range sealedSegments {
+		if seg.GetState() == commonpb.SegmentState_Sealed {
+			log.Warn("invalid L0 compaction plan, selected sealed L1 segments",
+				zap.Int64("segmentID", seg.GetID()),
+				zap.String("state", seg.GetState().String()),
+				zap.Uint64("position", taskProto.GetPos().GetTimestamp()),
+			)
+			return nil, errors.Errorf("Invalid L0 compaction plan, selected sealed L1/L2 segments for the position=%v", taskProto.GetPos())
+		}
+		segments = append(segments, seg)
+	}

 	logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
 	if err != nil {
 		return nil, err