From 922f54967d096453137e1422e2dfffc6bb4bbf75 Mon Sep 17 00:00:00 2001 From: wayblink Date: Fri, 6 Sep 2024 23:27:05 +0800 Subject: [PATCH] fix: [cherry-pick] add log in mixCompactionTask and set fail/timeout task to clean (#35967) #35966 master pr :#35970 Signed-off-by: wayblink --- internal/datacoord/compaction_task_l0.go | 11 +++ internal/datacoord/compaction_task_l0_test.go | 18 ++--- internal/datacoord/compaction_task_mix.go | 70 +++++++++++++------ .../datacoord/compaction_task_mix_test.go | 44 ++++++++++++ 4 files changed, 113 insertions(+), 30 deletions(-) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 09b56be9bb..3729ab77d0 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -164,6 +164,11 @@ func (t *l0CompactionTask) processCompleted() bool { func (t *l0CompactionTask) processTimeout() bool { t.resetSegmentCompacting() + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + if err != nil { + log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false + } return true } @@ -178,6 +183,12 @@ func (t *l0CompactionTask) processFailed() bool { } t.resetSegmentCompacting() + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + if err != nil { + log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false + } + log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID())) return true } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 3ea25ada39..653a7e33fb 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -202,14 +202,14 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { Deltalogs: deltaLogs, }} }).Twice() - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2) s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return() s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_failed, t.State) + s.Equal(datapb.CompactionTaskState_cleaned, t.State) }) s.Run("test pipelining saveTaskMeta failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) @@ -374,7 +374,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { got = t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_timeout, t.GetState()) + s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) }) s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() { @@ -466,12 +466,12 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { }, nil).Once() s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil) - s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2) s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once() got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) }) s.Run("test executing with result failed save compaction meta failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_executing) @@ -494,7 +494,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_timeout) t.NodeID = 100 s.Require().True(t.GetNodeID() > 0) - + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1) s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { s.Require().False(isCompacting) s.ElementsMatch(segIDs, t.GetInputSegments()) @@ -568,6 +568,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.NodeID = 100 s.Require().True(t.GetNodeID() > 0) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1) s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once() s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { s.ElementsMatch(segIDs, t.GetInputSegments()) @@ -575,12 +576,13 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) }) s.Run("test process failed failed", func() { t := s.generateTestL0Task(datapb.CompactionTaskState_failed) t.NodeID = 100 s.Require().True(t.GetNodeID() > 0) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once() s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) { s.ElementsMatch(segIDs, t.GetInputSegments()) @@ -588,7 +590,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() { got := t.Process() s.True(got) - s.Equal(datapb.CompactionTaskState_failed, t.GetState()) + s.Equal(datapb.CompactionTaskState_cleaned, t.GetState()) }) s.Run("test unkonwn task", func() { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index b1905c402c..82c44daa72 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -28,11 +28,12 @@ type mixCompactionTask struct { } func (t *mixCompactionTask) processPipelining() bool { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("nodeID", t.GetNodeID())) if t.NeedReAssignNodeID() { + log.Info("mixCompactionTask need assign nodeID") return false } - log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("nodeID", t.GetNodeID())) var err error t.plan, err = t.BuildCompactionRequest() if err != nil { @@ -47,16 +48,22 @@ func (t *mixCompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan()) if err != nil { - log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false } + log.Warn("mixCompactionTask notify compaction tasks to DataNode") - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) + if err != nil { + log.Warn("mixCompactionTask update task state failed", zap.Error(err)) + return false + } return false } func (t *mixCompactionTask) processMetaSaved() bool { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil { log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err)) return false @@ -66,11 +73,13 @@ func (t *mixCompactionTask) processMetaSaved() bool { } func (t *mixCompactionTask) processExecuting() bool { - log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) if err != nil || result == nil { if errors.Is(err, merr.ErrNodeNotFound) { - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + } } log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err)) return false @@ -78,6 +87,7 @@ func (t *mixCompactionTask) processExecuting() bool { switch result.GetState() { case datapb.CompactionTaskState_executing: if t.checkTimeout() { + log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()), zap.Int64("startTime", t.GetStartTime())) err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) if err != nil { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) @@ -115,6 +125,7 @@ func (t *mixCompactionTask) processExecuting() bool { } return t.processMetaSaved() case datapb.CompactionTaskState_failed: + log.Info("mixCompactionTask fail in datanode") err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { log.Warn("fail to updateAndSaveTaskMeta") @@ -133,7 +144,7 @@ func (t *mixCompactionTask) SaveTaskMeta() error { } func (t *mixCompactionTask) saveSegmentMeta() error { - log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) // Also prepare metric updates. newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result) if err != nil { @@ -149,21 +160,28 @@ func (t *mixCompactionTask) saveSegmentMeta() error { // Note: return True means exit this state machine. // ONLY return True for processCompleted or processFailed func (t *mixCompactionTask) Process() bool { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) + lastState := t.GetState().String() + processResult := true switch t.GetState() { case datapb.CompactionTaskState_pipelining: - return t.processPipelining() + processResult = t.processPipelining() case datapb.CompactionTaskState_executing: - return t.processExecuting() + processResult = t.processExecuting() case datapb.CompactionTaskState_timeout: - return t.processTimeout() + processResult = t.processTimeout() case datapb.CompactionTaskState_meta_saved: - return t.processMetaSaved() + processResult = t.processMetaSaved() case datapb.CompactionTaskState_completed: - return t.processCompleted() + processResult = t.processCompleted() case datapb.CompactionTaskState_failed: - return t.processFailed() + processResult = t.processFailed() } - return true + currentState := t.GetState().String() + if currentState != lastState { + log.Info("mix compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState)) + } + return processResult } func (t *mixCompactionTask) GetResult() *datapb.CompactionPlanResult { @@ -189,15 +207,16 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool { } func (t *mixCompactionTask) processCompleted() bool { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { - log.Warn("mixCompactionTask processCompleted unable to drop compaction plan", zap.Int64("planID", t.GetPlanID())) + log.Warn("mixCompactionTask processCompleted unable to drop compaction plan") } t.resetSegmentCompacting() UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("mixCompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) + log.Info("mixCompactionTask processCompleted done") return true } @@ -208,6 +227,11 @@ func (t *mixCompactionTask) resetSegmentCompacting() { func (t *mixCompactionTask) processTimeout() bool { t.resetSegmentCompacting() + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false + } return true } @@ -241,14 +265,20 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa } func (t *mixCompactionTask) processFailed() bool { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { - log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err)) } - log.Info("mixCompactionTask processFailed done", zap.Int64("planID", t.GetPlanID())) + log.Info("mixCompactionTask processFailed done") t.resetSegmentCompacting() + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false + } return true } @@ -256,10 +286,6 @@ func (t *mixCompactionTask) checkTimeout() bool { if t.GetTimeoutInSeconds() > 0 { diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds() if diff > float64(t.GetTimeoutInSeconds()) { - log.Warn("compaction timeout", - zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()), - zap.Int64("startTime", t.GetStartTime()), - ) return true } } @@ -329,6 +355,7 @@ func (t *mixCompactionTask) CleanLogPath() { } func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID())) plan := &datapb.CompactionPlan{ PlanID: t.GetPlanID(), StartTime: t.GetStartTime(), @@ -341,7 +368,6 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), MaxSize: t.GetMaxSize(), } - log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) for _, segID := range t.GetInputSegments() { diff --git a/internal/datacoord/compaction_task_mix_test.go b/internal/datacoord/compaction_task_mix_test.go index 2d9c4e146a..dd655639e1 100644 --- a/internal/datacoord/compaction_task_mix_test.go +++ b/internal/datacoord/compaction_task_mix_test.go @@ -1,6 +1,8 @@ package datacoord import ( + "time" + "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -70,3 +72,45 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() { s.ErrorIs(err, merr.ErrSegmentNotFound) }) } + +func (s *CompactionTaskSuite) TestCompactionTimeout() { + channel := "Ch-1" + binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} + s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { + return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + Level: datapb.SegmentLevel_L1, + InsertChannel: channel, + State: commonpb.SegmentState_Flushed, + Binlogs: binLogs, + }} + }).Times(2) + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything) + task := &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + PlanID: 1, + TriggerID: 19530, + CollectionID: 1, + PartitionID: 10, + Type: datapb.CompactionType_MixCompaction, + NodeID: 1, + State: datapb.CompactionTaskState_executing, + InputSegments: []int64{200, 201}, + TimeoutInSeconds: 1, + }, + meta: s.mockMeta, + sessions: s.mockSessMgr, + } + plan, err := task.BuildCompactionRequest() + task.plan = plan + s.Require().NoError(err) + time.Sleep(time.Second * 2) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{ + State: datapb.CompactionTaskState_executing, + }, nil) + end := task.processExecuting() + s.Equal(true, end) + s.Equal(datapb.CompactionTaskState_cleaned, task.State) +}