From 75676fbd110e4d54db04ff8cdd23f90b8e691726 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 5 Sep 2024 18:47:03 +0800 Subject: [PATCH] fix: Fix dynamic release partition may fail search/query request (#35919) issue: #33550 cause concurrent issue may occur between remove parition in target manager and sync segment list to delegator. when it happens, some segment may be released in delegator, and those segment may also be synced to delegator, which cause delegator become unserviceable due to lack of necessary segments, then search/query fails. this PR make sure that all write access to target_manager will be executed in serial to avoid the concurrent issues. Signed-off-by: Wei Liu --- internal/querycoordv2/job/job_release.go | 4 +- internal/querycoordv2/job/undo.go | 3 +- .../observers/collection_observer.go | 6 +- .../querycoordv2/observers/target_observer.go | 112 ++++++++++++------ .../observers/target_observer_test.go | 15 +++ 5 files changed, 98 insertions(+), 42 deletions(-) diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 4984151031..4551b504dc 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -95,7 +95,6 @@ func (job *ReleaseCollectionJob) Execute() error { log.Warn(msg, zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() @@ -178,7 +177,6 @@ func (job *ReleasePartitionJob) Execute() error { if err != nil { log.Warn("failed to remove replicas", zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) @@ -189,7 +187,7 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn(msg, zap.Error(err)) return errors.Wrap(err, msg) } - job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...) + job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 6ac0b62296..e1314f0aec 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -78,10 +78,9 @@ func (u *UndoList) RollBack() { if u.IsTargetUpdated { if u.IsNewCollection { - u.targetMgr.RemoveCollection(u.CollectionID) u.targetObserver.ReleaseCollection(u.CollectionID) } else { - u.targetMgr.RemovePartition(u.CollectionID, u.LackPartitions...) + u.targetObserver.ReleasePartition(u.CollectionID, u.LackPartitions...) } } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 08c77858d3..843e3a4d55 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -180,7 +180,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Duration("loadTime", time.Since(collection.CreatedAt))) ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) - ob.targetMgr.RemoveCollection(collection.GetCollectionID()) + ob.targetObserver.ReleaseCollection(collection.GetCollectionID()) ob.loadTasks.Remove(traceID) } case querypb.LoadType_LoadPartition: @@ -214,7 +214,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Int64s("partitionIDs", task.PartitionIDs)) for _, partition := range partitions { ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID()) - ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) + ob.targetObserver.ReleasePartition(partition.GetCollectionID(), partition.GetPartitionID()) } // all partition timeout, remove collection @@ -223,7 +223,7 @@ func (ob *CollectionObserver) observeTimeout() { ob.meta.CollectionManager.RemoveCollection(task.CollectionID) ob.meta.ReplicaManager.RemoveCollection(task.CollectionID) - ob.targetMgr.RemoveCollection(task.CollectionID) + ob.targetObserver.ReleaseCollection(task.CollectionID) } } } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 5e6f7ede3c..8e69a32993 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -38,15 +38,33 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type checkRequest struct { - CollectionID int64 - Notifier chan bool +type targetOp int + +func (op *targetOp) String() string { + switch *op { + case UpdateCollection: + return "UpdateCollection" + case ReleaseCollection: + return "ReleaseCollection" + case ReleasePartition: + return "ReleasePartition" + default: + return "Unknown" + } } +const ( + UpdateCollection targetOp = iota + 1 + ReleaseCollection + ReleasePartition +) + type targetUpdateRequest struct { CollectionID int64 + PartitionIDs []int64 Notifier chan error ReadyNotifier chan struct{} + opType targetOp } type initRequest struct{} @@ -60,8 +78,7 @@ type TargetObserver struct { broker meta.Broker cluster session.Cluster - initChan chan initRequest - manualCheck chan checkRequest + initChan chan initRequest // nextTargetLastUpdate map[int64]time.Time nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time] updateChan chan targetUpdateRequest @@ -88,9 +105,8 @@ func NewTargetObserver( distMgr: distMgr, broker: broker, cluster: cluster, - manualCheck: make(chan checkRequest, 10), nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), - updateChan: make(chan targetUpdateRequest), + updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), keylocks: lock.NewKeyLock[int64](), @@ -152,23 +168,44 @@ func (ob *TargetObserver) schedule(ctx context.Context) { ob.dispatcher.AddTask(ob.meta.GetAll()...) case req := <-ob.updateChan: - log := log.With(zap.Int64("collectionID", req.CollectionID)) - log.Info("manually trigger update next target") - ob.keylocks.Lock(req.CollectionID) - err := ob.updateNextTarget(req.CollectionID) - ob.keylocks.Unlock(req.CollectionID) - if err != nil { - log.Warn("failed to manually update next target", zap.Error(err)) - close(req.ReadyNotifier) - } else { + log.Info("manually trigger update target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + ) + switch req.opType { + case UpdateCollection: + ob.keylocks.Lock(req.CollectionID) + err := ob.updateNextTarget(req.CollectionID) + ob.keylocks.Unlock(req.CollectionID) + if err != nil { + log.Warn("failed to manually update next target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + zap.Error(err)) + close(req.ReadyNotifier) + } else { + ob.mut.Lock() + ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + ob.mut.Unlock() + } + req.Notifier <- err + case ReleaseCollection: ob.mut.Lock() - ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + for _, notifier := range ob.readyNotifiers[req.CollectionID] { + close(notifier) + } + delete(ob.readyNotifiers, req.CollectionID) ob.mut.Unlock() - } - log.Info("manually trigger update target done") - req.Notifier <- err - log.Info("notify manually trigger update target done") + ob.targetMgr.RemoveCollection(req.CollectionID) + req.Notifier <- nil + case ReleasePartition: + ob.targetMgr.RemovePartition(req.CollectionID, req.PartitionIDs...) + req.Notifier <- nil + } + log.Info("manually trigger update target done", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String())) } } } @@ -184,14 +221,6 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti } func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { - if !ob.meta.Exist(collectionID) { - ob.ReleaseCollection(collectionID) - ob.targetMgr.RemoveCollection(collectionID) - log.Info("collection has been removed from target observer", - zap.Int64("collectionID", collectionID)) - return - } - ob.keylocks.Lock(collectionID) defer ob.keylocks.Unlock(collectionID) @@ -229,6 +258,7 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e ob.updateChan <- targetUpdateRequest{ CollectionID: collectionID, + opType: UpdateCollection, Notifier: notifier, ReadyNotifier: readyCh, } @@ -236,12 +266,26 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e } func (ob *TargetObserver) ReleaseCollection(collectionID int64) { - ob.mut.Lock() - defer ob.mut.Unlock() - for _, notifier := range ob.readyNotifiers[collectionID] { - close(notifier) + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + opType: ReleaseCollection, + Notifier: notifier, } - delete(ob.readyNotifiers, collectionID) + <-notifier +} + +func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...int64) { + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + PartitionIDs: partitionID, + opType: ReleasePartition, + Notifier: notifier, + } + <-notifier } func (ob *TargetObserver) clean() { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 825a2b28bb..45d804b0f8 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -214,6 +215,20 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, 7*time.Second, 1*time.Second) } +func (suite *TargetObserverSuite) TestTriggerRelease() { + // Manually update next target + _, err := suite.observer.UpdateNextTarget(suite.collectionID) + suite.NoError(err) + + // manually release partition + partitions := suite.meta.CollectionManager.GetPartitionsByCollection(suite.collectionID) + partitionIDs := lo.Map(partitions, func(partition *meta.Partition, _ int) int64 { return partition.PartitionID }) + suite.observer.ReleasePartition(suite.collectionID, partitionIDs[0]) + + // manually release collection + suite.observer.ReleaseCollection(suite.collectionID) +} + func (suite *TargetObserverSuite) TearDownTest() { suite.kv.Close() suite.observer.Stop()