diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 69235f783c..08c77858d3 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -56,7 +56,8 @@ type CollectionObserver struct { proxyManager proxyutil.ProxyClientManagerInterface - stopOnce sync.Once + startOnce sync.Once + stopOnce sync.Once } type LoadTask struct { @@ -94,27 +95,29 @@ func NewCollectionObserver( } func (ob *CollectionObserver) Start() { - ctx, cancel := context.WithCancel(context.Background()) - ob.cancel = cancel + ob.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + ob.cancel = cancel - observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond) - ob.wg.Add(1) - go func() { - defer ob.wg.Done() + observePeriod := Params.QueryCoordCfg.CollectionObserverInterval.GetAsDuration(time.Millisecond) + ob.wg.Add(1) + go func() { + defer ob.wg.Done() - ticker := time.NewTicker(observePeriod) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("CollectionObserver stopped") - return + ticker := time.NewTicker(observePeriod) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Info("CollectionObserver stopped") + return - case <-ticker.C: - ob.Observe(ctx) + case <-ticker.C: + ob.Observe(ctx) + } } - } - }() + }() + }) } func (ob *CollectionObserver) Stop() { diff --git a/internal/querycoordv2/observers/leader_cache_observer.go b/internal/querycoordv2/observers/leader_cache_observer.go index f63ededfbd..7f92d6f090 100644 --- a/internal/querycoordv2/observers/leader_cache_observer.go +++ b/internal/querycoordv2/observers/leader_cache_observer.go @@ -36,6 +36,7 @@ type CollectionShardLeaderCache = map[string]*querypb.ShardLeadersList type LeaderCacheObserver struct { wg sync.WaitGroup proxyManager proxyutil.ProxyClientManagerInterface + startOnce sync.Once stopOnce sync.Once closeCh chan struct{} @@ -44,8 +45,10 @@ type LeaderCacheObserver struct { } func (o *LeaderCacheObserver) Start(ctx context.Context) { - o.wg.Add(1) - go o.schedule(ctx) + o.startOnce.Do(func() { + o.wg.Add(1) + go o.schedule(ctx) + }) } func (o *LeaderCacheObserver) Stop() { diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index 96180fb72e..4251fef99f 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -37,7 +37,8 @@ type ReplicaObserver struct { meta *meta.Meta distMgr *meta.DistributionManager - stopOnce sync.Once + startOnce sync.Once + stopOnce sync.Once } func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *ReplicaObserver { @@ -48,11 +49,13 @@ func NewReplicaObserver(meta *meta.Meta, distMgr *meta.DistributionManager) *Rep } func (ob *ReplicaObserver) Start() { - ctx, cancel := context.WithCancel(context.Background()) - ob.cancel = cancel + ob.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + ob.cancel = cancel - ob.wg.Add(1) - go ob.schedule(ctx) + ob.wg.Add(1) + go ob.schedule(ctx) + }) } func (ob *ReplicaObserver) Stop() { diff --git a/internal/querycoordv2/observers/resource_observer.go b/internal/querycoordv2/observers/resource_observer.go index bfad63e28a..3e0938b0d5 100644 --- a/internal/querycoordv2/observers/resource_observer.go +++ b/internal/querycoordv2/observers/resource_observer.go @@ -36,7 +36,8 @@ type ResourceObserver struct { wg sync.WaitGroup meta *meta.Meta - stopOnce sync.Once + startOnce sync.Once + stopOnce sync.Once } func NewResourceObserver(meta *meta.Meta) *ResourceObserver { @@ -46,11 +47,13 @@ func NewResourceObserver(meta *meta.Meta) *ResourceObserver { } func (ob *ResourceObserver) Start() { - ctx, cancel := context.WithCancel(context.Background()) - ob.cancel = cancel + ob.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + ob.cancel = cancel - ob.wg.Add(1) - go ob.schedule(ctx) + ob.wg.Add(1) + go ob.schedule(ctx) + }) } func (ob *ResourceObserver) Stop() { diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 8c5bf46596..5e6f7ede3c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -71,7 +71,8 @@ type TargetObserver struct { dispatcher *taskDispatcher[int64] keylocks *lock.KeyLock[int64] - stopOnce sync.Once + startOnce sync.Once + stopOnce sync.Once } func NewTargetObserver( @@ -101,19 +102,21 @@ func NewTargetObserver( } func (ob *TargetObserver) Start() { - ctx, cancel := context.WithCancel(context.Background()) - ob.cancel = cancel + ob.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + ob.cancel = cancel - ob.dispatcher.Start() + ob.dispatcher.Start() - ob.wg.Add(1) - go func() { - defer ob.wg.Done() - ob.schedule(ctx) - }() + ob.wg.Add(1) + go func() { + defer ob.wg.Done() + ob.schedule(ctx) + }() - // after target observer start, update target for all collection - ob.initChan <- initRequest{} + // after target observer start, update target for all collection + ob.initChan <- initRequest{} + }) } func (ob *TargetObserver) Stop() { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 5f0c600167..8e4fca3fea 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1084,8 +1084,6 @@ func (suite *ServiceSuite) TestReleasePartition() { func (suite *ServiceSuite) TestRefreshCollection() { server := suite.server - server.collectionObserver.Start() - // Test refresh all collections. for _, collection := range suite.collections { err := server.refreshCollection(collection)