diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 86bdf3c374..651191e8b5 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -191,6 +191,10 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int delegatorList := c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithReplica2Channel(replica)) for _, delegator := range delegatorList { leader := c.dist.ChannelDistManager.GetShardLeader(delegator.GetChannelName(), replica) + if leader == nil { + log.Warn("channel leader does not exist, skip it", zap.String("channel", delegator.GetChannelName())) + continue + } // if channel's version is smaller than shard leader's version, it means that the channel is not up to date if delegator.Version < leader.Version && delegator.Node != leader.Node { dupChannels = append(dupChannels, delegator) diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 5df2c5ca0d..cbe2cb2be5 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -50,6 +50,7 @@ type CheckerController struct { scheduler task.Scheduler checkers map[utils.CheckerType]Checker + wg sync.WaitGroup stopOnce sync.Once } @@ -96,7 +97,11 @@ func (controller *CheckerController) Start() { controller.cancel = cancel for checker := range controller.checkers { - go controller.startChecker(ctx, checker) + controller.wg.Add(1) + go func() { + defer controller.wg.Done() + controller.startChecker(ctx, checker) + }() } } @@ -145,6 +150,8 @@ func (controller *CheckerController) Stop() { if controller.cancel != nil { controller.cancel() } + + controller.wg.Wait() }) }