From 1b90630633a2628c22f155b241b9cc74510758e1 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 8 Nov 2023 11:36:22 +0800 Subject: [PATCH] Fix the target updated before version updated to cause data missing (#28250) Signed-off-by: yah01 --- internal/querycoordv2/job/job_test.go | 1 + .../observers/collection_observer.go | 4 - .../observers/collection_observer_test.go | 3 + .../querycoordv2/observers/leader_observer.go | 67 -------- .../observers/leader_observer_test.go | 52 ------ .../querycoordv2/observers/target_observer.go | 148 +++++++++++++++++- .../observers/target_observer_test.go | 14 +- internal/querycoordv2/server.go | 1 + internal/querycoordv2/server_test.go | 1 + internal/querycoordv2/services_test.go | 2 + 10 files changed, 162 insertions(+), 131 deletions(-) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index bd5c3dcd8e..0cc2656fe2 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -162,6 +162,7 @@ func (suite *JobSuite) SetupTest() { suite.targetMgr, suite.dist, suite.broker, + suite.cluster, ) suite.targetObserver.Start() suite.scheduler = NewScheduler() diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index a1a8825a05..c8e92b528e 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -231,10 +231,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa log.Warn("failed to manual check current target, skip update load status") return } - if !ob.leaderObserver.CheckTargetVersion(ctx, partition.GetCollectionID()) { - log.Warn("failed to manual check leader target version ,skip update load status") - return - } delete(ob.partitionLoadedCount, partition.GetPartitionID()) } collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(partition.PartitionID, loadPercentage) diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index d115e1a270..653f2b3570 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -59,6 +59,7 @@ type CollectionObserverSuite struct { kv kv.MetaKv store metastore.QueryCoordCatalog broker *meta.MockBroker + cluster *session.MockCluster // Dependencies dist *meta.DistributionManager @@ -189,10 +190,12 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr) suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) + suite.cluster = session.NewMockCluster(suite.T()) suite.targetObserver = NewTargetObserver(suite.meta, suite.targetMgr, suite.dist, suite.broker, + suite.cluster, ) suite.checkerController = &checkers.CheckerController{} diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 79039a8af1..8192c7274e 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -132,77 +131,11 @@ func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64 actions := o.findNeedLoadedSegments(leaderView, dists) actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...) - updateVersionAction := o.checkNeedUpdateTargetVersion(ctx, leaderView) - if updateVersionAction != nil { - actions = append(actions, updateVersionAction) - } o.sync(ctx, replica.GetID(), leaderView, actions) } } } -func (o *LeaderObserver) CheckTargetVersion(ctx context.Context, collectionID int64) bool { - // if not ready to observer, skip add task - if !o.readyToObserve(collectionID) { - return false - } - - result := o.checkCollectionLeaderVersionIsCurrent(ctx, collectionID) - if !result { - o.dispatcher.AddTask(collectionID) - } - - return result -} - -func (o *LeaderObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool { - replicas := o.meta.ReplicaManager.GetByCollection(collectionID) - for _, replica := range replicas { - leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica) - for ch, leaderID := range leaders { - leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch) - if leaderView == nil { - return false - } - - action := o.checkNeedUpdateTargetVersion(ctx, leaderView) - if action != nil { - return false - } - } - } - return true -} - -func (o *LeaderObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction { - log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60) - targetVersion := o.target.GetCollectionTargetVersion(leaderView.CollectionID, meta.CurrentTarget) - - if targetVersion <= leaderView.TargetVersion { - return nil - } - - log.RatedInfo(10, "Update readable segment version", - zap.Int64("collectionID", leaderView.CollectionID), - zap.String("channelName", leaderView.Channel), - zap.Int64("nodeID", leaderView.ID), - zap.Int64("oldVersion", leaderView.TargetVersion), - zap.Int64("newVersion", targetVersion), - ) - - sealedSegments := o.target.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget) - growingSegments := o.target.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget) - droppedSegments := o.target.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget) - - return &querypb.SyncAction{ - Type: querypb.SyncType_UpdateVersion, - GrowingInTarget: growingSegments.Collect(), - SealedInTarget: lo.Keys(sealedSegments), - DroppedInTarget: droppedSegments, - TargetVersion: targetVersion, - } -} - func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction { ret := make([]*querypb.SyncAction, 0) dists = utils.FindMaxVersionSegments(dists) diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index 23a71ab8e1..c4d170601c 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -541,58 +541,6 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() { ) } -func (suite *LeaderObserverTestSuite) TestSyncTargetVersion() { - collectionID := int64(1001) - - observer := suite.observer - observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(collectionID, 1)) - observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(collectionID, 1)) - observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, collectionID, []int64{1, 2})) - - nextTargetChannels := []*datapb.VchannelInfo{ - { - CollectionID: collectionID, - ChannelName: "channel-1", - UnflushedSegmentIds: []int64{22, 33}, - }, - { - CollectionID: collectionID, - ChannelName: "channel-2", - UnflushedSegmentIds: []int64{44}, - }, - } - - nextTargetSegments := []*datapb.SegmentInfo{ - { - ID: 11, - PartitionID: 1, - InsertChannel: "channel-1", - }, - { - ID: 12, - PartitionID: 1, - InsertChannel: "channel-2", - }, - } - - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil) - suite.observer.target.UpdateCollectionNextTarget(collectionID) - suite.observer.target.UpdateCollectionCurrentTarget(collectionID) - TargetVersion := suite.observer.target.GetCollectionTargetVersion(collectionID, meta.CurrentTarget) - - view := utils.CreateTestLeaderView(1, collectionID, "channel-1", nil, nil) - view.TargetVersion = TargetVersion - action := observer.checkNeedUpdateTargetVersion(context.Background(), view) - suite.Nil(action) - - view.TargetVersion = TargetVersion - 1 - action = observer.checkNeedUpdateTargetVersion(context.Background(), view) - suite.NotNil(action) - suite.Equal(querypb.SyncType_UpdateVersion, action.Type) - suite.Len(action.GrowingInTarget, 2) - suite.Len(action.SealedInTarget, 1) -} - func TestLeaderObserverSuite(t *testing.T) { suite.Run(t, new(LeaderObserverTestSuite)) } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index cc609163d2..0358052acc 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -21,12 +21,18 @@ import ( "sync" "time" + "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -50,6 +56,7 @@ type TargetObserver struct { targetMgr *meta.TargetManager distMgr *meta.DistributionManager broker meta.Broker + cluster session.Cluster initChan chan initRequest manualCheck chan checkRequest @@ -64,12 +71,19 @@ type TargetObserver struct { stopOnce sync.Once } -func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *meta.DistributionManager, broker meta.Broker) *TargetObserver { +func NewTargetObserver( + meta *meta.Meta, + targetMgr *meta.TargetManager, + distMgr *meta.DistributionManager, + broker meta.Broker, + cluster session.Cluster, +) *TargetObserver { result := &TargetObserver{ meta: meta, targetMgr: targetMgr, distMgr: distMgr, broker: broker, + cluster: cluster, manualCheck: make(chan checkRequest, 10), nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), updateChan: make(chan targetUpdateRequest), @@ -122,7 +136,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ob.initChan: for _, collectionID := range ob.meta.GetAll() { - ob.init(collectionID) + ob.init(ctx, collectionID) } log.Info("target observer init done") @@ -164,7 +178,7 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { return } - if ob.shouldUpdateCurrentTarget(collectionID) { + if ob.shouldUpdateCurrentTarget(ctx, collectionID) { ob.updateCurrentTarget(collectionID) } @@ -174,14 +188,14 @@ func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { } } -func (ob *TargetObserver) init(collectionID int64) { +func (ob *TargetObserver) init(ctx context.Context, collectionID int64) { // pull next target first if not exist if !ob.targetMgr.IsNextTargetExist(collectionID) { ob.updateNextTarget(collectionID) } // try to update current target if all segment/channel are ready - if ob.shouldUpdateCurrentTarget(collectionID) { + if ob.shouldUpdateCurrentTarget(ctx, collectionID) { ob.updateCurrentTarget(collectionID) } } @@ -263,7 +277,7 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) { ob.nextTargetLastUpdate.Insert(collectionID, time.Now()) } -func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool { +func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collectionID int64) bool { replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID) // check channel first @@ -293,9 +307,131 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool { } } + replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) + actions := make([]*querypb.SyncAction, 0, 1) + for _, replica := range replicas { + leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) + for ch, leaderID := range leaders { + actions = actions[:0] + leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch) + if leaderView == nil { + continue + } + updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leaderView) + if updateVersionAction != nil { + actions = append(actions, updateVersionAction) + } + if !ob.sync(ctx, replica.GetID(), leaderView, actions) { + return false + } + } + } + return true } +func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool { + if len(diffs) == 0 { + return true + } + + log := log.With( + zap.Int64("leaderID", leaderView.ID), + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channel", leaderView.Channel), + ) + + collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID) + if err != nil { + log.Warn("failed to get collection info", zap.Error(err)) + return false + } + partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID) + if err != nil { + log.Warn("failed to get partitions", zap.Error(err)) + return false + } + + req := &querypb.SyncDistributionRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution), + ), + CollectionID: leaderView.CollectionID, + ReplicaID: replicaID, + Channel: leaderView.Channel, + Actions: diffs, + Schema: collectionInfo.GetSchema(), + LoadMeta: &querypb.LoadMetaInfo{ + LoadType: ob.meta.GetLoadType(leaderView.CollectionID), + CollectionID: leaderView.CollectionID, + PartitionIDs: partitions, + }, + Version: time.Now().UnixNano(), + } + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req) + if err != nil { + log.Warn("failed to sync distribution", zap.Error(err)) + return false + } + + if resp.ErrorCode != commonpb.ErrorCode_Success { + log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason())) + return false + } + + return true +} + +func (ob *TargetObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool { + replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) + for _, replica := range replicas { + leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) + for ch, leaderID := range leaders { + leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch) + if leaderView == nil { + return false + } + + action := ob.checkNeedUpdateTargetVersion(ctx, leaderView) + if action != nil { + return false + } + } + } + return true +} + +func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction { + log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60) + targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget) + + if targetVersion <= leaderView.TargetVersion { + return nil + } + + log.RatedInfo(10, "Update readable segment version", + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channelName", leaderView.Channel), + zap.Int64("nodeID", leaderView.ID), + zap.Int64("oldVersion", leaderView.TargetVersion), + zap.Int64("newVersion", targetVersion), + ) + + sealedSegments := ob.targetMgr.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget) + growingSegments := ob.targetMgr.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget) + droppedSegments := ob.targetMgr.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.CurrentTarget) + + return &querypb.SyncAction{ + Type: querypb.SyncType_UpdateVersion, + GrowingInTarget: growingSegments.Collect(), + SealedInTarget: lo.Keys(sealedSegments), + DroppedInTarget: droppedSegments, + TargetVersion: targetVersion, + } +} + func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID)) if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index b5509f6ec9..0c0798a1b2 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -46,6 +46,7 @@ type TargetObserverSuite struct { targetMgr *meta.TargetManager distMgr *meta.DistributionManager broker *meta.MockBroker + cluster *session.MockCluster observer *TargetObserver @@ -82,7 +83,8 @@ func (suite *TargetObserverSuite) SetupTest() { suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.distMgr = meta.NewDistributionManager() - suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker) + suite.cluster = session.NewMockCluster(suite.T()) + suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster) suite.collectionID = int64(1000) suite.partitionID = int64(100) @@ -225,6 +227,7 @@ type TargetObserverCheckSuite struct { targetMgr *meta.TargetManager distMgr *meta.DistributionManager broker *meta.MockBroker + cluster *session.MockCluster observer *TargetObserver @@ -258,7 +261,14 @@ func (suite *TargetObserverCheckSuite) SetupTest() { suite.broker = meta.NewMockBroker(suite.T()) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.distMgr = meta.NewDistributionManager() - suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker) + suite.cluster = session.NewMockCluster(suite.T()) + suite.observer = NewTargetObserver( + suite.meta, + suite.targetMgr, + suite.distMgr, + suite.broker, + suite.cluster, + ) suite.collectionID = int64(1000) suite.partitionID = int64(100) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 7bbf968fe1..a29831ade3 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -371,6 +371,7 @@ func (s *Server) initObserver() { s.targetMgr, s.dist, s.broker, + s.cluster, ) s.collectionObserver = observers.NewCollectionObserver( s.dist, diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 7eeb020dcb..109b325d5a 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -556,6 +556,7 @@ func (suite *ServerSuite) hackServer() { suite.server.targetMgr, suite.server.dist, suite.broker, + suite.server.cluster, ) suite.server.collectionObserver = observers.NewCollectionObserver( suite.server.dist, diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 6dba44acad..f251c17e5a 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -149,6 +149,7 @@ func (suite *ServiceSuite) SetupTest() { suite.targetMgr, suite.dist, suite.broker, + suite.cluster, ) suite.targetObserver.Start() for _, node := range suite.nodes { @@ -157,6 +158,7 @@ func (suite *ServiceSuite) SetupTest() { suite.NoError(err) } suite.cluster = session.NewMockCluster(suite.T()) + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) suite.jobScheduler.Start()