diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 321771a581..25eb970eb8 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -244,7 +244,7 @@ proxy: # Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments. queryCoord: autoHandoff: true # Enable auto handoff - autoBalance: false # Enable auto balance + autoBalance: true # Enable auto balance balancer: ScoreBasedBalancer # Balancer to use globalRowCountFactor: 0.1 # expert parameters, only used by scoreBasedBalancer scoreUnbalanceTolerationFactor: 0.05 # expert parameters, only used by scoreBasedBalancer diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b9534ddd56..fe45a04552 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -350,6 +350,9 @@ func (s *Server) initDataCoord() error { s.handler = newServerHandler(s) + // check whether old node exist, if yes suspend auto balance until all old nodes down + s.updateBalanceConfigLoop(s.ctx) + if err = s.initCluster(); err != nil { return err } @@ -450,9 +453,7 @@ func (s *Server) startDataCoord() { sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.GetServerID()) } -func (s *Server) afterStart() { - s.updateBalanceConfigLoop(s.ctx) -} +func (s *Server) afterStart() {} func (s *Server) initCluster() error { if s.cluster != nil { @@ -1209,11 +1210,12 @@ func (s *Server) updateBalanceConfig() bool { if len(sessions) == 0 { // only balance channel when all data node's version > 2.3.0 - Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") + Params.Reset(Params.DataCoordCfg.AutoBalance.Key) log.Info("all old data node down, enable auto balance!") return true } + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "false") log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions))) return false } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 0f6fb21ce2..aeebd3df68 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -4810,8 +4810,10 @@ func TestUpdateAutoBalanceConfigLoop(t *testing.T) { server.session = mockSession ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() time.Sleep(1500 * time.Millisecond) server.updateBalanceConfigLoop(ctx) }() @@ -4819,6 +4821,9 @@ func TestUpdateAutoBalanceConfigLoop(t *testing.T) { assert.Eventually(t, func() bool { return !Params.DataCoordCfg.AutoBalance.GetAsBool() }, 3*time.Second, 1*time.Second) + + cancel() + wg.Wait() }) t.Run("test all old node down", func(t *testing.T) { @@ -4830,11 +4835,18 @@ func TestUpdateAutoBalanceConfigLoop(t *testing.T) { server.session = mockSession ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go server.updateBalanceConfigLoop(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + server.updateBalanceConfigLoop(ctx) + }() // all old data node down, enable auto balance assert.Eventually(t, func() bool { return Params.DataCoordCfg.AutoBalance.GetAsBool() }, 3*time.Second, 1*time.Second) + + cancel() + wg.Wait() }) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 56924add2c..caa2285a6d 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -395,9 +395,7 @@ func (s *Server) initObserver() { s.resourceObserver = observers.NewResourceObserver(s.meta) } -func (s *Server) afterStart() { - s.updateBalanceConfigLoop(s.ctx) -} +func (s *Server) afterStart() {} func (s *Server) Start() error { if !s.enableActiveStandBy { @@ -432,6 +430,9 @@ func (s *Server) startQueryCoord() error { go s.handleNodeUpLoop() go s.watchNodes(revision) + // check whether old node exist, if yes suspend auto balance until all old nodes down + s.updateBalanceConfigLoop(s.ctx) + // Recover dist, to avoid generate too much task when dist not ready after restart s.distController.SyncAll(s.ctx) @@ -840,11 +841,12 @@ func (s *Server) updateBalanceConfig() bool { if len(sessions) == 0 { // only balance channel when all query node's version >= 2.3.0 - Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + Params.Reset(Params.QueryCoordCfg.AutoBalance.Key) log.Info("all old query node down, enable auto balance!") return true } + Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "false") log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions))) return false } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 80262832fd..ed7a979d4b 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -21,6 +21,7 @@ import ( "math/rand" "os" "strings" + "sync" "testing" "time" @@ -375,12 +376,20 @@ func (suite *ServerSuite) TestUpdateAutoBalanceConfigLoop() { mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(oldSessions, 0, nil).Maybe() ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go server.updateBalanceConfigLoop(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(1500 * time.Millisecond) + server.updateBalanceConfigLoop(ctx) + }() // old query node exist, disable auto balance suite.Eventually(func() bool { return !Params.QueryCoordCfg.AutoBalance.GetAsBool() }, 5*time.Second, 1*time.Second) + + cancel() + wg.Wait() }) suite.Run("all old node down", func() { @@ -392,12 +401,20 @@ func (suite *ServerSuite) TestUpdateAutoBalanceConfigLoop() { mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(nil, 0, nil).Maybe() ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go server.updateBalanceConfigLoop(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(1500 * time.Millisecond) + server.updateBalanceConfigLoop(ctx) + }() // all old query node down, enable auto balance suite.Eventually(func() bool { return Params.QueryCoordCfg.AutoBalance.GetAsBool() }, 5*time.Second, 1*time.Second) + + cancel() + wg.Wait() }) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 06c6c9316d..ccdf7c2657 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1366,7 +1366,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.AutoBalance = ParamItem{ Key: "queryCoord.autoBalance", Version: "2.0.0", - DefaultValue: "false", + DefaultValue: "true", PanicIfEmpty: true, Doc: "Enable auto balance", Export: true, @@ -2619,7 +2619,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.AutoBalance = ParamItem{ Key: "dataCoord.autoBalance", Version: "2.3.3", - DefaultValue: "false", + DefaultValue: "true", PanicIfEmpty: true, Doc: "Enable auto balance", Export: true, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 327aab4934..36efea0817 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -279,7 +279,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt()) assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt()) - assert.Equal(t, false, Params.AutoBalance.GetAsBool()) + assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, true, Params.AutoBalanceChannel.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) }) @@ -355,7 +355,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false) t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool()) - assert.Equal(t, false, Params.AutoBalance.GetAsBool()) + assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) assert.Equal(t, false, Params.AutoUpgradeSegmentIndex.GetAsBool()) })