From d7ff1bbe5cea34608a9ad21cf8a435e98f1fe7e4 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 22 Apr 2024 10:39:22 +0800 Subject: [PATCH] enhance: Make querycoordv2 collection observer task-driven (#32441) See also #32440 - Add loadTask in collection observer - For load collection/partitions, the load task shall time out as a whole - Change the related load job constructors accordingly --------- Signed-off-by: Congqi Xia --- internal/querycoordv2/job/job_load.go | 83 ++++---- internal/querycoordv2/job/job_test.go | 59 +++++- .../observers/collection_observer.go | 192 ++++++++++++++---- .../observers/collection_observer_test.go | 3 + internal/querycoordv2/services.go | 2 + internal/querycoordv2/services_test.go | 44 ++-- 6 files changed, 276 insertions(+), 107 deletions(-) diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 396105e99d..5dc89c3ad1 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -44,13 +44,14 @@ type LoadCollectionJob struct { req *querypb.LoadCollectionRequest undo *UndoList - dist *meta.DistributionManager - meta *meta.Meta - broker meta.Broker - cluster session.Cluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver - nodeMgr *session.NodeManager + dist *meta.DistributionManager + meta *meta.Meta + broker meta.Broker + cluster session.Cluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + collectionObserver *observers.CollectionObserver + nodeMgr *session.NodeManager } func NewLoadCollectionJob( @@ -62,19 +63,21 @@ func NewLoadCollectionJob( cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, + collectionObserver *observers.CollectionObserver, nodeMgr *session.NodeManager, ) *LoadCollectionJob { return &LoadCollectionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), - dist: dist, - meta: meta, - broker: broker, - cluster: cluster, - targetMgr: targetMgr, - targetObserver: targetObserver, - nodeMgr: nodeMgr, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), + dist: dist, + meta: meta, + broker: broker, + cluster: cluster, + targetMgr: targetMgr, + targetObserver: targetObserver, + collectionObserver: collectionObserver, + nodeMgr: nodeMgr, } } @@ -184,7 +187,7 @@ func (job *LoadCollectionJob) Execute() error { } }) - _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot()) + ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot()) collection := &meta.Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: req.GetCollectionID(), @@ -214,6 +217,9 @@ } job.undo.IsTargetUpdated = true + // 6. 
register load task into collection observer + job.collectionObserver.LoadCollection(ctx, req.GetCollectionID()) + return nil } @@ -228,13 +234,14 @@ type LoadPartitionJob struct { req *querypb.LoadPartitionsRequest undo *UndoList - dist *meta.DistributionManager - meta *meta.Meta - broker meta.Broker - cluster session.Cluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver - nodeMgr *session.NodeManager + dist *meta.DistributionManager + meta *meta.Meta + broker meta.Broker + cluster session.Cluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + collectionObserver *observers.CollectionObserver + nodeMgr *session.NodeManager } func NewLoadPartitionJob( @@ -246,19 +253,21 @@ func NewLoadPartitionJob( cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver, + collectionObserver *observers.CollectionObserver, nodeMgr *session.NodeManager, ) *LoadPartitionJob { return &LoadPartitionJob{ - BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), - req: req, - undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), - dist: dist, - meta: meta, - broker: broker, - cluster: cluster, - targetMgr: targetMgr, - targetObserver: targetObserver, - nodeMgr: nodeMgr, + BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()), + req: req, + undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver), + dist: dist, + meta: meta, + broker: broker, + cluster: cluster, + targetMgr: targetMgr, + targetObserver: targetObserver, + collectionObserver: collectionObserver, + nodeMgr: nodeMgr, } } @@ -360,10 +369,10 @@ func (job *LoadPartitionJob) Execute() error { CreatedAt: time.Now(), } }) + ctx, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot()) if !job.meta.CollectionManager.Exist(req.GetCollectionID()) { job.undo.IsNewCollection = true - _, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot()) collection := &meta.Collection{ CollectionLoadInfo: &querypb.CollectionLoadInfo{ CollectionID: req.GetCollectionID(), @@ -399,6 +408,8 @@ func (job *LoadPartitionJob) Execute() error { } job.undo.IsTargetUpdated = true + job.collectionObserver.LoadPartitions(ctx, req.GetCollectionID(), lackPartitionIDs) + return nil } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 633583f27d..40b2628c04 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -60,16 +60,17 @@ type JobSuite struct { loadTypes map[int64]querypb.LoadType // Dependencies - kv kv.MetaKv - store metastore.QueryCoordCatalog - dist *meta.DistributionManager - meta *meta.Meta - cluster *session.MockCluster - targetMgr *meta.TargetManager - targetObserver *observers.TargetObserver - broker *meta.MockBroker - nodeMgr *session.NodeManager - checkerController *checkers.CheckerController + kv kv.MetaKv + store metastore.QueryCoordCatalog + dist *meta.DistributionManager + meta *meta.Meta + cluster *session.MockCluster + targetMgr *meta.TargetManager + targetObserver *observers.TargetObserver + collectionObserver *observers.CollectionObserver + broker *meta.MockBroker + nodeMgr *session.NodeManager + checkerController *checkers.CheckerController // Test objects scheduler *Scheduler @@ -192,6 +193,13 @@ func (suite *JobSuite) SetupTest() { suite.meta.HandleNodeUp(3000) suite.checkerController = &checkers.CheckerController{} + suite.collectionObserver = 
observers.NewCollectionObserver( + suite.dist, + suite.meta, + suite.targetMgr, + suite.targetObserver, + suite.checkerController, + ) } func (suite *JobSuite) TearDownTest() { @@ -231,6 +239,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -258,6 +267,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -283,6 +293,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -310,6 +321,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -345,6 +357,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -366,6 +379,7 @@ func (suite *JobSuite) TestLoadCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -395,6 +409,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -427,6 +442,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -457,6 +473,7 @@ func (suite *JobSuite) TestLoadCollectionWithDiffIndex() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -488,6 +505,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -518,6 +536,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -545,6 +564,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -572,6 +592,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -598,6 +619,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -633,6 +655,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -655,6 +678,7 @@ func (suite *JobSuite) TestLoadPartition() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -682,6 +706,7 @@ func (suite *JobSuite) TestDynamicLoad() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) return job @@ -700,6 +725,7 @@ func (suite *JobSuite) TestDynamicLoad() { suite.cluster, suite.targetMgr, suite.targetObserver, + 
suite.collectionObserver, suite.nodeMgr, ) return job @@ -799,6 +825,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -832,6 +859,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -864,6 +892,7 @@ func (suite *JobSuite) TestLoadPartitionWithDiffIndex() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -891,6 +920,7 @@ func (suite *JobSuite) TestReleaseCollection() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) suite.scheduler.Add(job) @@ -1133,6 +1163,7 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -1174,6 +1205,7 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -1201,6 +1233,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -1229,6 +1262,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(loadCollectionJob) @@ -1249,6 +1283,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(loadPartitionJob) @@ -1275,6 +1310,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(loadCollectionJob) @@ -1294,6 +1330,7 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(loadPartitionJob) @@ -1436,6 +1473,7 @@ func (suite *JobSuite) loadAll() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) @@ -1460,6 +1498,7 @@ func (suite *JobSuite) loadAll() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.scheduler.Add(job) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index d8f2428f68..e9e90fcdfa 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -23,6 +23,7 @@ import ( "time" "github.com/samber/lo" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -32,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type CollectionObserver struct { @@ -45,9 +47,17 @@ type CollectionObserver struct { checkerController *checkers.CheckerController partitionLoadedCount map[int64]int + loadTasks *typeutil.ConcurrentMap[string, LoadTask] + 
stopOnce sync.Once } +type LoadTask struct { + LoadType querypb.LoadType + CollectionID int64 + PartitionIDs []int64 +} + func NewCollectionObserver( dist *meta.DistributionManager, meta *meta.Meta, @@ -55,14 +65,23 @@ func NewCollectionObserver( targetObserver *TargetObserver, checherController *checkers.CheckerController, ) *CollectionObserver { - return &CollectionObserver{ + ob := &CollectionObserver{ dist: dist, meta: meta, targetMgr: targetMgr, targetObserver: targetObserver, checkerController: checherController, partitionLoadedCount: make(map[int64]int), + loadTasks: typeutil.NewConcurrentMap[string, LoadTask](), } + + // Add load task for collection recovery + collections := meta.GetAllCollections() + for _, collection := range collections { + ob.LoadCollection(context.Background(), collection.GetCollectionID()) + } + + return ob } func (ob *CollectionObserver) Start() { @@ -98,51 +117,104 @@ func (ob *CollectionObserver) Stop() { }) } +func (ob *CollectionObserver) LoadCollection(ctx context.Context, collectionID int64) { + span := trace.SpanFromContext(ctx) + + traceID := span.SpanContext().TraceID() + key := traceID.String() + + if !traceID.IsValid() { + key = fmt.Sprintf("LoadCollection_%d", collectionID) + } + + ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadCollection, CollectionID: collectionID}) +} + +func (ob *CollectionObserver) LoadPartitions(ctx context.Context, collectionID int64, partitionIDs []int64) { + span := trace.SpanFromContext(ctx) + + traceID := span.SpanContext().TraceID() + key := traceID.String() + if !traceID.IsValid() { + key = fmt.Sprintf("LoadPartition_%d_%v", collectionID, partitionIDs) + } + + ob.loadTasks.Insert(key, LoadTask{LoadType: querypb.LoadType_LoadPartition, CollectionID: collectionID, PartitionIDs: partitionIDs}) +} + func (ob *CollectionObserver) Observe(ctx context.Context) { ob.observeTimeout() ob.observeLoadStatus(ctx) } func (ob *CollectionObserver) observeTimeout() { - collections := ob.meta.CollectionManager.GetAllCollections() - for _, collection := range collections { - if collection.GetStatus() != querypb.LoadStatus_Loading || - time.Now().Before(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) { - continue + ob.loadTasks.Range(func(traceID string, task LoadTask) bool { + collection := ob.meta.CollectionManager.GetCollection(task.CollectionID) + // collection released + if collection == nil { + log.Info("Load Collection Task canceled, collection removed from meta", zap.Int64("collectionID", task.CollectionID), zap.String("traceID", traceID)) + ob.loadTasks.Remove(traceID) + return true } - log.Info("load collection timeout, cancel it", - zap.Int64("collectionID", collection.GetCollectionID()), - zap.Duration("loadTime", time.Since(collection.CreatedAt))) - ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) - ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) - ob.targetMgr.RemoveCollection(collection.GetCollectionID()) - } + switch task.LoadType { + case querypb.LoadType_LoadCollection: + if collection.GetStatus() == querypb.LoadStatus_Loading && + time.Now().After(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) { + log.Info("load collection timeout, cancel it", + zap.Int64("collectionID", collection.GetCollectionID()), + zap.Duration("loadTime", time.Since(collection.CreatedAt))) + ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) + 
ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) + ob.targetMgr.RemoveCollection(collection.GetCollectionID()) + ob.loadTasks.Remove(traceID) + } + case querypb.LoadType_LoadPartition: + partitionIDs := typeutil.NewSet(task.PartitionIDs...) + partitions := ob.meta.GetPartitionsByCollection(task.CollectionID) + partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool { + return partitionIDs.Contain(partition.GetPartitionID()) + }) - partitions := utils.GroupPartitionsByCollection(ob.meta.CollectionManager.GetAllPartitions()) - for collection, partitions := range partitions { - for _, partition := range partitions { - if partition.GetStatus() != querypb.LoadStatus_Loading || - time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) { - continue + // all partition released + if len(partitions) == 0 { + log.Info("Load Partitions Task canceled, collection removed from meta", + zap.Int64("collectionID", task.CollectionID), + zap.Int64s("partitionIDs", task.PartitionIDs), + zap.String("traceID", traceID)) + ob.loadTasks.Remove(traceID) + return true } - log.Info("load partition timeout, cancel it", - zap.Int64("collectionID", collection), - zap.Int64("partitionID", partition.GetPartitionID()), - zap.Duration("loadTime", time.Since(partition.CreatedAt))) - ob.meta.CollectionManager.RemovePartition(collection, partition.GetPartitionID()) - ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) - } - // all partition timeout, remove collection - if len(ob.meta.CollectionManager.GetPartitionsByCollection(collection)) == 0 { - log.Info("collection timeout due to all partition removed", zap.Int64("collection", collection)) + working := false + for _, partition := range partitions { + if time.Now().Before(partition.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds.GetAsDuration(time.Second))) { + working = true + break + } + } + // only all partitions timeout means task timeout + if !working { + log.Info("load partitions timeout, cancel it", + zap.Int64("collectionID", task.CollectionID), + zap.Int64s("partitionIDs", task.PartitionIDs)) + for _, partition := range partitions { + ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID()) + ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) + } - ob.meta.CollectionManager.RemoveCollection(collection) - ob.meta.ReplicaManager.RemoveCollection(collection) - ob.targetMgr.RemoveCollection(collection) + // all partition timeout, remove collection + if len(ob.meta.CollectionManager.GetPartitionsByCollection(task.CollectionID)) == 0 { + log.Info("collection timeout due to all partition removed", zap.Int64("collection", task.CollectionID)) + + ob.meta.CollectionManager.RemoveCollection(task.CollectionID) + ob.meta.ReplicaManager.RemoveCollection(task.CollectionID) + ob.targetMgr.RemoveCollection(task.CollectionID) + } + } } - } + return true + }) } func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { @@ -153,18 +225,54 @@ func (ob *CollectionObserver) readyToObserve(collectionID int64) bool { } func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { - partitions := ob.meta.CollectionManager.GetAllPartitions() loading := false - for _, partition := range partitions { - if partition.LoadPercentage == 100 { - continue + ob.loadTasks.Range(func(traceID string, task LoadTask) bool { + loading = true + + collection := 
ob.meta.CollectionManager.GetCollection(task.CollectionID) + if collection == nil { + return true } - if ob.readyToObserve(partition.CollectionID) { - replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) - ob.observePartitionLoadStatus(ctx, partition, replicaNum) - loading = true + + var partitions []*meta.Partition + switch task.LoadType { + case querypb.LoadType_LoadCollection: + partitions = ob.meta.GetPartitionsByCollection(task.CollectionID) + case querypb.LoadType_LoadPartition: + partitionIDs := typeutil.NewSet[int64](task.PartitionIDs...) + partitions = ob.meta.GetPartitionsByCollection(task.CollectionID) + partitions = lo.Filter(partitions, func(partition *meta.Partition, _ int) bool { + return partitionIDs.Contain(partition.GetPartitionID()) + }) } - } + + loaded := true + for _, partition := range partitions { + if partition.LoadPercentage == 100 { + continue + } + if ob.readyToObserve(partition.CollectionID) { + replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) + ob.observePartitionLoadStatus(ctx, partition, replicaNum) + } + partition = ob.meta.GetPartition(partition.PartitionID) + if partition.LoadPercentage != 100 { + loaded = false + } + } + // all partition loaded, finish task + if len(partitions) > 0 && loaded { + log.Info("Load task finish", + zap.String("traceID", traceID), + zap.Int64("collectionID", task.CollectionID), + zap.Int64s("partitionIDs", task.PartitionIDs), + zap.Stringer("loadType", task.LoadType)) + ob.loadTasks.Remove(traceID) + } + + return true + }) + // trigger check logic when loading collections/partitions if loading { ob.checkerController.Check() diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 478094f975..b336ad6072 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -17,6 +17,7 @@ package observers import ( + "context" "testing" "time" @@ -441,6 +442,8 @@ func (suite *CollectionObserverSuite) load(collection int64) { suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collection).Return(dmChannels, allSegments, nil) suite.targetMgr.UpdateCollectionNextTarget(collection) + + suite.ob.LoadCollection(context.Background(), collection) } func TestCollectionObserver(t *testing.T) { diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 1b594cfcd6..9592dc605a 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -232,6 +232,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection s.cluster, s.targetMgr, s.targetObserver, + s.collectionObserver, s.nodeMgr, ) s.jobScheduler.Add(loadJob) @@ -332,6 +333,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions s.cluster, s.targetMgr, s.targetObserver, + s.collectionObserver, s.nodeMgr, ) s.jobScheduler.Add(loadJob) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index c8fd570421..0c74f60c95 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -70,18 +70,19 @@ type ServiceSuite struct { nodes []int64 // Dependencies - kv kv.MetaKv - store metastore.QueryCoordCatalog - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - broker *meta.MockBroker - targetObserver *observers.TargetObserver - cluster *session.MockCluster - nodeMgr *session.NodeManager - 
jobScheduler *job.Scheduler - taskScheduler *task.MockScheduler - balancer balance.Balance + kv kv.MetaKv + store metastore.QueryCoordCatalog + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + broker *meta.MockBroker + targetObserver *observers.TargetObserver + collectionObserver *observers.CollectionObserver + cluster *session.MockCluster + nodeMgr *session.NodeManager + jobScheduler *job.Scheduler + taskScheduler *task.MockScheduler + balancer balance.Balance distMgr *meta.DistributionManager distController *dist.MockController @@ -177,6 +178,15 @@ func (suite *ServiceSuite) SetupTest() { suite.distMgr = meta.NewDistributionManager() suite.distController = dist.NewMockController(suite.T()) + suite.collectionObserver = observers.NewCollectionObserver( + suite.dist, + suite.meta, + suite.targetMgr, + suite.targetObserver, + &checkers.CheckerController{}, + ) + suite.collectionObserver.Start() + suite.server = &Server{ kv: suite.kv, store: suite.store, @@ -187,6 +197,7 @@ func (suite *ServiceSuite) SetupTest() { targetMgr: suite.targetMgr, broker: suite.broker, targetObserver: suite.targetObserver, + collectionObserver: suite.collectionObserver, nodeMgr: suite.nodeMgr, cluster: suite.cluster, jobScheduler: suite.jobScheduler, @@ -195,13 +206,6 @@ func (suite *ServiceSuite) SetupTest() { distController: suite.distController, ctx: context.Background(), } - suite.server.collectionObserver = observers.NewCollectionObserver( - suite.server.dist, - suite.server.meta, - suite.server.targetMgr, - suite.targetObserver, - &checkers.CheckerController{}, - ) suite.server.UpdateStateCode(commonpb.StateCode_Healthy) } @@ -1802,6 +1806,7 @@ func (suite *ServiceSuite) loadAll() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.jobScheduler.Add(job) @@ -1826,6 +1831,7 @@ func (suite *ServiceSuite) loadAll() { suite.cluster, suite.targetMgr, suite.targetObserver, + suite.collectionObserver, suite.nodeMgr, ) suite.jobScheduler.Add(job)
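Reviewer note (supplementary, not part of the patch): the core mechanism introduced above is that each successful load job registers a `LoadTask` with the `CollectionObserver`, keyed by the trace ID of the load span when one is available and by a synthetic fallback key otherwise. Below is a minimal, self-contained sketch of that keying scheme; it uses a mutex-guarded map in place of Milvus's `typeutil.ConcurrentMap`, and names such as `taskRegistry` and `keyFor` are invented for illustration, not taken from the patch.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"go.opentelemetry.io/otel/trace"
)

type LoadType int

const (
	LoadTypeCollection LoadType = iota
	LoadTypePartition
)

// LoadTask mirrors the struct added in collection_observer.go.
type LoadTask struct {
	LoadType     LoadType
	CollectionID int64
	PartitionIDs []int64
}

// taskRegistry is an illustrative stand-in for the observer's
// typeutil.ConcurrentMap[string, LoadTask].
type taskRegistry struct {
	mu    sync.Mutex
	tasks map[string]LoadTask
}

func newTaskRegistry() *taskRegistry {
	return &taskRegistry{tasks: make(map[string]LoadTask)}
}

// keyFor reproduces the keying logic of LoadCollection/LoadPartitions:
// prefer the span's trace ID; fall back to a synthetic key so that
// registrations made without an active span still get a stable slot.
func keyFor(ctx context.Context, fallback string) string {
	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
	if traceID.IsValid() {
		return traceID.String()
	}
	return fallback
}

func (r *taskRegistry) LoadCollection(ctx context.Context, collectionID int64) {
	key := keyFor(ctx, fmt.Sprintf("LoadCollection_%d", collectionID))
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tasks[key] = LoadTask{LoadType: LoadTypeCollection, CollectionID: collectionID}
}

func (r *taskRegistry) LoadPartitions(ctx context.Context, collectionID int64, partitionIDs []int64) {
	key := keyFor(ctx, fmt.Sprintf("LoadPartition_%d_%v", collectionID, partitionIDs))
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tasks[key] = LoadTask{LoadType: LoadTypePartition, CollectionID: collectionID, PartitionIDs: partitionIDs}
}

func main() {
	reg := newTaskRegistry()
	// context.Background() carries no active span, so the fallback keys
	// are used, just like the recovery path in NewCollectionObserver.
	reg.LoadCollection(context.Background(), 1001)
	reg.LoadPartitions(context.Background(), 1001, []int64{1, 2})
	for key, task := range reg.tasks {
		fmt.Printf("%s -> %+v\n", key, task)
	}
}
```

This keying is also why `NewCollectionObserver` re-registers a `LoadCollection` task for every collection already in meta: loads that were in flight before a QueryCoord restart keep a registry entry and therefore keep being observed.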
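The behavioral change in `observeTimeout` is that a `LoadPartitions` task now times out as a unit: it is canceled only when every partition it tracks has exceeded the configured load timeout, whereas the removed code swept and dropped partitions one by one. A hedged sketch of just that predicate, with a simplified `Partition` type and an explicit `timeout` argument standing in for `meta.Partition` and `Params.QueryCoordCfg.LoadTimeoutSeconds`:

```go
package main

import (
	"fmt"
	"time"
)

// Partition is a stand-in for meta.Partition; only the field the
// timeout check reads is modeled here.
type Partition struct {
	PartitionID int64
	UpdatedAt   time.Time
}

// allTimedOut captures the "only all partitions timing out means the
// task timed out" rule: a single partition still inside its deadline
// keeps the whole load task alive.
func allTimedOut(partitions []Partition, timeout time.Duration, now time.Time) bool {
	if len(partitions) == 0 {
		// Nothing left to track; the observer treats this case as
		// "partitions released" and cancels the task instead.
		return false
	}
	for _, p := range partitions {
		if now.Before(p.UpdatedAt.Add(timeout)) {
			return false
		}
	}
	return true
}

func main() {
	now := time.Now()
	timeout := 10 * time.Minute
	parts := []Partition{
		{PartitionID: 1, UpdatedAt: now.Add(-15 * time.Minute)}, // past the deadline
		{PartitionID: 2, UpdatedAt: now.Add(-5 * time.Minute)},  // still within it
	}
	fmt.Println(allTimedOut(parts, timeout, now)) // false: partition 2 keeps the task alive

	parts[1].UpdatedAt = now.Add(-20 * time.Minute)
	fmt.Println(allTimedOut(parts, timeout, now)) // true: cancel, clean up meta and targets
}
```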
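`observeLoadStatus` likewise now walks the registered tasks instead of every partition in meta, and removes a task once all partitions it resolves report 100% load progress. The sketch below compresses that exit condition into a pure function; in the patch the partition list comes from meta (all partitions of the collection for `LoadType_LoadCollection`, only the recorded IDs for `LoadType_LoadPartition`) and progress is re-read after `observePartitionLoadStatus`, while here a plain progress map is assumed for brevity.

```go
package main

import "fmt"

// taskFinished mirrors the end of observeLoadStatus: a task is dropped
// from the registry only when it resolved at least one partition and
// every one of them has reached 100%.
func taskFinished(partitionIDs []int64, loadPercentage map[int64]int) bool {
	if len(partitionIDs) == 0 {
		return false // nothing resolved yet; keep observing
	}
	for _, pid := range partitionIDs {
		if loadPercentage[pid] != 100 {
			return false
		}
	}
	return true
}

func main() {
	partitions := []int64{1, 2}
	progress := map[int64]int{1: 100, 2: 40}
	fmt.Println(taskFinished(partitions, progress)) // false: partition 2 is at 40%

	progress[2] = 100
	fmt.Println(taskFinished(partitions, progress)) // true: remove the registry entry
}
```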