diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 391209ea55..4ed6d17b6b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -401,7 +401,7 @@ func (s *Server) startDataCoord() { } func (s *Server) afterStart() { - go s.updateBalanceConfigLoop(s.ctx) + s.updateBalanceConfigLoop(s.ctx) } func (s *Server) initCluster() error { @@ -1114,30 +1114,47 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i } func (s *Server) updateBalanceConfigLoop(ctx context.Context) { - log := log.Ctx(s.ctx).WithRateGroup("dc.updateBalanceConfigLoop", 1, 60) - ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) - for { - select { - case <-ctx.Done(): - log.Info("update balance config loop exit!") - return - - case <-ticker.C: - r := semver.MustParseRange("<2.3.0") - sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r) - if err != nil { - log.Warn("check data node version occur error on etcd", zap.Error(err)) - continue - } - - if len(sessions) == 0 { - // only balance channel when all data node's version > 2.3.0 - Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") - log.Info("all old data node down, enable auto balance!") - return - } - - log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions))) - } + success := s.updateBalanceConfig() + if success { + return } + + s.serverLoopWg.Add(1) + go func() { + defer s.serverLoopWg.Done() + ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Info("update balance config loop exit!") + return + + case <-ticker.C: + success := s.updateBalanceConfig() + if success { + return + } + } + } + }() +} + +func (s *Server) updateBalanceConfig() bool { + r := semver.MustParseRange("<2.3.0") + sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r) + if err != nil { + log.Warn("check data node version occur error on etcd", zap.Error(err)) + return false + } + + if len(sessions) == 0 { + // only balance channel when all data node's version > 2.3.0 + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") + log.Info("all old data node down, enable auto balance!") + return true + } + + log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions))) + return false } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 85f5dad1ea..f0fa01b684 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -391,7 +391,7 @@ func (s *Server) initObserver() { } func (s *Server) afterStart() { - go s.updateBalanceConfigLoop(s.ctx) + s.updateBalanceConfigLoop(s.ctx) } func (s *Server) Start() error { @@ -797,30 +797,48 @@ func (s *Server) checkReplicas() { } func (s *Server) updateBalanceConfigLoop(ctx context.Context) { - log := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60) - ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) - for { - select { - case <-ctx.Done(): - log.Info("update balance config loop exit!") - return - - case <-ticker.C: - r := semver.MustParseRange("<2.3.0") - sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, r) - if err != nil { - log.Warn("check query node version occur error on etcd", zap.Error(err)) - continue - } - - if len(sessions) == 0 { - // only balance channel when all query node's version >= 2.3.0 - Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true") - log.Info("all old query node down, enable auto balance!") - return - } - - log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions))) - } + success := s.updateBalanceConfig() + if success { + return } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Info("update balance config loop exit!") + return + + case <-ticker.C: + success := s.updateBalanceConfig() + if success { + return + } + } + } + }() +} + +func (s *Server) updateBalanceConfig() bool { + log := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60) + r := semver.MustParseRange("<2.3.0") + sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, r) + if err != nil { + log.Warn("check query node version occur error on etcd", zap.Error(err)) + return false + } + + if len(sessions) == 0 { + // only balance channel when all query node's version >= 2.3.0 + Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + log.Info("all old query node down, enable auto balance!") + return true + } + + log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions))) + return false }