diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index ef4ec783f7..0016756d0c 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -59,13 +59,22 @@ func (c *ChannelChecker) Description() string { return "DmChannelChecker checks the lack of DmChannels, or some DmChannels are redundant" } +func (c *ChannelChecker) readyToCheck(collectionID int64) bool { + metaExist := (c.meta.GetCollection(collectionID) != nil) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + + return metaExist && targetExist +} + func (c *ChannelChecker) Check(ctx context.Context) []task.Task { collectionIDs := c.meta.CollectionManager.GetAll() tasks := make([]task.Task, 0) for _, cid := range collectionIDs { - replicas := c.meta.ReplicaManager.GetByCollection(cid) - for _, r := range replicas { - tasks = append(tasks, c.checkReplica(ctx, r)...) + if c.readyToCheck(cid) { + replicas := c.meta.ReplicaManager.GetByCollection(cid) + for _, r := range replicas { + tasks = append(tasks, c.checkReplica(ctx, r)...) + } } } diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index 340719679d..1e53fb091d 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -135,9 +135,22 @@ func (suite *ChannelCheckerTestSuite) TestLoadChannel() { func (suite *ChannelCheckerTestSuite) TestReduceChannel() { checker := suite.checker checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1})) - checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel")) + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel1", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, nil, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + checker.targetMgr.UpdateCollectionCurrentTarget(int64(1)) + + checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel1")) + checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel2")) tasks := checker.Check(context.TODO()) suite.Len(tasks, 1) suite.EqualValues(1, tasks[0].ReplicaID()) @@ -146,7 +159,7 @@ func (suite *ChannelCheckerTestSuite) TestReduceChannel() { action := tasks[0].Actions()[0].(*task.ChannelAction) suite.Equal(task.ActionTypeReduce, action.Type()) suite.EqualValues(1, action.Node()) - suite.EqualValues("test-insert-channel", action.ChannelName()) + suite.EqualValues("test-insert-channel2", action.ChannelName()) } func (suite *ChannelCheckerTestSuite) TestRepeatedChannels() { diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 286dfc6dae..8adb58a335 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -84,7 +84,6 @@ func (suite *CheckerControllerSuite) SetupTest() { } func (suite *CheckerControllerSuite) TestBasic() { - // set meta suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) @@ -95,20 +94,27 @@ func (suite *CheckerControllerSuite) TestBasic() { suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) // set target + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel2", + }, + } + segments := []*datapb.SegmentInfo{ { - ID: 1, + ID: 3, PartitionID: 1, - InsertChannel: "test-insert-channel", + InsertChannel: "test-insert-channel2", }, } suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - nil, segments, nil) + channels, segments, nil) suite.targetManager.UpdateCollectionNextTarget(int64(1)) // set dist suite.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - suite.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) + suite.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 2}, map[int64]*meta.Segment{})) counter := atomic.NewInt64(0) suite.scheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 38d3564ff6..b4da95ed53 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -64,13 +64,22 @@ func (c *SegmentChecker) Description() string { return "SegmentChecker checks the lack of segments, or some segments are redundant" } +func (c *SegmentChecker) readyToCheck(collectionID int64) bool { + metaExist := (c.meta.GetCollection(collectionID) != nil) + targetExist := c.targetMgr.IsNextTargetExist(collectionID) || c.targetMgr.IsCurrentTargetExist(collectionID) + + return metaExist && targetExist +} + func (c *SegmentChecker) Check(ctx context.Context) []task.Task { collectionIDs := c.meta.CollectionManager.GetAll() tasks := make([]task.Task, 0) for _, cid := range collectionIDs { - replicas := c.meta.ReplicaManager.GetByCollection(cid) - for _, r := range replicas { - tasks = append(tasks, c.checkReplica(ctx, r)...) + if c.readyToCheck(cid) { + replicas := c.meta.ReplicaManager.GetByCollection(cid) + for _, r := range replicas { + tasks = append(tasks, c.checkReplica(ctx, r)...) + } } } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index a046808e46..209afa5a36 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -121,8 +121,16 @@ func (suite *SegmentCheckerTestSuite) TestLoadSegments() { InsertChannel: "test-insert-channel", }, } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - nil, segments, nil) + channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) // set dist @@ -159,8 +167,15 @@ func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() { InsertChannel: "test-insert-channel", }, } + + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - nil, segments, nil) + channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) // set dist @@ -177,8 +192,20 @@ func (suite *SegmentCheckerTestSuite) TestReleaseSegments() { checker := suite.checker // set meta checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + // set target + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, nil, nil) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) + // set dist checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) @@ -210,8 +237,14 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() { InsertChannel: "test-insert-channel", }, } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - nil, segments, nil) + channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) // set dist @@ -249,9 +282,16 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, collectionID, []int64{1, 2})) // set target + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + SeekPosition: &msgpb.MsgPosition{Timestamp: 10}, + }, + } segments := []*datapb.SegmentInfo{} suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( - nil, segments, nil) + channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(collectionID) checker.targetMgr.UpdateCollectionCurrentTarget(collectionID) readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index c59193db85..63e252f4b3 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -146,6 +146,13 @@ func (ob *CollectionObserver) observeTimeout() { } } +func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { + metaExist := (ob.meta.GetCollection(collectionID) != nil) + targetExist := ob.targetMgr.IsNextTargetExist(collectionID) || ob.targetMgr.IsCurrentTargetExist(collectionID) + + return metaExist && targetExist +} + func (ob *CollectionObserver) observeLoadStatus() { partitions := ob.meta.CollectionManager.GetAllPartitions() if len(partitions) > 0 { @@ -156,9 +163,11 @@ func (ob *CollectionObserver) observeLoadStatus() { if partition.LoadPercentage == 100 { continue } - replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) - ob.observePartitionLoadStatus(partition, replicaNum) - loading = true + if ob.readyToObserve(partition.CollectionID) { + replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) + ob.observePartitionLoadStatus(partition, replicaNum) + loading = true + } } // trigger check logic when loading collections/partitions if loading { diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index f5dcb26492..ceb50d25e1 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -90,10 +90,19 @@ func (o *LeaderObserver) observe(ctx context.Context) { o.observeSegmentsDist(ctx) } +func (o *LeaderObserver) readyToObserve(collectionID int64) bool { + metaExist := (o.meta.GetCollection(collectionID) != nil) + targetExist := o.target.IsNextTargetExist(collectionID) || o.target.IsCurrentTargetExist(collectionID) + + return metaExist && targetExist +} + func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) { collectionIDs := o.meta.CollectionManager.GetAll() for _, cid := range collectionIDs { - o.observeCollection(ctx, cid) + if o.readyToObserve(cid) { + o.observeCollection(ctx, cid) + } } } diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index e72aaf0a34..5268ce8f93 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -212,6 +212,7 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() { channels, segments, nil) observer.target.UpdateCollectionNextTarget(int64(1)) observer.target.UpdateCollectionCurrentTarget(1) + observer.target.UpdateCollectionNextTarget(int64(1)) observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"), utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel")) observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) @@ -407,12 +408,26 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() { observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1)) observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) - observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) - observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})) - schema := utils.CreateTestSchema() suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil) + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + channels, nil, nil) + observer.target.UpdateCollectionNextTarget(int64(1)) + observer.target.UpdateCollectionCurrentTarget(1) + + observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{}) + view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget) + observer.dist.LeaderViewManager.Update(2, view) + expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest { return &querypb.SyncDistributionRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 5d129a3319..3568b3ae79 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -140,6 +140,14 @@ func (ob *TargetObserver) Check(collectionID int64) bool { } func (ob *TargetObserver) check(collectionID int64) { + if !ob.meta.Exist(collectionID) { + ob.ReleaseCollection(collectionID) + ob.targetMgr.RemoveCollection(collectionID) + log.Info("collection has been removed from target observer", + zap.Int64("collectionID", collectionID)) + return + } + if ob.shouldUpdateCurrentTarget(collectionID) { ob.updateCurrentTarget(collectionID) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 2112b5cc00..697b712d25 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -28,7 +28,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -407,11 +406,9 @@ func (s *Server) startQueryCoord() error { go s.handleNodeUpLoop() go s.watchNodes(revision) - log.Info("start recovering dist and target") - err = s.recover() - if err != nil { - return err - } + // Recover dist, to avoid generate too much task when dist not ready after restart + s.distController.SyncAll(s.ctx) + s.startServerLoop() s.afterStart() s.UpdateStateCode(commonpb.StateCode_Healthy) @@ -573,39 +570,6 @@ func (s *Server) SetQueryNodeCreator(f func(ctx context.Context, addr string, no s.queryNodeCreator = f } -func (s *Server) recover() error { - // Recover target managers - group, ctx := errgroup.WithContext(s.ctx) - for _, collection := range s.meta.GetAll() { - collection := collection - group.Go(func() error { - return s.recoverCollectionTargets(ctx, collection) - }) - } - err := group.Wait() - if err != nil { - return err - } - - // Recover dist - s.distController.SyncAll(s.ctx) - - return nil -} - -func (s *Server) recoverCollectionTargets(ctx context.Context, collection int64) error { - err := s.targetMgr.UpdateCollectionNextTarget(collection) - if err != nil { - s.meta.CollectionManager.RemoveCollection(collection) - s.meta.ReplicaManager.RemoveCollection(collection) - log.Error("failed to recover collection due to update next target failed", - zap.Int64("collectionID", collection), - zap.Error(err), - ) - } - return nil -} - func (s *Server) watchNodes(revision int64) { defer s.wg.Done() diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index d1047c0f37..7ea9e5e919 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -139,7 +138,7 @@ func (suite *ServerSuite) SetupTest() { suite.loadAll() for _, collection := range suite.collections { - suite.assertLoaded(collection) + suite.True(suite.server.meta.Exist(collection)) suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded) } } @@ -165,27 +164,7 @@ func (suite *ServerSuite) TestRecover() { suite.NoError(err) for _, collection := range suite.collections { - suite.assertLoaded(collection) - } -} - -func (suite *ServerSuite) TestRecoverFailed() { - err := suite.server.Stop() - suite.NoError(err) - - suite.server, err = suite.newQueryCoord() - suite.NoError(err) - - broker := meta.NewMockBroker(suite.T()) - for _, collection := range suite.collections { - broker.EXPECT().GetRecoveryInfoV2(context.TODO(), collection).Return(nil, nil, errors.New("CollectionNotExist")) - } - suite.server.targetMgr = meta.NewTargetManager(broker, suite.server.meta) - err = suite.server.Start() - suite.NoError(err) - - for _, collection := range suite.collections { - suite.Nil(suite.server.targetMgr.GetDmChannelsByCollection(collection, meta.NextTarget)) + suite.True(suite.server.meta.Exist(collection)) } } @@ -396,18 +375,6 @@ func (suite *ServerSuite) loadAll() { } } -func (suite *ServerSuite) assertLoaded(collection int64) { - suite.True(suite.server.meta.Exist(collection)) - for _, channel := range suite.channels[collection] { - suite.NotNil(suite.server.targetMgr.GetDmChannel(collection, channel, meta.NextTarget)) - } - for _, partitions := range suite.segments[collection] { - for _, segment := range partitions { - suite.NotNil(suite.server.targetMgr.GetHistoricalSegment(collection, segment, meta.NextTarget)) - } - } -} - func (suite *ServerSuite) expectGetRecoverInfo(collection int64) { vChannels := []*datapb.VchannelInfo{} for _, channel := range suite.channels[collection] { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index dc829fc477..7fa1aab5ab 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1726,7 +1726,7 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { } suite.broker.EXPECT(). GetRecoveryInfoV2(mock.Anything, collection, mock.Anything, mock.Anything). - Return(vChannels, segmentBinlogs, nil) + Return(vChannels, segmentBinlogs, nil).Maybe() } func (suite *ServiceSuite) expectLoadPartitions() {