From 918333817e3e28e2b64f86ed7ea364f519366fc8 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 8 Nov 2023 07:10:17 +0800 Subject: [PATCH] Disable auto balance when old node exists (#28191) (#28224) Signed-off-by: Wei Liu --- Makefile | 1 + configs/milvus.yaml | 2 +- internal/datacoord/channel_manager.go | 8 + internal/datacoord/channel_manager_test.go | 85 +++- internal/datacoord/index_service.go | 2 +- internal/datacoord/metrics_info.go | 2 +- internal/datacoord/mock_channel_store.go | 460 ++++++++++++++++++++ internal/datacoord/server.go | 45 +- internal/datacoord/server_test.go | 40 ++ internal/datacoord/services.go | 2 +- internal/querycoordv2/handlers.go | 2 +- internal/querycoordv2/server.go | 44 +- internal/querycoordv2/server_test.go | 43 ++ internal/util/sessionutil/mock_session.go | 123 ++++++ internal/util/sessionutil/session.go | 4 + internal/util/sessionutil/session_util.go | 12 + pkg/util/paramtable/component_param.go | 56 ++- pkg/util/paramtable/component_param_test.go | 5 + 18 files changed, 888 insertions(+), 48 deletions(-) create mode 100644 internal/datacoord/mock_channel_store.go diff --git a/Makefile b/Makefile index 4894acbc58..f9ca8989f6 100644 --- a/Makefile +++ b/Makefile @@ -425,6 +425,7 @@ generate-mockery-datacoord: getdeps $(INSTALL_PATH)/mockery --name=compactionPlanContext --dir=internal/datacoord --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Handler --dir=internal/datacoord --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=allocator --dir=internal/datacoord --filename=mock_allocator_test.go --output=internal/datacoord --structname=NMockAllocator --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=RWChannelStore --dir=internal/datacoord --filename=mock_channel_store.go --output=internal/datacoord --structname=MockRWChannelStore --with-expecter --inpackage generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 1440bf74ed..8da26b5fdd 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -221,7 +221,7 @@ proxy: # Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments. queryCoord: autoHandoff: true # Enable auto handoff - autoBalance: true # Enable auto balance + autoBalance: false # Enable auto balance balancer: ScoreBasedBalancer # Balancer to use globalRowCountFactor: 0.1 # expert parameters, only used by scoreBasedBalancer scoreUnbalanceTolerationFactor: 0.05 # expert parameters, only used by scoreBasedBalancer diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 9feb380db6..66605ed030 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -276,6 +276,10 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { log.Info("background checking channels loop quit") return case <-ticker.C: + if !Params.DataCoordCfg.AutoBalance.GetAsBool() { + return + } + c.mu.Lock() if !c.isSilent() { log.Info("ChannelManager is not silent, skip channel balance this round") @@ -343,6 +347,10 @@ func (c *ChannelManager) AddNode(nodeID int64) error { c.store.Add(nodeID) + if !Params.DataCoordCfg.AutoBalance.GetAsBool() { + return nil + } + updates := c.registerPolicy(c.store, nodeID) if len(updates) <= 0 { log.Info("register node with no reassignment", zap.Int64("registered node", nodeID)) diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index cf7eaf12b6..a6d0cbb258 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -388,6 +389,8 @@ func TestChannelManager(t *testing.T) { watchkv.Close() }() + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") + prefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue() t.Run("test AddNode with avalible node", func(t *testing.T) { // Note: this test is based on the default registerPolicy @@ -1063,6 +1066,7 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { prefix := Params.CommonCfg.DataCoordWatchSubPath.GetValue() + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") t.Run("one node with three channels add a new node", func(t *testing.T) { defer watchkv.RemoveWithPrefix("") @@ -1092,26 +1096,6 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { chManager.AddNode(2) channelBalanced = "channel-1" - // waitAndStore := func(waitState, storeState datapb.ChannelWatchState, nodeID UniqueID, channelName string) { - // for { - // key := path.Join(prefix, strconv.FormatInt(nodeID, 10), channelName) - // v, err := watchkv.Load(key) - // if err == nil && len(v) > 0 { - // watchInfo, err := parseWatchInfo(key, []byte(v)) - // require.NoError(t, err) - // require.Equal(t, waitState, watchInfo.GetState()) - // - // watchInfo.State = storeState - // data, err := proto.Marshal(watchInfo) - // require.NoError(t, err) - // - // watchkv.Save(key, string(data)) - // break - // } - // time.Sleep(100 * time.Millisecond) - // } - // } - key := path.Join(prefix, "1", channelBalanced) waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess) @@ -1120,7 +1104,6 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) { assert.True(t, chManager.Match(1, "channel-2")) assert.True(t, chManager.Match(1, "channel-3")) - assert.True(t, chManager.Match(2, "channel-1")) chManager.AddNode(3) @@ -1260,3 +1243,63 @@ func TestChannelManager_HelperFunc(t *testing.T) { } }) } + +func TestChannelManager_BackgroundChannelChecker(t *testing.T) { + Params.Save(Params.DataCoordCfg.ChannelBalanceInterval.Key, "1") + Params.Save(Params.DataCoordCfg.ChannelBalanceSilentDuration.Key, "1") + + watchkv := getWatchKV(t) + defer func() { + watchkv.RemoveWithPrefix("") + watchkv.Close() + }() + + defer watchkv.RemoveWithPrefix("") + + c, err := NewChannelManager(watchkv, newMockHandler(), withStateChecker()) + require.NoError(t, err) + mockStore := NewMockRWChannelStore(t) + mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{ + { + NodeID: 1, + Channels: []*channel{ + { + Name: "channel-1", + }, + { + Name: "channel-2", + }, + { + Name: "channel-3", + }, + }, + }, + { + NodeID: 2, + }, + }).Maybe() + c.store = mockStore + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.bgCheckChannelsWork(ctx) + + updateCounter := atomic.NewInt64(0) + + mockStore.EXPECT().Update(mock.Anything).Run(func(op ChannelOpSet) { + updateCounter.Inc() + }).Return(nil).Maybe() + + t.Run("test disable auto balance", func(t *testing.T) { + assert.Eventually(t, func() bool { + return updateCounter.Load() == 0 + }, 5*time.Second, 1*time.Second) + }) + + t.Run("test enable auto balance", func(t *testing.T) { + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") + assert.Eventually(t, func() bool { + return updateCounter.Load() > 0 + }, 5*time.Second, 1*time.Second) + }) +} diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 38abfd2568..d9e76de8ec 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -37,7 +37,7 @@ import ( // serverID return the session serverID func (s *Server) serverID() int64 { if s.session != nil { - return s.session.ServerID + return s.session.GetServerID() } // return 0 if no session exist, only for UT return 0 diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index fb74df46e2..c2e8e2fd80 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -108,7 +108,7 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos { BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), HardwareInfos: metricsinfo.HardwareMetrics{ - IP: s.session.Address, + IP: s.session.GetAddress(), CPUCoreCount: hardware.GetCPUNum(), CPUCoreUsage: hardware.GetCPUUsage(), Memory: hardware.GetMemoryCount(), diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go new file mode 100644 index 0000000000..d7790c66bd --- /dev/null +++ b/internal/datacoord/mock_channel_store.go @@ -0,0 +1,460 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package datacoord + +import mock "github.com/stretchr/testify/mock" + +// MockRWChannelStore is an autogenerated mock type for the RWChannelStore type +type MockRWChannelStore struct { + mock.Mock +} + +type MockRWChannelStore_Expecter struct { + mock *mock.Mock +} + +func (_m *MockRWChannelStore) EXPECT() *MockRWChannelStore_Expecter { + return &MockRWChannelStore_Expecter{mock: &_m.Mock} +} + +// Add provides a mock function with given fields: nodeID +func (_m *MockRWChannelStore) Add(nodeID int64) { + _m.Called(nodeID) +} + +// MockRWChannelStore_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add' +type MockRWChannelStore_Add_Call struct { + *mock.Call +} + +// Add is a helper method to define mock.On call +// - nodeID int64 +func (_e *MockRWChannelStore_Expecter) Add(nodeID interface{}) *MockRWChannelStore_Add_Call { + return &MockRWChannelStore_Add_Call{Call: _e.mock.On("Add", nodeID)} +} + +func (_c *MockRWChannelStore_Add_Call) Run(run func(nodeID int64)) *MockRWChannelStore_Add_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_Add_Call) Return() *MockRWChannelStore_Add_Call { + _c.Call.Return() + return _c +} + +func (_c *MockRWChannelStore_Add_Call) RunAndReturn(run func(int64)) *MockRWChannelStore_Add_Call { + _c.Call.Return(run) + return _c +} + +// Delete provides a mock function with given fields: nodeID +func (_m *MockRWChannelStore) Delete(nodeID int64) ([]*channel, error) { + ret := _m.Called(nodeID) + + var r0 []*channel + var r1 error + if rf, ok := ret.Get(0).(func(int64) ([]*channel, error)); ok { + return rf(nodeID) + } + if rf, ok := ret.Get(0).(func(int64) []*channel); ok { + r0 = rf(nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*channel) + } + } + + if rf, ok := ret.Get(1).(func(int64) error); ok { + r1 = rf(nodeID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRWChannelStore_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type MockRWChannelStore_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - nodeID int64 +func (_e *MockRWChannelStore_Expecter) Delete(nodeID interface{}) *MockRWChannelStore_Delete_Call { + return &MockRWChannelStore_Delete_Call{Call: _e.mock.On("Delete", nodeID)} +} + +func (_c *MockRWChannelStore_Delete_Call) Run(run func(nodeID int64)) *MockRWChannelStore_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_Delete_Call) Return(_a0 []*channel, _a1 error) *MockRWChannelStore_Delete_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRWChannelStore_Delete_Call) RunAndReturn(run func(int64) ([]*channel, error)) *MockRWChannelStore_Delete_Call { + _c.Call.Return(run) + return _c +} + +// GetBufferChannelInfo provides a mock function with given fields: +func (_m *MockRWChannelStore) GetBufferChannelInfo() *NodeChannelInfo { + ret := _m.Called() + + var r0 *NodeChannelInfo + if rf, ok := ret.Get(0).(func() *NodeChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*NodeChannelInfo) + } + } + + return r0 +} + +// MockRWChannelStore_GetBufferChannelInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferChannelInfo' +type MockRWChannelStore_GetBufferChannelInfo_Call struct { + *mock.Call +} + +// GetBufferChannelInfo is a helper method to define mock.On call +func (_e *MockRWChannelStore_Expecter) GetBufferChannelInfo() *MockRWChannelStore_GetBufferChannelInfo_Call { + return &MockRWChannelStore_GetBufferChannelInfo_Call{Call: _e.mock.On("GetBufferChannelInfo")} +} + +func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) Run(run func()) *MockRWChannelStore_GetBufferChannelInfo_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) Return(_a0 *NodeChannelInfo) *MockRWChannelStore_GetBufferChannelInfo_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetBufferChannelInfo_Call) RunAndReturn(run func() *NodeChannelInfo) *MockRWChannelStore_GetBufferChannelInfo_Call { + _c.Call.Return(run) + return _c +} + +// GetChannels provides a mock function with given fields: +func (_m *MockRWChannelStore) GetChannels() []*NodeChannelInfo { + ret := _m.Called() + + var r0 []*NodeChannelInfo + if rf, ok := ret.Get(0).(func() []*NodeChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*NodeChannelInfo) + } + } + + return r0 +} + +// MockRWChannelStore_GetChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannels' +type MockRWChannelStore_GetChannels_Call struct { + *mock.Call +} + +// GetChannels is a helper method to define mock.On call +func (_e *MockRWChannelStore_Expecter) GetChannels() *MockRWChannelStore_GetChannels_Call { + return &MockRWChannelStore_GetChannels_Call{Call: _e.mock.On("GetChannels")} +} + +func (_c *MockRWChannelStore_GetChannels_Call) Run(run func()) *MockRWChannelStore_GetChannels_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRWChannelStore_GetChannels_Call) Return(_a0 []*NodeChannelInfo) *MockRWChannelStore_GetChannels_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetChannels_Call) RunAndReturn(run func() []*NodeChannelInfo) *MockRWChannelStore_GetChannels_Call { + _c.Call.Return(run) + return _c +} + +// GetNode provides a mock function with given fields: nodeID +func (_m *MockRWChannelStore) GetNode(nodeID int64) *NodeChannelInfo { + ret := _m.Called(nodeID) + + var r0 *NodeChannelInfo + if rf, ok := ret.Get(0).(func(int64) *NodeChannelInfo); ok { + r0 = rf(nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*NodeChannelInfo) + } + } + + return r0 +} + +// MockRWChannelStore_GetNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNode' +type MockRWChannelStore_GetNode_Call struct { + *mock.Call +} + +// GetNode is a helper method to define mock.On call +// - nodeID int64 +func (_e *MockRWChannelStore_Expecter) GetNode(nodeID interface{}) *MockRWChannelStore_GetNode_Call { + return &MockRWChannelStore_GetNode_Call{Call: _e.mock.On("GetNode", nodeID)} +} + +func (_c *MockRWChannelStore_GetNode_Call) Run(run func(nodeID int64)) *MockRWChannelStore_GetNode_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNode_Call) Return(_a0 *NodeChannelInfo) *MockRWChannelStore_GetNode_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNode_Call) RunAndReturn(run func(int64) *NodeChannelInfo) *MockRWChannelStore_GetNode_Call { + _c.Call.Return(run) + return _c +} + +// GetNodeChannelCount provides a mock function with given fields: nodeID +func (_m *MockRWChannelStore) GetNodeChannelCount(nodeID int64) int { + ret := _m.Called(nodeID) + + var r0 int + if rf, ok := ret.Get(0).(func(int64) int); ok { + r0 = rf(nodeID) + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelCount' +type MockRWChannelStore_GetNodeChannelCount_Call struct { + *mock.Call +} + +// GetNodeChannelCount is a helper method to define mock.On call +// - nodeID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelCount(nodeID interface{}) *MockRWChannelStore_GetNodeChannelCount_Call { + return &MockRWChannelStore_GetNodeChannelCount_Call{Call: _e.mock.On("GetNodeChannelCount", nodeID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Run(run func(nodeID int64)) *MockRWChannelStore_GetNodeChannelCount_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelCount_Call) Return(_a0 int) *MockRWChannelStore_GetNodeChannelCount_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int64) int) *MockRWChannelStore_GetNodeChannelCount_Call { + _c.Call.Return(run) + return _c +} + +// GetNodes provides a mock function with given fields: +func (_m *MockRWChannelStore) GetNodes() []int64 { + ret := _m.Called() + + var r0 []int64 + if rf, ok := ret.Get(0).(func() []int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodes' +type MockRWChannelStore_GetNodes_Call struct { + *mock.Call +} + +// GetNodes is a helper method to define mock.On call +func (_e *MockRWChannelStore_Expecter) GetNodes() *MockRWChannelStore_GetNodes_Call { + return &MockRWChannelStore_GetNodes_Call{Call: _e.mock.On("GetNodes")} +} + +func (_c *MockRWChannelStore_GetNodes_Call) Run(run func()) *MockRWChannelStore_GetNodes_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodes_Call) Return(_a0 []int64) *MockRWChannelStore_GetNodes_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodes_Call) RunAndReturn(run func() []int64) *MockRWChannelStore_GetNodes_Call { + _c.Call.Return(run) + return _c +} + +// GetNodesChannels provides a mock function with given fields: +func (_m *MockRWChannelStore) GetNodesChannels() []*NodeChannelInfo { + ret := _m.Called() + + var r0 []*NodeChannelInfo + if rf, ok := ret.Get(0).(func() []*NodeChannelInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*NodeChannelInfo) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodesChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodesChannels' +type MockRWChannelStore_GetNodesChannels_Call struct { + *mock.Call +} + +// GetNodesChannels is a helper method to define mock.On call +func (_e *MockRWChannelStore_Expecter) GetNodesChannels() *MockRWChannelStore_GetNodesChannels_Call { + return &MockRWChannelStore_GetNodesChannels_Call{Call: _e.mock.On("GetNodesChannels")} +} + +func (_c *MockRWChannelStore_GetNodesChannels_Call) Run(run func()) *MockRWChannelStore_GetNodesChannels_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodesChannels_Call) Return(_a0 []*NodeChannelInfo) *MockRWChannelStore_GetNodesChannels_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodesChannels_Call) RunAndReturn(run func() []*NodeChannelInfo) *MockRWChannelStore_GetNodesChannels_Call { + _c.Call.Return(run) + return _c +} + +// Reload provides a mock function with given fields: +func (_m *MockRWChannelStore) Reload() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRWChannelStore_Reload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reload' +type MockRWChannelStore_Reload_Call struct { + *mock.Call +} + +// Reload is a helper method to define mock.On call +func (_e *MockRWChannelStore_Expecter) Reload() *MockRWChannelStore_Reload_Call { + return &MockRWChannelStore_Reload_Call{Call: _e.mock.On("Reload")} +} + +func (_c *MockRWChannelStore_Reload_Call) Run(run func()) *MockRWChannelStore_Reload_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRWChannelStore_Reload_Call) Return(_a0 error) *MockRWChannelStore_Reload_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_Reload_Call) RunAndReturn(run func() error) *MockRWChannelStore_Reload_Call { + _c.Call.Return(run) + return _c +} + +// Update provides a mock function with given fields: op +func (_m *MockRWChannelStore) Update(op ChannelOpSet) error { + ret := _m.Called(op) + + var r0 error + if rf, ok := ret.Get(0).(func(ChannelOpSet) error); ok { + r0 = rf(op) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRWChannelStore_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' +type MockRWChannelStore_Update_Call struct { + *mock.Call +} + +// Update is a helper method to define mock.On call +// - op ChannelOpSet +func (_e *MockRWChannelStore_Expecter) Update(op interface{}) *MockRWChannelStore_Update_Call { + return &MockRWChannelStore_Update_Call{Call: _e.mock.On("Update", op)} +} + +func (_c *MockRWChannelStore_Update_Call) Run(run func(op ChannelOpSet)) *MockRWChannelStore_Update_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(ChannelOpSet)) + }) + return _c +} + +func (_c *MockRWChannelStore_Update_Call) Return(_a0 error) *MockRWChannelStore_Update_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_Update_Call) RunAndReturn(run func(ChannelOpSet) error) *MockRWChannelStore_Update_Call { + _c.Call.Return(run) + return _c +} + +// NewMockRWChannelStore creates a new instance of MockRWChannelStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockRWChannelStore(t interface { + mock.TestingT + Cleanup(func()) +}) *MockRWChannelStore { + mock := &MockRWChannelStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index f990c4f47e..391209ea55 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -28,6 +28,7 @@ import ( semver "github.com/blang/semver/v4" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -130,7 +131,7 @@ type Server struct { notifyIndexChan chan UniqueID factory dependency.Factory - session *sessionutil.Session + session sessionutil.SessionInterface icSession *sessionutil.Session dnEventCh <-chan *sessionutil.SessionEvent inEventCh <-chan *sessionutil.SessionEvent @@ -265,13 +266,13 @@ func (s *Server) Register() error { log.Info("DataCoord Register Finished") s.session.LivenessCheck(s.serverLoopCtx, func() { - logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) + logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID())) if err := s.Stop(); err != nil { logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) } metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Dec() // manually send signal to starter goroutine - if s.session.TriggerKill { + if s.session.IsTriggerKill() { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) } @@ -394,8 +395,13 @@ func (s *Server) startDataCoord() { s.compactionTrigger.start() } s.startServerLoop() + s.afterStart() s.stateCode.Store(commonpb.StateCode_Healthy) - sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID) + sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.GetServerID()) +} + +func (s *Server) afterStart() { + go s.updateBalanceConfigLoop(s.ctx) } func (s *Server) initCluster() error { @@ -782,7 +788,7 @@ func (s *Server) stopServiceWatch() { // ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server. logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", paramtable.GetNodeID())) go s.Stop() - if s.session.TriggerKill { + if s.session.IsTriggerKill() { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) } @@ -1106,3 +1112,32 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i s.meta.AddCollection(collInfo) return nil } + +func (s *Server) updateBalanceConfigLoop(ctx context.Context) { + log := log.Ctx(s.ctx).WithRateGroup("dc.updateBalanceConfigLoop", 1, 60) + ticker := time.NewTicker(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) + for { + select { + case <-ctx.Done(): + log.Info("update balance config loop exit!") + return + + case <-ticker.C: + r := semver.MustParseRange("<2.3.0") + sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r) + if err != nil { + log.Warn("check data node version occur error on etcd", zap.Error(err)) + continue + } + + if len(sessions) == 0 { + // only balance channel when all data node's version > 2.3.0 + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "true") + log.Info("all old data node down, enable auto balance!") + return + } + + log.RatedDebug(10, "old data node exist", zap.Strings("sessions", lo.Keys(sessions))) + } + } +} diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 0f8ae9b882..68bdd93e4d 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -4737,3 +4737,43 @@ func TestDataNodeTtChannel(t *testing.T) { assert.EqualValues(t, 0, len(segment.allocations)) }) } + +func TestUpdateAutoBalanceConfigLoop(t *testing.T) { + Params.Save(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key, "1") + defer Params.Reset(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key) + Params.Save(Params.DataCoordCfg.AutoBalance.Key, "false") + defer Params.Reset(Params.DataCoordCfg.AutoBalance.Key) + + t.Run("test old node exist", func(t *testing.T) { + oldSessions := make(map[string]*sessionutil.Session) + oldSessions["s1"] = &sessionutil.Session{} + + server := &Server{} + mockSession := sessionutil.NewMockSession(t) + mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(oldSessions, 0, nil).Maybe() + server.session = mockSession + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go server.updateBalanceConfigLoop(ctx) + // old data node exist, disable auto balance + assert.Eventually(t, func() bool { + return !Params.DataCoordCfg.AutoBalance.GetAsBool() + }, 3*time.Second, 1*time.Second) + }) + + t.Run("test all old node down", func(t *testing.T) { + server := &Server{} + mockSession := sessionutil.NewMockSession(t) + mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(nil, 0, nil).Maybe() + server.session = mockSession + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go server.updateBalanceConfigLoop(ctx) + // all old data node down, enable auto balance + assert.Eventually(t, func() bool { + return Params.DataCoordCfg.AutoBalance.GetAsBool() + }, 3*time.Second, 1*time.Second) + }) +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index e768348bc3..40d2c71c31 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -604,7 +604,7 @@ func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetCompon code := s.GetStateCode() nodeID := common.NotRegisteredID if s.session != nil && s.session.Registered() { - nodeID = s.session.ServerID // or Params.NodeID + nodeID = s.session.GetServerID() // or Params.NodeID } resp := &milvuspb.ComponentStates{ State: &milvuspb.ComponentInfo{ diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 1f431c6274..0d70128605 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -179,7 +179,7 @@ func (s *Server) getSystemInfoMetrics( BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, paramtable.GetNodeID()), HardwareInfos: metricsinfo.HardwareMetrics{ - IP: s.session.Address, + IP: s.session.GetAddress(), CPUCoreCount: hardware.GetCPUNum(), CPUCoreUsage: hardware.GetCPUUsage(), Memory: hardware.GetMemoryCount(), diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 18a0b0bcb8..8fcbaf80bb 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -24,7 +24,9 @@ import ( "syscall" "time" + "github.com/blang/semver/v4" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" @@ -74,7 +76,7 @@ type Server struct { etcdCli *clientv3.Client tikvCli *txnkv.Client address string - session *sessionutil.Session + session sessionutil.SessionInterface kv kv.MetaKv idAllocator func() (int64, error) metricsCacheManager *metricsinfo.MetricsCacheManager @@ -146,13 +148,13 @@ func (s *Server) Register() error { } metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() s.session.LivenessCheck(s.ctx, func() { - log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) + log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Dec() // manually send signal to starter goroutine - if s.session.TriggerKill { + if s.session.IsTriggerKill() { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) } @@ -388,6 +390,7 @@ func (s *Server) initObserver() { } func (s *Server) afterStart() { + go s.updateBalanceConfigLoop(s.ctx) } func (s *Server) Start() error { @@ -429,7 +432,7 @@ func (s *Server) startQueryCoord() error { s.startServerLoop() s.afterStart() s.UpdateStateCode(commonpb.StateCode_Healthy) - sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.ServerID) + sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID()) return nil } @@ -529,7 +532,7 @@ func (s *Server) State() commonpb.StateCode { func (s *Server) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { nodeID := common.NotRegisteredID if s.session != nil && s.session.Registered() { - nodeID = s.session.ServerID + nodeID = s.session.GetServerID() } serviceComponentInfo := &milvuspb.ComponentInfo{ // NodeID: Params.QueryCoordID, // will race with QueryCoord.Register() @@ -609,7 +612,7 @@ func (s *Server) watchNodes(revision int64) { // ErrCompacted is handled inside SessionWatcher log.Warn("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID())) go s.Stop() - if s.session.TriggerKill { + if s.session.IsTriggerKill() { if p, err := os.FindProcess(os.Getpid()); err == nil { p.Signal(syscall.SIGINT) } @@ -791,3 +794,32 @@ func (s *Server) checkReplicas() { } } } + +func (s *Server) updateBalanceConfigLoop(ctx context.Context) { + log := log.Ctx(s.ctx).WithRateGroup("qcv2.updateBalanceConfigLoop", 1, 60) + ticker := time.NewTicker(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.GetAsDuration(time.Second)) + for { + select { + case <-ctx.Done(): + log.Info("update balance config loop exit!") + return + + case <-ticker.C: + r := semver.MustParseRange("<2.3.0") + sessions, _, err := s.session.GetSessionsWithVersionRange(typeutil.QueryNodeRole, r) + if err != nil { + log.Warn("check query node version occur error on etcd", zap.Error(err)) + continue + } + + if len(sessions) == 0 { + // only balance channel when all query node's version >= 2.3.0 + Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + log.Info("all old query node down, enable auto balance!") + return + } + + log.RatedDebug(10, "old query node exist", zap.Strings("sessions", lo.Keys(sessions))) + } + } +} diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 5a0ddecda7..209a34b848 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -356,6 +357,48 @@ func (suite *ServerSuite) TestStop() { suite.server.Stop() } +func (suite *ServerSuite) TestUpdateAutoBalanceConfigLoop() { + suite.server.Stop() + + Params.Save(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.Key, "1") + defer Params.Reset(Params.QueryCoordCfg.CheckAutoBalanceConfigInterval.Key) + Params.Save(Params.QueryCoordCfg.AutoBalance.Key, "false") + defer Params.Reset(Params.QueryCoordCfg.AutoBalance.Key) + + suite.Run("test old node exist", func() { + server := &Server{} + mockSession := sessionutil.NewMockSession(suite.T()) + server.session = mockSession + + oldSessions := make(map[string]*sessionutil.Session) + oldSessions["s1"] = &sessionutil.Session{} + mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(oldSessions, 0, nil).Maybe() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go server.updateBalanceConfigLoop(ctx) + // old query node exist, disable auto balance + suite.Eventually(func() bool { + return !Params.QueryCoordCfg.AutoBalance.GetAsBool() + }, 5*time.Second, 1*time.Second) + }) + + suite.Run("all old node down", func() { + server := &Server{} + mockSession := sessionutil.NewMockSession(suite.T()) + server.session = mockSession + mockSession.EXPECT().GetSessionsWithVersionRange(mock.Anything, mock.Anything).Return(nil, 0, nil).Maybe() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go server.updateBalanceConfigLoop(ctx) + // all old query node down, enable auto balance + suite.Eventually(func() bool { + return Params.QueryCoordCfg.AutoBalance.GetAsBool() + }, 5*time.Second, 1*time.Second) + }) +} + func (suite *ServerSuite) waitNodeUp(node *mocks.MockQueryNode, timeout time.Duration) bool { start := time.Now() for time.Since(start) < timeout { diff --git a/internal/util/sessionutil/mock_session.go b/internal/util/sessionutil/mock_session.go index 0e771d3ab0..a4d47020ec 100644 --- a/internal/util/sessionutil/mock_session.go +++ b/internal/util/sessionutil/mock_session.go @@ -107,6 +107,88 @@ func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() erro return _c } +// GetAddress provides a mock function with given fields: +func (_m *MockSession) GetAddress() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockSession_GetAddress_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAddress' +type MockSession_GetAddress_Call struct { + *mock.Call +} + +// GetAddress is a helper method to define mock.On call +func (_e *MockSession_Expecter) GetAddress() *MockSession_GetAddress_Call { + return &MockSession_GetAddress_Call{Call: _e.mock.On("GetAddress")} +} + +func (_c *MockSession_GetAddress_Call) Run(run func()) *MockSession_GetAddress_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_GetAddress_Call) Return(_a0 string) *MockSession_GetAddress_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_GetAddress_Call) RunAndReturn(run func() string) *MockSession_GetAddress_Call { + _c.Call.Return(run) + return _c +} + +// GetServerID provides a mock function with given fields: +func (_m *MockSession) GetServerID() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockSession_GetServerID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetServerID' +type MockSession_GetServerID_Call struct { + *mock.Call +} + +// GetServerID is a helper method to define mock.On call +func (_e *MockSession_Expecter) GetServerID() *MockSession_GetServerID_Call { + return &MockSession_GetServerID_Call{Call: _e.mock.On("GetServerID")} +} + +func (_c *MockSession_GetServerID_Call) Run(run func()) *MockSession_GetServerID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_GetServerID_Call) Return(_a0 int64) *MockSession_GetServerID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_GetServerID_Call) RunAndReturn(run func() int64) *MockSession_GetServerID_Call { + _c.Call.Return(run) + return _c +} + // GetSessions provides a mock function with given fields: prefix func (_m *MockSession) GetSessions(prefix string) (map[string]*Session, int64, error) { ret := _m.Called(prefix) @@ -307,6 +389,47 @@ func (_c *MockSession_Init_Call) RunAndReturn(run func(string, string, bool, boo return _c } +// IsTriggerKill provides a mock function with given fields: +func (_m *MockSession) IsTriggerKill() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockSession_IsTriggerKill_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTriggerKill' +type MockSession_IsTriggerKill_Call struct { + *mock.Call +} + +// IsTriggerKill is a helper method to define mock.On call +func (_e *MockSession_Expecter) IsTriggerKill() *MockSession_IsTriggerKill_Call { + return &MockSession_IsTriggerKill_Call{Call: _e.mock.On("IsTriggerKill")} +} + +func (_c *MockSession_IsTriggerKill_Call) Run(run func()) *MockSession_IsTriggerKill_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSession_IsTriggerKill_Call) Return(_a0 bool) *MockSession_IsTriggerKill_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSession_IsTriggerKill_Call) RunAndReturn(run func() bool) *MockSession_IsTriggerKill_Call { + _c.Call.Return(run) + return _c +} + // LivenessCheck provides a mock function with given fields: ctx, callback func (_m *MockSession) LivenessCheck(ctx context.Context, callback func()) { _m.Called(ctx, callback) diff --git a/internal/util/sessionutil/session.go b/internal/util/sessionutil/session.go index 4d676b673f..eae304d583 100644 --- a/internal/util/sessionutil/session.go +++ b/internal/util/sessionutil/session.go @@ -46,4 +46,8 @@ type SessionInterface interface { SetEnableActiveStandBy(enable bool) ProcessActiveStandBy(activateFunc func() error) error ForceActiveStandby(activateFunc func() error) error + + GetAddress() string + GetServerID() int64 + IsTriggerKill() bool } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 693faef499..a1e5daca89 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -102,6 +102,18 @@ type SessionRaw struct { EnableDisk bool `json:"EnableDisk,omitempty"` } +func (s *SessionRaw) GetAddress() string { + return s.Address +} + +func (s *SessionRaw) GetServerID() int64 { + return s.ServerID +} + +func (s *SessionRaw) IsTriggerKill() bool { + return s.TriggerKill +} + // Session is a struct to store service's session, including ServerID, ServerName, // Address. // Exclusive indicates that this server can only start one. diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 54a74445f0..64d842824c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1195,16 +1195,17 @@ type queryCoordConfig struct { // Deprecated: Since 2.2.2, use different interval for different checker CheckInterval ParamItem `refreshable:"true"` - NextTargetSurviveTime ParamItem `refreshable:"true"` - UpdateNextTargetInterval ParamItem `refreshable:"false"` - CheckNodeInReplicaInterval ParamItem `refreshable:"false"` - CheckResourceGroupInterval ParamItem `refreshable:"false"` - EnableRGAutoRecover ParamItem `refreshable:"true"` - CheckHealthInterval ParamItem `refreshable:"false"` - CheckHealthRPCTimeout ParamItem `refreshable:"true"` - BrokerTimeout ParamItem `refreshable:"false"` - CollectionRecoverTimesLimit ParamItem `refreshable:"true"` - ObserverTaskParallel ParamItem `refreshable:"false"` + NextTargetSurviveTime ParamItem `refreshable:"true"` + UpdateNextTargetInterval ParamItem `refreshable:"false"` + CheckNodeInReplicaInterval ParamItem `refreshable:"false"` + CheckResourceGroupInterval ParamItem `refreshable:"false"` + EnableRGAutoRecover ParamItem `refreshable:"true"` + CheckHealthInterval ParamItem `refreshable:"false"` + CheckHealthRPCTimeout ParamItem `refreshable:"true"` + BrokerTimeout ParamItem `refreshable:"false"` + CollectionRecoverTimesLimit ParamItem `refreshable:"true"` + ObserverTaskParallel ParamItem `refreshable:"false"` + CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1252,7 +1253,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.AutoBalance = ParamItem{ Key: "queryCoord.autoBalance", Version: "2.0.0", - DefaultValue: "true", + DefaultValue: "false", PanicIfEmpty: true, Doc: "Enable auto balance", Export: true, @@ -1526,6 +1527,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.ObserverTaskParallel.Init(base.mgr) + + p.CheckAutoBalanceConfigInterval = ParamItem{ + Key: "queryCoord.checkAutoBalanceConfigInterval", + Version: "2.3.3", + DefaultValue: "10", + PanicIfEmpty: true, + Doc: "the interval of check auto balance config", + Export: true, + } + p.CheckAutoBalanceConfigInterval.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -2030,6 +2041,9 @@ type dataCoordConfig struct { IndexTaskSchedulerInterval ParamItem `refreshable:"false"` MinSegmentNumRowsToEnableIndex ParamItem `refreshable:"true"` + // auto balance channel on datanode + AutoBalance ParamItem `refreshable:"true"` + CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -2385,6 +2399,26 @@ During compaction, the size of segment # of rows is able to exceed segment max # DefaultValue: "1000", } p.IndexTaskSchedulerInterval.Init(base.mgr) + + p.AutoBalance = ParamItem{ + Key: "dataCoord.autoBalance", + Version: "2.3.3", + DefaultValue: "false", + PanicIfEmpty: true, + Doc: "Enable auto balance", + Export: true, + } + p.AutoBalance.Init(base.mgr) + + p.CheckAutoBalanceConfigInterval = ParamItem{ + Key: "dataCoord.checkAutoBalanceConfigInterval", + Version: "2.3.3", + DefaultValue: "10", + PanicIfEmpty: true, + Doc: "the interval of check auto balance config", + Export: true, + } + p.CheckAutoBalanceConfigInterval.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index ce3293c399..343e38dc98 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -279,6 +279,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt()) assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt()) + assert.Equal(t, false, Params.AutoBalance.GetAsBool()) + assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) { @@ -351,6 +353,9 @@ func TestComponentParam(t *testing.T) { assert.True(t, Params.EnableGarbageCollection.GetAsBool()) assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false) t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool()) + + assert.Equal(t, false, Params.AutoBalance.GetAsBool()) + assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) }) t.Run("test dataNodeConfig", func(t *testing.T) {