diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 16b059daaf..eacee155b9 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -66,6 +66,13 @@ func (c *LeaderChecker) Description() string { return "LeaderChecker checks the difference of leader view between dist, and try to correct it" } +func (c *LeaderChecker) readyToCheck(collectionID int64) bool { + metaExist := (c.meta.GetCollection(collectionID) != nil) + targetExist := c.target.IsNextTargetExist(collectionID) || c.target.IsCurrentTargetExist(collectionID) + + return metaExist && targetExist +} + func (c *LeaderChecker) Check(ctx context.Context) []task.Task { if !c.IsActive() { return nil @@ -75,6 +82,9 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task { tasks := make([]task.Task, 0) for _, collectionID := range collectionIDs { + if !c.readyToCheck(collectionID) { + continue + } collection := c.meta.CollectionManager.GetCollection(collectionID) if collection == nil { log.Warn("collection released during check leader", zap.Int64("collection", collectionID)) @@ -180,7 +190,7 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int zap.Int64("segmentID", sid), zap.Int64("nodeID", s.NodeID)) - action := task.NewSegmentActionWithScope(leaderView.ID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(s.NodeID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical) t, err := task.NewSegmentTask( ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index bfdf7a0f37..d28c0ccafa 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -101,6 +101,10 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { } suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( channels, segments, nil) + + // before target ready, should skip check collection + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 0) observer.target.UpdateCollectionNextTarget(int64(1)) observer.target.UpdateCollectionCurrentTarget(1) observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) @@ -109,7 +113,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(2, view) - tasks := suite.checker.Check(context.TODO()) + tasks = suite.checker.Check(context.TODO()) suite.Len(tasks, 1) suite.Equal(tasks[0].Source(), utils.LeaderChecker) suite.Len(tasks[0].Actions(), 1) @@ -353,7 +357,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { observer.target.UpdateCollectionCurrentTarget(1) observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{}) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 1}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) observer.dist.LeaderViewManager.Update(2, view) @@ -363,7 +367,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() { suite.Equal(tasks[0].ReplicaID(), int64(1)) suite.Len(tasks[0].Actions(), 1) suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce) - suite.Equal(tasks[0].Actions()[0].Node(), int64(2)) + suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3)) suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh) } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index b08bc33815..cef2527bf8 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -286,19 +286,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { // to protect the version, which serves search/query req.NeedTransfer = true } else { - var targetSegment *meta.Segment - segments := ex.dist.SegmentDistManager.GetByNode(action.Node()) - for _, segment := range segments { - if segment.GetID() == task.SegmentID() { - targetSegment = segment - break - } - } - if targetSegment == nil { - log.Info("segment to release not found in distribution") - return - } - req.Shard = targetSegment.GetInsertChannel() + req.Shard = task.shard if ex.meta.CollectionManager.Exist(task.CollectionID()) { leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())