diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 2b9275f95e..99e7937e3e 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -189,7 +189,9 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me log.Debug("leader checker append a segment to remove", zap.Int64("segmentID", sid), zap.Int64("nodeID", s.NodeID)) - action := task.NewLeaderAction(leaderView.ID, s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid, 0) + // reduce leader action won't be execute on worker, in order to remove segment from delegator success even when worker done + // set workerID to leader view's node + action := task.NewLeaderAction(leaderView.ID, leaderView.ID, task.ActionTypeReduce, leaderView.Channel, sid, 0) t := task.NewLeaderTask( ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index 5552e513bf..f40563483e 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -409,7 +409,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { suite.Equal(tasks[0].ReplicaID(), int64(1)) suite.Len(tasks[0].Actions(), 1) suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce) - suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].Node(), int64(2)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3)) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0)) suite.Equal(tasks[0].Priority(), task.TaskPriorityLow) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 7755911637..1bbcdf222e 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1425,9 +1425,10 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi return merr.Status(err), nil } + // in case of target node offline, when try to remove segment from leader's distribution, use wildcardNodeID(-1) to skip nodeID check for _, action := range removeActions { shardDelegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{ - NodeID: action.GetNodeID(), + NodeID: -1, SegmentIDs: []int64{action.GetSegmentID()}, Scope: querypb.DataScope_Historical, CollectionID: req.GetCollectionID(), diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index a081ca4783..628689233e 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1920,7 +1920,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) sealedSegments, _ = delegator.GetSegmentInfo(false) - suite.Len(sealedSegments[0].Segments, 4) + suite.Len(sealedSegments[0].Segments, 3) releaseAction = &querypb.SyncAction{ Type: querypb.SyncType_Remove, @@ -1934,7 +1934,7 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) sealedSegments, _ = delegator.GetSegmentInfo(false) - suite.Len(sealedSegments[0].Segments, 3) + suite.Len(sealedSegments[0].Segments, 2) } func (suite *ServiceSuite) TestSyncDistribution_Failed() {