From 32fb409e5787b336ce4985b32f82ab10f78b32e0 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 13 Jan 2023 17:11:41 +0800 Subject: [PATCH] Fix may update the current target to an unavailable target when node down (#21698) Signed-off-by: yah01 --- .../querycoordv2/observers/target_observer.go | 54 +++++++++++++++++-- .../observers/target_observer_test.go | 22 ++++++-- internal/querycoordv2/server.go | 2 +- 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 0f5cbd6d19..195c24f56c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -30,6 +30,12 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) +type targetUpdateRequest struct { + CollectionID int64 + Notifier chan error + ReadyNotifier chan struct{} +} + type TargetObserver struct { c chan struct{} wg sync.WaitGroup @@ -39,7 +45,10 @@ type TargetObserver struct { broker meta.Broker nextTargetLastUpdate map[int64]time.Time - stopOnce sync.Once + updateChan chan targetUpdateRequest + readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers + + stopOnce sync.Once } func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *meta.DistributionManager, broker meta.Broker) *TargetObserver { @@ -50,6 +59,8 @@ func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr * distMgr: distMgr, broker: broker, nextTargetLastUpdate: make(map[int64]time.Time), + updateChan: make(chan targetUpdateRequest), + readyNotifiers: make(map[int64][]chan struct{}), } } @@ -81,10 +92,36 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.tryUpdateTarget() + + case request := <-ob.updateChan: + err := ob.updateNextTarget(request.CollectionID) + if err != nil { + close(request.ReadyNotifier) + } else { + ob.readyNotifiers[request.CollectionID] = append(ob.readyNotifiers[request.CollectionID], request.ReadyNotifier) + } + + request.Notifier <- err } } } +// UpdateNextTarget updates the next target, +// returns a channel which will be closed when the next target is ready, +// or returns error if failed to pull target +func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, error) { + notifier := make(chan error) + readyCh := make(chan struct{}) + defer close(notifier) + + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + Notifier: notifier, + ReadyNotifier: readyCh, + } + return readyCh, <-notifier +} + func (ob *TargetObserver) tryUpdateTarget() { collections := ob.meta.GetAll() for _, collectionID := range collections { @@ -94,7 +131,7 @@ func (ob *TargetObserver) tryUpdateTarget() { if ob.shouldUpdateNextTarget(collectionID) { // update next target in collection level - ob.UpdateNextTarget(collectionID) + ob.updateNextTarget(collectionID) } } @@ -115,7 +152,7 @@ func (ob *TargetObserver) isNextTargetExpired(collectionID int64) bool { return time.Since(ob.nextTargetLastUpdate[collectionID]) > params.Params.QueryCoordCfg.NextTargetSurviveTime.GetAsDuration(time.Second) } -func (ob *TargetObserver) UpdateNextTarget(collectionID int64) { +func (ob *TargetObserver) updateNextTarget(collectionID int64) error { log := log.With(zap.Int64("collectionID", collectionID)) log.Warn("observer trigger update next target") @@ -123,9 +160,10 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) { if err != nil { log.Error("failed to update next target for collection", zap.Error(err)) - return + return err } ob.updateNextTargetTimestamp(collectionID) + return nil } func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) { @@ -175,4 +213,12 @@ func (ob *TargetObserver) updateCurrentTarget(collectionID int64) { log.Warn("observer trigger update current target", zap.Int64("collectionID", collectionID)) ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) + notifiers := ob.readyNotifiers[collectionID] + for _, notifier := range notifiers { + close(notifier) + } + // Reuse the capacity of notifiers slice + if notifiers != nil { + ob.readyNotifiers[collectionID] = notifiers[:0] + } } diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index da2e02bc69..baab112156 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -154,15 +154,24 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { SegmentID: 13, InsertChannel: "channel-1", }) - suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) - suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{suite.partitionID}, nil) suite.targetMgr.UpdateCollectionCurrentTarget(suite.collectionID) // Pull next again + suite.broker.EXPECT(). + GetRecoveryInfo(mock.Anything, mock.Anything, mock.Anything). + Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) + suite.broker.EXPECT(). + GetPartitions(mock.Anything, mock.Anything). + Return([]int64{suite.partitionID}, nil) suite.Eventually(func() bool { return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 && len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2 }, 7*time.Second, 1*time.Second) + suite.broker.AssertExpectations(suite.T()) + + // Manually update next target + ready, err := suite.observer.UpdateNextTarget(suite.collectionID) + suite.NoError(err) suite.distMgr.LeaderViewManager.Update(2, &meta.LeaderView{ @@ -186,7 +195,14 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { // Able to update current if it's not empty suite.Eventually(func() bool { - return len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.CurrentTarget)) == 3 && + isReady := false + select { + case <-ready: + isReady = true + default: + } + return isReady && + len(suite.targetMgr.GetHistoricalSegmentsByCollection(suite.collectionID, meta.CurrentTarget)) == 3 && len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.CurrentTarget)) == 2 }, 7*time.Second, 1*time.Second) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d3822fd026..dec049b425 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -612,7 +612,7 @@ func (s *Server) handleNodeDown(node int64) { // are missed, it will recover for a while. channels := s.dist.ChannelDistManager.GetByNode(node) for _, channel := range channels { - err := s.targetMgr.UpdateCollectionNextTarget(channel.GetCollectionID()) + _, err := s.targetObserver.UpdateNextTarget(channel.GetCollectionID()) if err != nil { msg := "failed to update next targets for collection" log.Error(msg,