diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index cf531eeb87..f29f057b10 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -139,26 +139,117 @@ func (t *l0CompactionTask) processExecuting() bool { return false } -func (t *l0CompactionTask) GetSpan() trace.Span { - return t.span +func (t *l0CompactionTask) processMetaSaved() bool { + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) + if err != nil { + log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + return false + } + return t.processCompleted() +} + +func (t *l0CompactionTask) processCompleted() bool { + if t.hasAssignedWorker() { + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err != nil { + log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } + } + + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) + return true +} + +func (t *l0CompactionTask) processTimeout() bool { + t.resetSegmentCompacting() + return true +} + +func (t *l0CompactionTask) processFailed() bool { + if t.hasAssignedWorker() { + err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + PlanID: t.GetPlanID(), + }) + if err != nil { + log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + } + } + + t.resetSegmentCompacting() + log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) + return true } func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult { return t.result } +func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { + t.result = result +} + func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) { t.CompactionTask = task } +func (t *l0CompactionTask) GetSpan() trace.Span { + return t.span +} + func (t *l0CompactionTask) SetSpan(span trace.Span) { t.span = span } +func (t *l0CompactionTask) EndSpan() { + if t.span != nil { + t.span.End() + } +} + func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) { t.plan = plan } +func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan { + return t.plan +} + +func (t *l0CompactionTask) SetStartTime(startTime int64) { + t.StartTime = startTime +} + +func (t *l0CompactionTask) GetLabel() string { + return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel()) +} + +func (t *l0CompactionTask) NeedReAssignNodeID() bool { + return t.GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker()) +} + +func (t *l0CompactionTask) CleanLogPath() { + if t.plan == nil { + return + } + if t.plan.GetSegmentBinlogs() != nil { + for _, binlogs := range t.plan.GetSegmentBinlogs() { + binlogs.FieldBinlogs = nil + binlogs.Field2StatslogPaths = nil + binlogs.Deltalogs = nil + } + } + if t.result.GetSegments() != nil { + for _, segment := range t.result.GetSegments() { + segment.InsertLogs = nil + segment.Deltalogs = nil + segment.Field2StatslogPaths = nil + } + } +} + func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask { taskClone := &datapb.CompactionTask{ PlanID: t.GetPlanID(), @@ -187,48 +278,6 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac return taskClone } -func (t *l0CompactionTask) EndSpan() { - if t.span != nil { - t.span.End() - } -} - -func (t *l0CompactionTask) GetLabel() string { - return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel()) -} - -func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan { - return t.plan -} - -func (t *l0CompactionTask) NeedReAssignNodeID() bool { - return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID) -} - -func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) { - t.result = result -} - -func (t *l0CompactionTask) CleanLogPath() { - if t.plan == nil { - return - } - if t.plan.GetSegmentBinlogs() != nil { - for _, binlogs := range t.plan.GetSegmentBinlogs() { - binlogs.FieldBinlogs = nil - binlogs.Field2StatslogPaths = nil - binlogs.Deltalogs = nil - } - } - if t.result.GetSegments() != nil { - for _, segment := range t.result.GetSegments() { - segment.InsertLogs = nil - segment.Deltalogs = nil - segment.Field2StatslogPaths = nil - } - } -} - func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.allocN(1) if err != nil { @@ -306,53 +355,12 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err return plan, nil } -func (t *l0CompactionTask) processMetaSaved() bool { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) - if err != nil { - log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - return false - } - return t.processCompleted() -} - -func (t *l0CompactionTask) processCompleted() bool { - if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ - PlanID: t.GetPlanID(), - }) - if err != nil { - log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - } - } - - t.resetSegmentCompacting() - UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) - return true -} - func (t *l0CompactionTask) resetSegmentCompacting() { t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } -func (t *l0CompactionTask) processTimeout() bool { - t.resetSegmentCompacting() - return true -} - -func (t *l0CompactionTask) processFailed() bool { - if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ - PlanID: t.GetPlanID(), - }) - if err != nil { - log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) - } - } - - t.resetSegmentCompacting() - log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) - return true +func (t *l0CompactionTask) hasAssignedWorker() bool { + return t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID } func (t *l0CompactionTask) checkTimeout() bool { @@ -373,6 +381,14 @@ func (t *l0CompactionTask) checkTimeout() bool { return false } +func (t *l0CompactionTask) SetNodeID(id UniqueID) error { + return t.updateAndSaveTaskMeta(setNodeID(id)) +} + +func (t *l0CompactionTask) SaveTaskMeta() error { + return t.saveTaskMeta(t.CompactionTask) +} + func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error { task := t.ShadowClone(opts...) err := t.saveTaskMeta(task) @@ -383,18 +399,10 @@ func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) erro return nil } -func (t *l0CompactionTask) SetNodeID(id UniqueID) error { - return t.updateAndSaveTaskMeta(setNodeID(id)) -} - func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error { return t.meta.SaveCompactionTask(task) } -func (t *l0CompactionTask) SaveTaskMeta() error { - return t.saveTaskMeta(t.CompactionTask) -} - func (t *l0CompactionTask) saveSegmentMeta() error { result := t.result var operators []UpdateOperator diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 3730c8e43c..3a994b6136 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -25,6 +25,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -212,6 +213,7 @@ func (s *L0CompactionTaskSuite) generateTestL0Task(state datapb.CompactionTaskSt Type: datapb.CompactionType_Level0DeleteCompaction, NodeID: NullNodeID, State: state, + Channel: "ch-1", InputSegments: []int64{100, 101}, }, meta: s.mockMeta, @@ -265,6 +267,37 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.True(got) s.Equal(datapb.CompactionTaskState_failed, t.State) }) + s.Run("test pipelining saveTaskMeta failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) + t.NodeID = 100 + channel := "ch-1" + deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} + + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( + []*SegmentInfo{ + {SegmentInfo: &datapb.SegmentInfo{ + ID: 200, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + }, isCompacting: true}, + }, + ) + + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L0, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Deltalogs: deltaLogs, + }} + }).Twice() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_pipelining, t.State) + }) s.Run("test pipelining Compaction failed", func() { s.mockAlloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil) @@ -515,4 +548,192 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.False(got) s.Equal(datapb.CompactionTaskState_executing, t.GetState()) }) + + s.Run("test timeout", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.Require().False(isCompacting) + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + }) + + s.Run("test metaSaved success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test metaSaved failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_meta_saved) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_meta_saved, t.GetState()) + }) + + s.Run("test complete drop failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_completed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test complete success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_completed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + t.result = &datapb.CompactionPlanResult{} + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_completed, t.GetState()) + }) + + s.Run("test process failed success", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_failed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + }) + s.Run("test process failed failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_failed) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { + s.ElementsMatch(segIDs, t.GetInputSegments()) + }).Once() + + got := t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + }) + + s.Run("test unkonwn task", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_unknown) + + got := t.Process() + s.True(got) + }) +} + +func (s *L0CompactionTaskSuite) TestSetterGetter() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + + span := t.GetSpan() + s.Nil(span) + s.NotPanics(t.EndSpan) + + t.SetSpan(trace.SpanFromContext(context.TODO())) + s.NotPanics(t.EndSpan) + + rst := t.GetResult() + s.Nil(rst) + t.SetResult(&datapb.CompactionPlanResult{PlanID: 19530}) + s.NotNil(t.GetResult()) + + label := t.GetLabel() + s.Equal("10-ch-1", label) + + t.SetStartTime(100) + s.EqualValues(100, t.GetStartTime()) + + t.SetTask(nil) + t.SetPlan(&datapb.CompactionPlan{PlanID: 19530}) + s.NotNil(t.GetPlan()) + + s.Run("set NodeID", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + t.SetNodeID(1000) + s.EqualValues(1000, t.GetNodeID()) + }) +} + +func (s *L0CompactionTaskSuite) TestCleanLogPath() { + s.Run("plan nil", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + t.CleanLogPath() + }) + + s.Run("clear path", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) + t.SetPlan(&datapb.CompactionPlan{ + Channel: "ch-1", + Type: datapb.CompactionType_MixCompaction, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)}, + }, + }, + PlanID: 19530, + }) + + t.SetResult(&datapb.CompactionPlanResult{ + Segments: []*datapb.CompactionSegment{ + { + SegmentID: 100, + InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 4)}, + Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 5)}, + Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(101, 6)}, + }, + }, + PlanID: 19530, + }) + + t.CleanLogPath() + + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetFieldBinlogs()) + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetField2StatslogPaths()) + s.Empty(t.GetPlan().GetSegmentBinlogs()[0].GetDeltalogs()) + + s.Empty(t.GetResult().GetSegments()[0].GetInsertLogs()) + s.Empty(t.GetResult().GetSegments()[0].GetField2StatslogPaths()) + s.Empty(t.GetResult().GetSegments()[0].GetDeltalogs()) + }) }