From ed81eaa963fa17b8a1a951f37b3733253aee1334 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 8 May 2023 14:06:41 +0800 Subject: [PATCH] Make CollectionObserver trigger checker more frequently during load procedure (#23928) Signed-off-by: Congqi Xia --- internal/querycoordv2/checkers/controller.go | 7 +++++-- .../querycoordv2/observers/collection_observer.go | 10 ++++++++++ .../observers/collection_observer_test.go | 12 ++++++++---- internal/querycoordv2/server.go | 1 + internal/querycoordv2/server_test.go | 1 + internal/querycoordv2/services_test.go | 2 ++ 6 files changed, 27 insertions(+), 6 deletions(-) diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 9df4fc652c..865c3fd1f6 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -70,7 +70,7 @@ func NewCheckerController( return &CheckerController{ stopCh: make(chan struct{}), - checkCh: make(chan struct{}), + checkCh: make(chan struct{}, 1), meta: meta, dist: dist, targetMgr: targetMgr, @@ -112,7 +112,10 @@ func (controller *CheckerController) Stop() { } func (controller *CheckerController) Check() { - controller.checkCh <- struct{}{} + select { + case controller.checkCh <- struct{}{}: + default: + } } // check is the real implementation of Check diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index edbb5c9b8f..380a4bd4f6 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" @@ -37,6 +38,7 @@ type CollectionObserver struct { meta *meta.Meta targetMgr *meta.TargetManager targetObserver *TargetObserver + checkerController *checkers.CheckerController partitionLoadedCount map[int64]int stopOnce sync.Once @@ -47,6 +49,7 @@ func NewCollectionObserver( meta *meta.Meta, targetMgr *meta.TargetManager, targetObserver *TargetObserver, + checherController *checkers.CheckerController, ) *CollectionObserver { return &CollectionObserver{ stopCh: make(chan struct{}), @@ -54,6 +57,7 @@ func NewCollectionObserver( meta: meta, targetMgr: targetMgr, targetObserver: targetObserver, + checkerController: checherController, partitionLoadedCount: make(map[int64]int), } } @@ -135,12 +139,18 @@ func (ob *CollectionObserver) observeLoadStatus() { if len(partitions) > 0 { log.Info("observe partitions status", zap.Int("partitionNum", len(partitions))) } + loading := false for _, partition := range partitions { if partition.LoadPercentage == 100 { continue } replicaNum := ob.meta.GetReplicaNumber(partition.GetCollectionID()) ob.observePartitionLoadStatus(partition, replicaNum) + loading = true + } + // trigger check logic when loading collections/partitions + if loading { + ob.checkerController.Check() } } diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 1036592bb8..b2a85f3382 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -30,6 +30,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -58,10 +59,11 @@ type CollectionObserverSuite struct { broker *meta.MockBroker // Dependencies - dist *meta.DistributionManager - meta *meta.Meta - targetMgr *meta.TargetManager - targetObserver *TargetObserver + dist *meta.DistributionManager + meta *meta.Meta + targetMgr *meta.TargetManager + targetObserver *TargetObserver + checkerController *checkers.CheckerController // Test object ob *CollectionObserver @@ -188,6 +190,7 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.dist, suite.broker, ) + suite.checkerController = &checkers.CheckerController{} // Test object suite.ob = NewCollectionObserver( @@ -195,6 +198,7 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.meta, suite.targetMgr, suite.targetObserver, + suite.checkerController, ) for _, collection := range suite.collections { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index ebfb244fab..8a36f86fd3 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -356,6 +356,7 @@ func (s *Server) initObserver() { s.meta, s.targetMgr, s.targetObserver, + s.checkerController, ) s.replicaObserver = observers.NewReplicaObserver( diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index f47a16a481..5452b4071d 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -518,6 +518,7 @@ func (suite *ServerSuite) hackServer() { suite.server.meta, suite.server.targetMgr, suite.server.targetObserver, + suite.server.checkerController, ) suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe() diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 6897e3c164..7af73e5b03 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/balance" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/dist" "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" @@ -188,6 +189,7 @@ func (suite *ServiceSuite) SetupTest() { suite.server.meta, suite.server.targetMgr, suite.targetObserver, + &checkers.CheckerController{}, ) suite.server.UpdateStateCode(commonpb.StateCode_Healthy)