fix: Make controller wait for checker workers to quit and add nil protection (#42704)

Related to #42702

This patch adds wait logic to `CheckerController` so that `Stop` blocks
until all checker workers have quit, and adds a nil check to the channel
checker to prevent panics during the server/testcase stop procedure.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-06-13 13:20:35 +08:00 committed by GitHub
parent ca48603f35
commit d59002d45e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 1 deletions

View File

@ -191,6 +191,10 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
delegatorList := c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithReplica2Channel(replica))
for _, delegator := range delegatorList {
leader := c.dist.ChannelDistManager.GetShardLeader(delegator.GetChannelName(), replica)
if leader == nil {
log.Warn("channel leader does not exist, skip it", zap.String("channel", delegator.GetChannelName()))
continue
}
// if channel's version is smaller than shard leader's version, it means that the channel is not up to date
if delegator.Version < leader.Version && delegator.Node != leader.Node {
dupChannels = append(dupChannels, delegator)

View File

@ -50,6 +50,7 @@ type CheckerController struct {
scheduler task.Scheduler
checkers map[utils.CheckerType]Checker
wg sync.WaitGroup
stopOnce sync.Once
}
@ -96,7 +97,11 @@ func (controller *CheckerController) Start() {
controller.cancel = cancel
for checker := range controller.checkers {
go controller.startChecker(ctx, checker)
controller.wg.Add(1)
go func() {
defer controller.wg.Done()
controller.startChecker(ctx, checker)
}()
}
}
@ -145,6 +150,8 @@ func (controller *CheckerController) Stop() {
if controller.cancel != nil {
controller.cancel()
}
controller.wg.Wait()
})
}