diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index 00386346b2..c6a0bba83b 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -45,8 +45,6 @@ type CompactionTask interface { SetNodeID(UniqueID) error NeedReAssignNodeID() bool SaveTaskMeta() error - - CheckCompactionContainsSegment(segmentID int64) bool } type compactionTaskOpt func(task *datapb.CompactionTask) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index caa6769aa6..1a5329f661 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -332,10 +332,6 @@ func (t *clusteringCompactionTask) Clean() bool { return t.doClean() == nil } -func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - return false -} - func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { taskProto := t.taskProto.Load().(*datapb.CompactionTask) logIDRange, err := PreAllocateBinlogIDs(t.allocator, t.meta.GetSegmentInfos(taskProto.GetInputSegments())) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 00d623b839..e6c5e7b2c1 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -22,7 +22,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -274,21 +273,27 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac return taskClone } -func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs) { +func (t *l0CompactionTask) selectFlushedSegment() ([]*SegmentInfo, []*datapb.CompactionSegmentBinlogs, error) { taskProto := t.taskProto.Load().(*datapb.CompactionTask) - // Select sealed L1 segments for LevelZero compaction that meets the condition: + // Select flushed L1/L2 segments for LevelZero compaction that meets the condition: // dmlPos < triggerInfo.pos - sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + flushedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) && info.GetInsertChannel() == taskProto.GetChannel() && - isFlushState(info.GetState()) && + (info.GetState() == commonpb.SegmentState_Sealed || isFlushState(info.GetState())) && !info.GetIsImporting() && info.GetLevel() != datapb.SegmentLevel_L0 && info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp() })) - sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { - return &datapb.CompactionSegmentBinlogs{ + sealedSegBinlogs := []*datapb.CompactionSegmentBinlogs{} + for _, info := range flushedSegments { + // Sealed is unexpected, fail fast + if info.GetState() == commonpb.SegmentState_Sealed { + return nil, nil, fmt.Errorf("L0 compaction selected invalid sealed segment %d", info.GetID()) + } + + sealedSegBinlogs = append(sealedSegBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: info.GetID(), Field2StatslogPaths: info.GetStatslogs(), InsertChannel: info.GetInsertChannel(), @@ -296,20 +301,10 @@ func (t *l0CompactionTask) selectSealedSegment() ([]*SegmentInfo, []*datapb.Comp CollectionID: info.GetCollectionID(), PartitionID: info.GetPartitionID(), IsSorted: info.GetIsSorted(), - } - }) - - return sealedSegments, sealedSegBinlogs -} - -func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - sealedSegmentIDs, _ := t.selectSealedSegment() - for _, sealedSegment := range sealedSegmentIDs { - if sealedSegment.GetID() == segmentID { - return true - } + }) } - return false + + return flushedSegments, sealedSegBinlogs, nil } func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { @@ -349,14 +344,18 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err segments = append(segments, segInfo) } - sealedSegments, sealedSegBinlogs := t.selectSealedSegment() - if len(sealedSegments) == 0 { + flushedSegments, flushedSegBinlogs, err := t.selectFlushedSegment() + if err != nil { + log.Warn("invalid L0 compaction plan, unable to select flushed segments", zap.Error(err)) + return nil, err + } + if len(flushedSegments) == 0 { // TODO fast finish l0 segment, just drop l0 segment log.Info("l0Compaction available non-L0 Segments is empty ") return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos()) } - segments = append(segments, sealedSegments...) + segments = append(segments, flushedSegments...) logIDRange, err := PreAllocateBinlogIDs(t.allocator, segments) if err != nil { return nil, err @@ -365,10 +364,10 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err // BeginLogID is deprecated, but still assign it for compatibility. plan.BeginLogID = logIDRange.Begin - plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) + plan.SegmentBinlogs = append(plan.SegmentBinlogs, flushedSegBinlogs...) log.Info("l0CompactionTask refreshed level zero compaction plan", zap.Any("target position", taskProto.GetPos()), - zap.Any("target segments count", len(sealedSegBinlogs)), + zap.Any("target segments count", len(flushedSegBinlogs)), zap.Any("PreAllocatedLogIDs", logIDRange)) WrapPluginContext(taskProto.GetCollectionID(), taskProto.GetSchema().GetProperties(), plan) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index d1ec3fb20f..fdb0c3b539 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -365,10 +365,6 @@ func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) { t.taskProto.Store(task) } -func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { - return false -} - func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { compactionParams, err := compaction.GenerateJSONParams() if err != nil {