fix: Accidentally ignored sealed segments in L0 Compaction (#45340)

When there're no growing segments in the collection, L0 Compaction will
try to choose all L0 segments that hits all L1/L2 segments.

However, if there's Sealed Segment still under flushing in DataNode at
the same time L0 Compaction selects satisfied L1/L2 segments, L0
Compaction will ignore this Segment because it's not in "FlushState",
which is wrong, causing missing deletes on the Sealed Segment.

This quick solution here is to fail this L0 compaction task once
selected a Sealed segment.

See also: #45339

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-11-06 16:53:38 +08:00 committed by GitHub
parent a2282d61cb
commit 2dd2c96eb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 24 additions and 35 deletions

View File

@ -45,8 +45,6 @@ type CompactionTask interface {
SetNodeID(UniqueID) error SetNodeID(UniqueID) error
NeedReAssignNodeID() bool NeedReAssignNodeID() bool
SaveTaskMeta() error SaveTaskMeta() error
CheckCompactionContainsSegment(segmentID int64) bool
} }
type compactionTaskOpt func(task *datapb.CompactionTask) type compactionTaskOpt func(task *datapb.CompactionTask)

View File

@ -332,10 +332,6 @@ func (t *clusteringCompactionTask) Clean() bool {
return t.doClean() == nil return t.doClean() == nil
} }
func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask) taskProto := t.taskProto.Load().(*datapb.CompactionTask)
logIDRange, err := PreAllocateBinlogIDs(t.allocator, t.meta.GetSegmentInfos(taskProto.GetInputSegments())) logIDRange, err := PreAllocateBinlogIDs(t.allocator, t.meta.GetSegmentInfos(taskProto.GetInputSegments()))

View File

@ -22,7 +22,6 @@ import (
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -274,21 +273,27 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac
return taskClone return taskClone
} }
func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs) { func (t *l0CompactionTask) selectFlushedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs, error) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask) taskProto := t.taskProto.Load().(*datapb.CompactionTask)
// Select sealed L1 segments for LevelZero compaction that meets the condition: // Select flushed L1/L2 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos // dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { flushedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) && return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == taskProto.GetChannel() && info.GetInsertChannel() == taskProto.GetChannel() &&
isFlushState(info.GetState()) && (info.GetState() == commonpb.SegmentState_Sealed || isFlushState(info.GetState())) &&
!info.GetIsImporting() && !info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 && info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp() info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
})) }))
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { sealedSegBinlogs := []*datapb.CompactionSegmentBinlogs{}
return &datapb.CompactionSegmentBinlogs{ for _, info := range flushedSegments {
// Sealed is unexpected, fail fast
if info.GetState() == commonpb.SegmentState_Sealed {
return nil, nil, fmt.Errorf("L0 compaction selected invalid sealed segment %d", info.GetID())
}
sealedSegBinlogs = append(sealedSegBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(), SegmentID: info.GetID(),
Field2StatslogPaths: info.GetStatslogs(), Field2StatslogPaths: info.GetStatslogs(),
InsertChannel: info.GetInsertChannel(), InsertChannel: info.GetInsertChannel(),
@ -296,20 +301,10 @@ func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.Comp
CollectionID: info.GetCollectionID(), CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(), PartitionID: info.GetPartitionID(),
IsSorted: info.GetIsSorted(), IsSorted: info.GetIsSorted(),
} })
})
return sealedSegments, sealedSegBinlogs
}
func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
for _, sealedSegment := range sealedSegmentIDs {
if sealedSegment.GetID() == segmentID {
return true
}
} }
return false
return flushedSegments, sealedSegBinlogs, nil
} }
func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
@ -349,14 +344,18 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
segments = append(segments, segInfo) segments = append(segments, segInfo)
} }
sealedSegments, sealedSegBinlogs := t.selectSealedSegment() flushedSegments, flushedSegBinlogs, err := t.selectFlushedSegment()
if len(sealedSegments) == 0 { if err != nil {
log.Warn("invalid L0 compaction plan, unable to select flushed segments", zap.Error(err))
return nil, err
}
if len(flushedSegments) == 0 {
// TODO fast finish l0 segment, just drop l0 segment // TODO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ") log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos()) return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
} }
segments = append(segments, sealedSegments...) segments = append(segments, flushedSegments...)
logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments) logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments)
if err != nil { if err != nil {
return nil, err return nil, err
@ -365,10 +364,10 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
// BeginLogID is deprecated, but still assign it for compatibility. // BeginLogID is deprecated, but still assign it for compatibility.
plan.BeginLogID = logIDRange.Begin plan.BeginLogID = logIDRange.Begin
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) plan.SegmentBinlogs = append(plan.SegmentBinlogs, flushedSegBinlogs...)
log.Info("l0CompactionTask refreshed level zero compaction plan", log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", taskProto.GetPos()), zap.Any("target position", taskProto.GetPos()),
zap.Any("target segments count", len(sealedSegBinlogs)), zap.Any("target segments count", len(flushedSegBinlogs)),
zap.Any("PreAllocatedLogIDs", logIDRange)) zap.Any("PreAllocatedLogIDs", logIDRange))
WrapPluginContext(taskProto.GetCollectionID(), taskProto.GetSchema().GetProperties(), plan) WrapPluginContext(taskProto.GetCollectionID(), taskProto.GetSchema().GetProperties(), plan)

View File

@ -365,10 +365,6 @@ func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task) t.taskProto.Store(task)
} }
func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
compactionParams, err := compaction.GenerateJSONParams() compactionParams, err := compaction.GenerateJSONParams()
if err != nil { if err != nil {