diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index ace82b9215..2c54bc8325 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -56,7 +56,8 @@ var ( ErrTaskQueueFull = errors.New("TaskQueueFull") - ErrFailedResponse = errors.New("RpcFailed") + ErrFailedResponse = errors.New("RpcFailed") + ErrTaskAlreadyDone = errors.New("TaskAlreadyDone") ) type Type = int32 @@ -260,6 +261,13 @@ func (scheduler *taskScheduler) preAdd(task Task) error { return ErrConflictTaskExisted } + if GetTaskType(task) == TaskTypeGrow { + nodesWithSegment := scheduler.distMgr.LeaderViewManager.GetSealedSegmentDist(task.SegmentID()) + replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithSegment) + if _, ok := replicaNodeMap[task.ReplicaID()]; ok { + return ErrTaskAlreadyDone + } + } case *ChannelTask: index := replicaChannelIndex{task.ReplicaID(), task.Channel()} @@ -280,6 +288,14 @@ func (scheduler *taskScheduler) preAdd(task Task) error { return ErrConflictTaskExisted } + if GetTaskType(task) == TaskTypeGrow { + nodesWithChannel := scheduler.distMgr.LeaderViewManager.GetChannelDist(task.Channel()) + replicaNodeMap := utils.GroupNodesByReplica(scheduler.meta.ReplicaManager, task.CollectionID(), nodesWithChannel) + if _, ok := replicaNodeMap[task.ReplicaID()]; ok { + return ErrTaskAlreadyDone + } + } + default: panic(fmt.Sprintf("preAdd: forget to process task type: %+v", task)) } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 3b63fcefb5..eff2a25b1d 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -145,7 +145,9 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) { "TestLoadSegmentTaskFailed", "TestSegmentTaskStale", "TestTaskCanceled", - "TestMoveSegmentTask": + "TestMoveSegmentTask", + "TestSubmitDuplicateLoadSegmentTask", + "TestSubmitDuplicateSubscribeChannelTask": suite.meta.PutCollection(&meta.Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: suite.collection, @@ -158,6 +160,48 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) { } } +func (suite *TaskSuite) TestSubmitDuplicateSubscribeChannelTask() { + ctx := context.Background() + timeout := 10 * time.Second + targetNode := int64(3) + + tasks := []Task{} + dmChannels := make([]*datapb.VchannelInfo, 0) + for _, channel := range suite.subChannels { + dmChannels = append(dmChannels, &datapb.VchannelInfo{ + CollectionID: suite.collection, + ChannelName: channel, + UnflushedSegmentIds: []int64{suite.growingSegments[channel]}, + }) + task, err := NewChannelTask( + ctx, + timeout, + 0, + suite.collection, + suite.replica, + NewChannelAction(targetNode, ActionTypeGrow, channel), + ) + suite.NoError(err) + tasks = append(tasks, task) + } + + views := make([]*meta.LeaderView, 0) + for _, channel := range suite.subChannels { + views = append(views, &meta.LeaderView{ + ID: targetNode, + CollectionID: suite.collection, + Channel: channel, + }) + } + suite.dist.LeaderViewManager.Update(targetNode, views...) + + for _, task := range tasks { + err := suite.scheduler.Add(task) + suite.Error(err) + suite.ErrorIs(err, ErrTaskAlreadyDone) + } +} + func (suite *TaskSuite) TestSubscribeChannelTask() { ctx := context.Background() timeout := 10 * time.Second @@ -391,6 +435,47 @@ func (suite *TaskSuite) TestLoadSegmentTask() { } } +func (suite *TaskSuite) TestSubmitDuplicateLoadSegmentTask() { + ctx := context.Background() + timeout := 10 * time.Second + targetNode := int64(3) + channel := &datapb.VchannelInfo{ + CollectionID: suite.collection, + ChannelName: Params.CommonCfg.RootCoordDml + "-test", + } + + tasks := []Task{} + for _, segment := range suite.loadSegments { + task, err := NewSegmentTask( + ctx, + timeout, + 0, + suite.collection, + suite.replica, + NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), + ) + suite.NoError(err) + tasks = append(tasks, task) + } + + // Process tasks done + // Dist contains channels + view := &meta.LeaderView{ + ID: targetNode, + CollectionID: suite.collection, + Segments: map[int64]*querypb.SegmentDist{}, + } + for _, segment := range suite.loadSegments { + view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} + } + suite.dist.LeaderViewManager.Update(targetNode, view) + + for _, task := range tasks { + err := suite.scheduler.Add(task) + suite.Error(err) + suite.ErrorIs(err, ErrTaskAlreadyDone) + } +} func (suite *TaskSuite) TestLoadSegmentTaskFailed() { ctx := context.Background() timeout := 10 * time.Second