From f3e5a53fc59ca8dd10d12a9d875a08b46ec0647a Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 7 Nov 2025 11:49:34 +0800 Subject: [PATCH] fix: [2.6]Accidentally ignored sealed segments in L0 Compaction (#45341) When there are no growing segments in the collection, L0 Compaction will try to choose all L0 segments that hit all L1/L2 segments. However, if there is a Sealed Segment still being flushed in DataNode at the same time L0 Compaction selects the qualifying L1/L2 segments, L0 Compaction will ignore this Segment because it's not in "FlushState". This is wrong and causes deletes to be missed on the Sealed Segment. The quick solution here is to fail the L0 compaction task as soon as a Sealed segment is selected. See also: #45339 pr: #45340 --------- Signed-off-by: yangxuan --- internal/datacoord/compaction_task.go | 2 - .../datacoord/compaction_task_clustering.go | 4 -- internal/datacoord/compaction_task_l0.go | 49 +++++++++---------- internal/datacoord/compaction_task_mix.go | 4 -- 4 files changed, 24 insertions(+), 35 deletions(-) diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index 00386346b2..c6a0bba83b 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -45,8 +45,6 @@ type CompactionTask interface { SetNodeID(UniqueID) error NeedReAssignNodeID() bool SaveTaskMeta() error - - CheckCompactionContainsSegment(segmentID int64) bool } type compactionTaskOpt func(task *datapb.CompactionTask) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index caa6769aa6..1a5329f661 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -332,10 +332,6 @@ func (t *clusteringCompactionTask) Clean() bool { return t.doClean() == nil } -func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - return false -} - func (t *clusteringCompactionTask) 
BuildCompactionRequest() (*datapb.CompactionPlan, error) { taskProto := t.taskProto.Load().(*datapb.CompactionTask) logIDRange, err := PreAllocateBinlogIDs(t.allocator, t.meta.GetSegmentInfos(taskProto.GetInputSegments())) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 00d623b839..e6c5e7b2c1 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -22,7 +22,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -274,21 +273,27 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac return taskClone } -func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs) { +func (t *l0CompactionTask) selectFlushedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs, error) { taskProto := t.taskProto.Load().(*datapb.CompactionTask) - // Select sealed L1 segments for LevelZero compaction that meets the condition: + // Select flushed L1/L2 segments for LevelZero compaction that meets the condition: // dmlPos < triggerInfo.pos - sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + flushedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) && info.GetInsertChannel() == taskProto.GetChannel() && - isFlushState(info.GetState()) && + (info.GetState() == commonpb.SegmentState_Sealed || isFlushState(info.GetState())) && !info.GetIsImporting() && info.GetLevel() != datapb.SegmentLevel_L0 && info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp() })) - sealedSegBinlogs := 
lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { - return &datapb.CompactionSegmentBinlogs{ + sealedSegBinlogs := []*datapb.CompactionSegmentBinlogs{} + for _, info := range flushedSegments { + // Sealed is unexpected, fail fast + if info.GetState() == commonpb.SegmentState_Sealed { + return nil, nil, fmt.Errorf("L0 compaction selected invalid sealed segment %d", info.GetID()) + } + + sealedSegBinlogs = append(sealedSegBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: info.GetID(), Field2StatslogPaths: info.GetStatslogs(), InsertChannel: info.GetInsertChannel(), @@ -296,20 +301,10 @@ func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.Comp CollectionID: info.GetCollectionID(), PartitionID: info.GetPartitionID(), IsSorted: info.GetIsSorted(), - } - }) - - return sealedSegments, sealedSegBinlogs -} - -func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - sealedSegmentIDs, _ := t.selectSealedSegment() - for _, sealedSegment := range sealedSegmentIDs { - if sealedSegment.GetID() == segmentID { - return true - } + }) } - return false + + return flushedSegments, sealedSegBinlogs, nil } func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { @@ -349,14 +344,18 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err segments = append(segments, segInfo) } - sealedSegments, sealedSegBinlogs := t.selectSealedSegment() - if len(sealedSegments) == 0 { + flushedSegments, flushedSegBinlogs, err := t.selectFlushedSegment() + if err != nil { + log.Warn("invalid L0 compaction plan, unable to select flushed segments", zap.Error(err)) + return nil, err + } + if len(flushedSegments) == 0 { // TODO fast finish l0 segment, just drop l0 segment log.Info("l0Compaction available non-L0 Segments is empty ") return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos()) } - segments = append(segments, 
sealedSegments...) + segments = append(segments, flushedSegments...) logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments) if err != nil { return nil, err @@ -365,10 +364,10 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err // BeginLogID is deprecated, but still assign it for compatibility. plan.BeginLogID = logIDRange.Begin - plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) + plan.SegmentBinlogs = append(plan.SegmentBinlogs, flushedSegBinlogs...) log.Info("l0CompactionTask refreshed level zero compaction plan", zap.Any("target position", taskProto.GetPos()), - zap.Any("target segments count", len(sealedSegBinlogs)), + zap.Any("target segments count", len(flushedSegBinlogs)), zap.Any("PreAllocatedLogIDs", logIDRange)) WrapPluginContext(taskProto.GetCollectionID(), taskProto.GetSchema().GetProperties(), plan) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index d1ec3fb20f..fdb0c3b539 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -365,10 +365,6 @@ func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) { t.taskProto.Store(task) } -func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - return false -} - func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { compactionParams, err := compaction.GenerateJSONParams() if err != nil {