mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Set task init when worker doesn't have task (#45675)
issue: #45674 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
40fdf1e828
commit
03a244844e
@ -150,13 +150,10 @@ func (t *clusteringCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
|
||||
PlanID: t.GetTaskProto().GetPlanID(),
|
||||
})
|
||||
if err != nil || result == nil {
|
||||
log.Warn("processExecuting clustering compaction", zap.Error(err))
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
|
||||
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
|
||||
if err != nil {
|
||||
log.Warn("update clustering compaction task meta failed", zap.Error(err))
|
||||
}
|
||||
log.Warn("clusteringCompactionTask failed to get compaction result", zap.Error(err))
|
||||
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
|
||||
if err != nil {
|
||||
log.Warn("update clustering compaction task meta failed", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -471,14 +471,15 @@ func (s *ClusteringCompactionTaskSuite) TestQueryTaskOnWorker() {
|
||||
},
|
||||
})
|
||||
cluster := session.NewMockCluster(s.T())
|
||||
cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(nil, nil).Once()
|
||||
task.QueryTaskOnWorker(cluster)
|
||||
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
|
||||
cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
|
||||
State: datapb.CompactionTaskState_executing,
|
||||
}, nil).Once()
|
||||
task.QueryTaskOnWorker(cluster)
|
||||
s.Equal(datapb.CompactionTaskState_executing, task.GetTaskProto().GetState())
|
||||
|
||||
cluster.EXPECT().QueryCompaction(mock.Anything, mock.Anything).Return(nil, nil).Once()
|
||||
task.QueryTaskOnWorker(cluster)
|
||||
s.Equal(datapb.CompactionTaskState_pipelining, task.GetTaskProto().GetState())
|
||||
})
|
||||
|
||||
s.Run("QueryTaskOnWorker, scalar clustering key, compaction result ready", func() {
|
||||
|
||||
@ -125,10 +125,11 @@ func (t *l0CompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
|
||||
PlanID: t.GetTaskProto().GetPlanID(),
|
||||
})
|
||||
if err != nil || result == nil {
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
|
||||
}
|
||||
log.Warn("l0CompactionTask failed to get compaction result", zap.Error(err))
|
||||
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
|
||||
if err != nil {
|
||||
log.Warn("update l0 compaction task meta failed", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
switch result.GetState() {
|
||||
|
||||
@ -312,12 +312,10 @@ func (s *L0CompactionTaskSuite) TestPorcessStateTrans() {
|
||||
s.Require().True(t.GetTaskProto().GetNodeID() > 0)
|
||||
|
||||
cluster := session.NewMockCluster(s.T())
|
||||
cluster.EXPECT().QueryCompaction(t.GetTaskProto().NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12)
|
||||
for i := 0; i < 12; i++ {
|
||||
t.QueryTaskOnWorker(cluster)
|
||||
s.Equal(datapb.CompactionTaskState_executing, t.GetTaskProto().GetState())
|
||||
s.EqualValues(100, t.GetTaskProto().GetNodeID())
|
||||
}
|
||||
cluster.EXPECT().QueryCompaction(t.GetTaskProto().NodeID, mock.Anything).Return(nil, errors.New("mock error"))
|
||||
t.QueryTaskOnWorker(cluster)
|
||||
s.Equal(datapb.CompactionTaskState_pipelining, t.GetTaskProto().GetState())
|
||||
s.EqualValues(-1, t.GetTaskProto().GetNodeID())
|
||||
})
|
||||
|
||||
s.Run("test executing with result executing", func() {
|
||||
|
||||
@ -126,12 +126,10 @@ func (t *mixCompactionTask) QueryTaskOnWorker(cluster session.Cluster) {
|
||||
PlanID: t.GetTaskProto().GetPlanID(),
|
||||
})
|
||||
if err != nil || result == nil {
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
}
|
||||
}
|
||||
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
|
||||
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
switch result.GetState() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user