diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index ba071f7314..12d836efd1 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -284,33 +284,39 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { ctx, cancel := context.WithTimeout(task.Context(), actionTimeout) defer cancel() - var targetSegment *meta.Segment - segments := ex.dist.SegmentDistManager.GetByNode(action.Node()) - for _, segment := range segments { - if segment.ID == task.SegmentID() { - targetSegment = segment - break - } - } - if targetSegment == nil { - log.Info("segment to release not found in distribution") - return - } - - req := packReleaseSegmentRequest(task, action, targetSegment.GetInsertChannel()) - - // Get shard leader for the given replica and segment dstNode := action.Node() - if ex.meta.CollectionManager.Exist(task.CollectionID()) { - leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), targetSegment.GetInsertChannel()) - if !ok { - log.Warn("no shard leader for the segment to execute loading", zap.String("shard", targetSegment.GetInsertChannel())) + req := packReleaseSegmentRequest(task, action) + if action.Scope() != querypb.DataScope_Streaming { + var targetSegment *meta.Segment + segments := ex.dist.SegmentDistManager.GetByNode(action.Node()) + for _, segment := range segments { + if segment.GetID() == task.SegmentID() { + targetSegment = segment + break + } + } + if targetSegment == nil { + log.Info("segment to release not found in distribution") return } - dstNode = leader - log = log.With(zap.Int64("shardLeader", leader)) + req.Shard = targetSegment.GetInsertChannel() + + if ex.meta.CollectionManager.Exist(task.CollectionID()) { + leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard()) + if !ok { + log.Warn("no shard leader for the segment to execute loading", zap.String("shard", req.GetShard())) + return + } + dstNode = leader + log = log.With(zap.Int64("shardLeader", leader)) + req.NeedTransfer = true + } + } else { + // Any modification to the segment distribution have to set NeedTransfer true, + // to protect the version, which serves search/query req.NeedTransfer = true } + log.Info("release segment...") status, err := ex.cluster.ReleaseSegments(ctx, dstNode, req) if err != nil { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 9eff259267..658f8f9336 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -513,6 +513,50 @@ func (suite *TaskSuite) TestReleaseSegmentTask() { } } +func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { + ctx := context.Background() + timeout := 10 * time.Second + targetNode := int64(3) + + // Expect + suite.cluster.EXPECT().ReleaseSegments(mock.Anything, targetNode, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) + + tasks := []Task{} + for _, segment := range suite.releaseSegments { + task := NewSegmentTask( + ctx, + timeout, + 0, + suite.collection, + suite.replica, + NewSegmentActionWithScope(targetNode, ActionTypeReduce, segment, querypb.DataScope_Streaming), + ) + tasks = append(tasks, task) + err := suite.scheduler.Add(task) + suite.NoError(err) + } + + segmentsNum := len(suite.releaseSegments) + suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) + + // Process tasks + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) + + // Other nodes' HB can't trigger the procedure of tasks + suite.dispatchAndWait(targetNode + 1) + suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) + + // Process tasks done + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(0, 0, 0, 0) + + for _, task := range tasks { + suite.Equal(TaskStatusSucceeded, task.Status()) + suite.NoError(task.Err()) + } +} + func (suite *TaskSuite) TestMoveSegmentTask() { ctx := context.Background() timeout := 10 * time.Second diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index 70360a8f84..6fd479dfad 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -77,7 +77,7 @@ func packLoadSegmentRequest( } } -func packReleaseSegmentRequest(task *SegmentTask, action *SegmentAction, shard string) *querypb.ReleaseSegmentsRequest { +func packReleaseSegmentRequest(task *SegmentTask, action *SegmentAction) *querypb.ReleaseSegmentsRequest { return &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseSegments, @@ -87,7 +87,6 @@ func packReleaseSegmentRequest(task *SegmentTask, action *SegmentAction, shard s NodeID: action.Node(), CollectionID: task.CollectionID(), SegmentIDs: []int64{task.SegmentID()}, - Shard: shard, Scope: action.Scope(), NeedTransfer: false, }