From 0066c016b633c97d34c243c77dd90cad35cddbe9 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 19 Mar 2024 10:13:14 +0800 Subject: [PATCH] enhance: Skip submit empty l0 tasks in DC (#31280) Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 3 ++ internal/datacoord/compaction_test.go | 43 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 10e14ab342..558671e3e2 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -332,6 +332,9 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error { info.GetLevel() != datapb.SegmentLevel_L0 && info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp() }) + if len(sealedSegments) == 0 { + return errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.triggerInfo.pos) + } sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { return &datapb.CompactionSegmentBinlogs{ diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 1d57f2e1e7..c6fefaffee 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -298,6 +298,49 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { s.Error(err) s.ErrorIs(err, merr.ErrSegmentNotFound) }) + + s.Run("select zero segments", func() { + s.SetupTest() + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Deltalogs: deltalogs, + }} + }).Times(2) + s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(nil).Once() + + // 2 l0 segments + plan := &datapb.CompactionPlan{ + PlanID: 1, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + }, + { + SegmentID: 101, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + }, + }, + Type: datapb.CompactionType_Level0DeleteCompaction, + } + + task := &compactionTask{ + triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10}, + state: executing, + plan: plan, + dataNodeID: 1, + } + + handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc) + err := handler.RefreshPlan(task) + s.Error(err) + }) } func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {