diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go index 54d1c983b1..0e5c94ce63 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go @@ -68,7 +68,7 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() { idAllocator := RandomIncrementIDAllocator() nodeManager := session.NewNodeManager() testMeta := meta.NewMeta(idAllocator, store, nodeManager) - testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv)) + testTarget := meta.NewTargetManager(suite.broker, testMeta) distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 9499055ab8..53096f8a29 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -71,7 +71,7 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() { idAllocator := RandomIncrementIDAllocator() nodeManager := session.NewNodeManager() testMeta := meta.NewMeta(idAllocator, store, nodeManager) - testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv)) + testTarget := meta.NewTargetManager(suite.broker, testMeta) distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 4120844d2c..75975507f8 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -68,7 +68,7 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() { idAllocator := RandomIncrementIDAllocator() nodeManager := session.NewNodeManager() testMeta := meta.NewMeta(idAllocator, store, nodeManager) - testTarget := meta.NewTargetManager(suite.broker, testMeta, querycoord.NewCatalog(suite.kv)) + testTarget := meta.NewTargetManager(suite.broker, testMeta) distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 90236a3b3d..744d9a2fc7 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -75,7 +75,7 @@ func (suite *BalanceCheckerTestSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.scheduler = task.NewMockScheduler(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.balancer = balance.NewMockBalancer(suite.T()) suite.checker = NewBalanceChecker(suite.meta, suite.targetMgr, suite.nodeMgr, suite.scheduler, func() balance.Balance { return suite.balancer }) diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 2d0a90bbb1..f1405c5fbb 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -72,7 +72,7 @@ func (suite *ChannelCheckerTestSuite) SetupTest() { suite.nodeMgr = session.NewNodeManager() suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + targetManager := meta.NewTargetManager(suite.broker, suite.meta) distManager := meta.NewDistributionManager() diff --git a/internal/querycoordv2/checkers/controller_base_test.go b/internal/querycoordv2/checkers/controller_base_test.go index 0102529cb4..0d8e301492 100644 --- a/internal/querycoordv2/checkers/controller_base_test.go +++ b/internal/querycoordv2/checkers/controller_base_test.go @@ -73,7 +73,7 @@ func (suite *ControllerBaseTestSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) suite.dist = meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) - suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta) suite.balancer = balance.NewMockBalancer(suite.T()) suite.scheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index c1d39f9278..b73f6b8ffd 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -77,7 +77,7 @@ func (suite *CheckerControllerSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) suite.dist = meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) - suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta) suite.balancer = balance.NewMockBalancer(suite.T()) suite.scheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index fcb241cf87..0d2249b14a 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -74,7 +74,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() { suite.broker = meta.NewMockBroker(suite.T()) distManager := meta.NewDistributionManager() - targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + targetManager := meta.NewTargetManager(suite.broker, suite.meta) suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr) } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index f321e9bcd8..361373f574 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -74,7 +74,7 @@ func (suite *SegmentCheckerTestSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) distManager := meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) - targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + targetManager := meta.NewTargetManager(suite.broker, suite.meta) balancer := suite.createMockBalancer() suite.checker = NewSegmentChecker(suite.meta, distManager, targetManager, suite.nodeMgr, func() balance.Balance { return balancer }) diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go index f7a5118954..702a860897 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -78,7 +78,7 @@ func (suite *DistControllerTestSuite) SetupTest() { suite.mockCluster = session.NewMockCluster(suite.T()) distManager := meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) - targetManager := meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + targetManager := meta.NewTargetManager(suite.broker, suite.meta) suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() syncTargetVersionFn := func(collectionID int64) {} diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index b1be7f0b5d..3da96babff 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -163,7 +163,7 @@ func (suite *JobSuite) SetupTest() { suite.dist = meta.NewDistributionManager() suite.nodeMgr = session.NewNodeManager() suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, suite.nodeMgr) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetObserver = observers.NewTargetObserver(suite.meta, suite.targetMgr, suite.dist, diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index ab88a2a8cb..6e613242d3 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -375,7 +375,7 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() { err = mgr.UpdatePartitionLoadPercent(partitionID, 10) suite.NoError(err) } - _, err = mgr.UpdateCollectionLoadPercent(collectionID) + _, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID) suite.NoError(err) } suite.clearMemory() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index 9c024a776a..8871c53c8a 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -206,6 +206,10 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { ret := _m.Called(ctx, collectionIDs) + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + var r0 map[int64]int64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok { @@ -268,6 +272,10 @@ func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segm _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for GetIndexInfo") + } + var r0 map[int64][]*querypb.FieldIndexInfo var r1 error if rf, ok := ret.Get(0).(func(context.Context, int64, ...int64) (map[int64][]*querypb.FieldIndexInfo, error)); ok { diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go index 908b8928ff..c3622c3fa5 100644 --- a/internal/querycoordv2/meta/mock_target_manager.go +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -3,7 +3,9 @@ package meta import ( + metastore "github.com/milvus-io/milvus/internal/metastore" datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -604,13 +606,13 @@ func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64) return _c } -// Recover provides a mock function with given fields: -func (_m *MockTargetManager) Recover() error { - ret := _m.Called() +// Recover provides a mock function with given fields: catalog +func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error { + ret := _m.Called(catalog) var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok { + r0 = rf(catalog) } else { r0 = ret.Error(0) } @@ -624,13 +626,14 @@ type MockTargetManager_Recover_Call struct { } // Recover is a helper method to define mock.On call -func (_e *MockTargetManager_Expecter) Recover() *MockTargetManager_Recover_Call { - return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover")} +// - catalog metastore.QueryCoordCatalog +func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call { + return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)} } -func (_c *MockTargetManager_Recover_Call) Run(run func()) *MockTargetManager_Recover_Call { +func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(metastore.QueryCoordCatalog)) }) return _c } @@ -640,7 +643,7 @@ func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_R return _c } -func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func() error) *MockTargetManager_Recover_Call { +func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call { _c.Call.Return(run) return _c } @@ -726,6 +729,39 @@ func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, . return _c } +// SaveCurrentTarget provides a mock function with given fields: catalog +func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) { + _m.Called(catalog) +} + +// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget' +type MockTargetManager_SaveCurrentTarget_Call struct { + *mock.Call +} + +// SaveCurrentTarget is a helper method to define mock.On call +// - catalog metastore.QueryCoordCatalog +func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call { + return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)} +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metastore.QueryCoordCatalog)) + }) + return _c +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Return(run) + return _c +} + // UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool { ret := _m.Called(collectionID) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 33b72a99f9..943588369d 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -19,6 +19,7 @@ package meta import ( "context" "fmt" + "runtime" "sync" "github.com/samber/lo" @@ -27,9 +28,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -62,7 +65,8 @@ type TargetManagerInterface interface { GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 IsCurrentTargetExist(collectionID int64, partitionID int64) bool IsNextTargetExist(collectionID int64) bool - Recover() error + SaveCurrentTarget(catalog metastore.QueryCoordCatalog) + Recover(catalog metastore.QueryCoordCatalog) error CanSegmentBeMoved(collectionID, segmentID int64) bool } @@ -76,17 +80,14 @@ type TargetManager struct { // all remove segment/channel operation happens on Both current and next -> delete status should be consistent current *target next *target - - catalog metastore.QueryCoordCatalog } -func NewTargetManager(broker Broker, meta *Meta, catalog metastore.QueryCoordCatalog) *TargetManager { +func NewTargetManager(broker Broker, meta *Meta) *TargetManager { return &TargetManager{ broker: broker, meta: meta, current: newTarget(), next: newTarget(), - catalog: catalog, } } @@ -128,12 +129,6 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool zap.Int64("version", newTarget.GetTargetVersion()), zap.String("partStatsVersion", partStatsVersionInfo), ) - - // save collection current target for fast recovery after qc restart - err := mgr.catalog.SaveCollectionTargets(newTarget.toPbMsg()) - if err != nil { - log.Warn("failed to save collection targets", zap.Error(err)) - } return true } @@ -244,7 +239,6 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) { mgr.current.removeCollectionTarget(collectionID) mgr.next.removeCollectionTarget(collectionID) - mgr.catalog.RemoveCollectionTarget(collectionID) } // RemovePartition removes all segment in the given partition, @@ -549,11 +543,51 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool { return len(newChannels) > 0 } -func (mgr *TargetManager) Recover() error { +func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) { + mgr.rwMutex.Lock() + defer mgr.rwMutex.Unlock() + if mgr.current != nil { + // use pool here to control maximal writer used by save target + pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2) + defer pool.Release() + // use batch write in case of the number of collections is large + batchSize := 16 + var wg sync.WaitGroup + submit := func(tasks []typeutil.Pair[int64, *querypb.CollectionTarget]) { + wg.Add(1) + pool.Submit(func() (any, error) { + defer wg.Done() + ids := lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) int64 { return p.A }) + if err := catalog.SaveCollectionTargets(lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) *querypb.CollectionTarget { + return p.B + })...); err != nil { + log.Warn("failed to save current target for collection", zap.Int64s("collectionIDs", ids), zap.Error(err)) + } else { + log.Info("succeed to save current target for collection", zap.Int64s("collectionIDs", ids)) + } + return nil, nil + }) + } + tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) + for id, target := range mgr.current.collectionTargetMap { + tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg())) + if len(tasks) >= batchSize { + submit(tasks) + tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) + } + } + if len(tasks) > 0 { + submit(tasks) + } + wg.Wait() + } +} + +func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error { mgr.rwMutex.Lock() defer mgr.rwMutex.Unlock() - targets, err := mgr.catalog.GetCollectionTargets() + targets, err := catalog.GetCollectionTargets() if err != nil { log.Warn("failed to recover collection target from etcd", zap.Error(err)) return err @@ -568,7 +602,14 @@ func (mgr *TargetManager) Recover() error { zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())), zap.Int64("version", newTarget.GetTargetVersion()), ) + + // clear target info in meta store + err := catalog.RemoveCollectionTarget(t.GetCollectionID()) + if err != nil { + log.Warn("failed to clear collection target from etcd", zap.Error(err)) + } } + return nil } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index a4b9ad731a..3e3954b24d 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -113,7 +113,7 @@ func (suite *TargetManagerSuite) SetupTest() { idAllocator := RandomIncrementIDAllocator() suite.meta = NewMeta(idAllocator, suite.catalog, session.NewNodeManager()) suite.broker = NewMockBroker(suite.T()) - suite.mgr = NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.mgr = NewTargetManager(suite.broker, suite.meta) for _, collection := range suite.collections { dmChannels := make([]*datapb.VchannelInfo, 0) @@ -582,16 +582,13 @@ func (suite *TargetManagerSuite) TestRecover() { suite.mgr.UpdateCollectionNextTarget(collectionID) suite.mgr.UpdateCollectionCurrentTarget(collectionID) - // target should be save to meta store after update current target - targets, err := suite.catalog.GetCollectionTargets() - suite.NoError(err) - suite.Len(targets, 1) + suite.mgr.SaveCurrentTarget(suite.catalog) // clear target in memory version := suite.mgr.current.getCollectionTarget(collectionID).GetTargetVersion() suite.mgr.current.removeCollectionTarget(collectionID) // try to recover - suite.mgr.Recover() + suite.mgr.Recover(suite.catalog) target := suite.mgr.current.getCollectionTarget(collectionID) suite.NotNil(target) @@ -599,9 +596,8 @@ func (suite *TargetManagerSuite) TestRecover() { suite.Len(target.GetAllSegmentIDs(), 2) suite.Equal(target.GetTargetVersion(), version) - // target should be removed from meta store after collection released - suite.mgr.RemoveCollection(collectionID) - targets, err = suite.catalog.GetCollectionTargets() + // after recover, target info should be cleaned up + targets, err := suite.catalog.GetCollectionTargets() suite.NoError(err) suite.Len(targets, 0) } diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index f0c5c0f119..961d0b64f4 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -88,6 +88,10 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for DeleteBatch") + } + var r0 *querypb.DeleteBatchResponse var r1 error if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok { diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index a9a994d3d0..3293103c85 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -196,7 +196,7 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.nodeMgr = session.NewNodeManager() suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.cluster = session.NewMockCluster(suite.T()) suite.targetObserver = NewTargetObserver(suite.meta, suite.targetMgr, diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 274b156eea..2058aece3f 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -91,7 +91,7 @@ func (suite *TargetObserverSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.distMgr = meta.NewDistributionManager() suite.cluster = session.NewMockCluster(suite.T()) suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster, nodeMgr) @@ -300,7 +300,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() { suite.meta = meta.NewMeta(idAllocator, store, nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.distMgr = meta.NewDistributionManager() suite.cluster = session.NewMockCluster(suite.T()) suite.observer = NewTargetObserver( diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index 96b9d97c00..2eb4a1c34a 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -102,7 +102,7 @@ func (suite *OpsServiceSuite) SetupTest() { suite.nodeMgr = session.NewNodeManager() suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetObserver = observers.NewTargetObserver( suite.meta, suite.targetMgr, diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 0f118c4c41..ed3bbec0e3 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -410,8 +410,8 @@ func (s *Server) initMeta() error { ChannelDistManager: meta.NewChannelDistManager(), LeaderViewManager: meta.NewLeaderViewManager(), } - s.targetMgr = meta.NewTargetManager(s.broker, s.meta, s.store) - err = s.targetMgr.Recover() + s.targetMgr = meta.NewTargetManager(s.broker, s.meta) + err = s.targetMgr.Recover(s.store) if err != nil { log.Warn("failed to recover collection targets", zap.Error(err)) } @@ -566,6 +566,12 @@ func (s *Server) Stop() error { s.targetObserver.Stop() } + // save target to meta store, after querycoord restart, make it fast to recover current target + // should save target after target observer stop, incase of target changed + if s.targetMgr != nil { + s.targetMgr.SaveCurrentTarget(s.store) + } + if s.replicaObserver != nil { s.replicaObserver.Stop() } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index dea6a7b715..70f5892e33 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -551,7 +551,7 @@ func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status quer func (suite *ServerSuite) hackServer() { suite.broker = meta.NewMockBroker(suite.T()) suite.server.broker = suite.broker - suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta, suite.server.store) + suite.server.targetMgr = meta.NewTargetManager(suite.broker, suite.server.meta) suite.server.taskScheduler = task.NewScheduler( suite.server.ctx, suite.server.meta, diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 38ede0ad35..6451f28d0f 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -151,9 +151,7 @@ func (suite *ServiceSuite) SetupTest() { suite.nodeMgr = session.NewNodeManager() suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) - suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta, suite.store) - suite.cluster = session.NewMockCluster(suite.T()) - suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetObserver = observers.NewTargetObserver( suite.meta, suite.targetMgr, diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index a648c303b5..26301dd566 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -155,7 +155,7 @@ func (suite *TaskSuite) SetupTest() { suite.meta = meta.NewMeta(RandomIncrementIDAllocator(), suite.store, session.NewNodeManager()) suite.dist = meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) - suite.target = meta.NewTargetManager(suite.broker, suite.meta, querycoord.NewCatalog(suite.kv)) + suite.target = meta.NewTargetManager(suite.broker, suite.meta) suite.nodeMgr = session.NewNodeManager() suite.cluster = session.NewMockCluster(suite.T())