From 03a244844e1692bb4c5508efb4bd2d4f172ecfe5 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 19 Nov 2025 18:03:07 +0800 Subject: [PATCH] fix: Set task init when worker doesn't have task (#45675) issue: #45674 --------- Signed-off-by: Cai Zhang --- internal/datacoord/compaction_task_clustering.go | 11 ++++------- internal/datacoord/compaction_task_clustering_test.go | 7 ++++--- internal/datacoord/compaction_task_l0.go | 7 ++++--- internal/datacoord/compaction_task_l0_test.go | 10 ++++------ internal/datacoord/compaction_task_mix.go | 8 +++----- 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 1a5329f661..cd8c29af56 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -150,13 +150,10 @@ func (t *clusteringCompactionTask) QueryTaskOnWorker(cluster session.Cluster) { PlanID: t.GetTaskProto().GetPlanID(), }) if err != nil || result == nil { - log.Warn("processExecuting clustering compaction", zap.Error(err)) - if errors.Is(err, merr.ErrNodeNotFound) { - log.Warn("GetCompactionPlanResult fail", zap.Error(err)) - err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining)) - if err != nil { - log.Warn("update clustering compaction task meta failed", zap.Error(err)) - } + log.Warn("clusteringCompactionTask failed to get compaction result", zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("update clustering compaction task meta failed", zap.Error(err)) } return } diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 01615c0820..990977f8ec 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -471,14 +471,15 @@ func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorker() { }, }) cluster := session.NewMockCluster(s.T()) - cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(nil, nil).Once() - task.QueryTaskOnWorker(cluster) - s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState()) cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{ State: datapb.CompactionTaskState_executing, }, nil).Once() task.QueryTaskOnWorker(cluster) s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState()) + + cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(nil, nil).Once() + task.QueryTaskOnWorker(cluster) + s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState()) }) s.Run("QueryTaskOnWorker, scalar clustering key, compaction result ready", func() { diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index e6c5e7b2c1..c06673baf7 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -125,10 +125,11 @@ func (t *l0CompactionTask) QueryTaskOnWorker(cluster session.Cluster) { PlanID: t.GetTaskProto().GetPlanID(), }) if err != nil || result == nil { - if errors.Is(err, merr.ErrNodeNotFound) { - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) - } log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("update l0 compaction task meta failed", zap.Error(err)) + } return } switch result.GetState() { diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index 774064e99d..e2c08feeae 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -312,12 +312,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() { s.Require().True(t.GetTaskProto().GetNodeID() > 0) cluster := session.NewMockCluster(s.T()) - cluster.EXPECT().QueryCompaction(t.GetTaskProto().NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12) - for i := 0; i < 12; i++ { - t.QueryTaskOnWorker(cluster) - s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState()) - s.EqualValues(100, t.GetTaskProto().GetNodeID()) - } + cluster.EXPECT().QueryCompaction(t.GetTaskProto().NodeID, mock.Anything).Return(nil, errors.New("mock error")) + t.QueryTaskOnWorker(cluster) + s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().GetState()) + s.EqualValues(-1, t.GetTaskProto().GetNodeID()) }) s.Run("test executing with result executing", func() { diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index fdb0c3b539..329b6f0d89 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -126,12 +126,10 @@ func (t *mixCompactionTask) QueryTaskOnWorker(cluster session.Cluster) { PlanID: t.GetTaskProto().GetPlanID(), }) if err != nil || result == nil { - if errors.Is(err, merr.ErrNodeNotFound) { - if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil { - log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) - } - } log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err)) + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + } return } switch result.GetState() {