From b65b963ca2acc875b8edd7f5a7e4be3eaa91a20d Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 9 Apr 2024 17:17:18 +0800 Subject: [PATCH] enhance: Rootcoord's stop may block in quota_center's stop (#31447) (#31824) pr: #31447 This PR fixes an issue where rootcoord's stop may block in quota_center's stop --------- Signed-off-by: Wei Liu --- internal/rootcoord/quota_center.go | 24 ++++++++++++++-- internal/rootcoord/quota_center_test.go | 38 ++++++++++++++++++++++++- internal/rootcoord/root_coord.go | 2 +- 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index dffa0e7c17..787e901b99 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -87,6 +87,9 @@ type collectionStates = map[milvuspb.QuotaState]commonpb.ErrorCode // // If necessary, user can also manually force to deny RW requests. type QuotaCenter struct { + ctx context.Context + cancel context.CancelFunc + // clients proxies *proxyClientManager queryCoord types.QueryCoordClient @@ -112,11 +115,15 @@ type QuotaCenter struct { stopOnce sync.Once stopChan chan struct{} + wg sync.WaitGroup } // NewQuotaCenter returns a new QuotaCenter. func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, tsoAllocator tso.Allocator, meta IMetaTable) *QuotaCenter { + ctx, cancel := context.WithCancel(context.TODO()) return &QuotaCenter{ + ctx: ctx, + cancel: cancel, proxies: proxies, queryCoord: queryCoord, dataCoord: dataCoord, @@ -132,10 +139,17 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoordClie } } +func (q *QuotaCenter) Start() { + q.wg.Add(1) + go q.run() +} + // run starts the service of QuotaCenter. 
func (q *QuotaCenter) run() { + defer q.wg.Done() log.Info("Start QuotaCenter", zap.Float64("collectInterval/s", Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat())) ticker := time.NewTicker(time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second))) + defer ticker.Stop() for { select { @@ -164,9 +178,13 @@ func (q *QuotaCenter) run() { // stop would stop the service of QuotaCenter. func (q *QuotaCenter) stop() { + log.Info("stop quota center") q.stopOnce.Do(func() { - q.stopChan <- struct{}{} + // cancel all blocking request to coord + q.cancel() + close(q.stopChan) }) + q.wg.Wait() } // clearMetrics removes all metrics stored in QuotaCenter. @@ -234,7 +252,7 @@ func (q *QuotaCenter) syncMetrics() error { oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...) oldQueryNodes := typeutil.NewSet(lo.Keys(q.queryNodeMetrics)...) q.clearMetrics() - ctx, cancel := context.WithTimeout(context.Background(), GetMetricsTimeout) + ctx, cancel := context.WithTimeout(q.ctx, GetMetricsTimeout) defer cancel() group := &errgroup.Group{} @@ -856,7 +874,7 @@ func (q *QuotaCenter) checkDiskQuota() { // setRates notifies Proxies to set rates for different rate types. 
func (q *QuotaCenter) setRates() error { - ctx, cancel := context.WithTimeout(context.Background(), SetRatesTimeout) + ctx, cancel := context.WithTimeout(q.ctx, SetRatesTimeout) defer cancel() toCollectionRate := func(collection int64, currentRates map[internalpb.RateType]ratelimitutil.Limit) *proxypb.CollectionRate { diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index c3a60fce27..5fed04a6b4 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -67,11 +68,46 @@ func TestQuotaCenter(t *testing.T) { meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) - go quotaCenter.run() + quotaCenter.Start() time.Sleep(10 * time.Millisecond) quotaCenter.stop() }) + t.Run("test QuotaCenter stop", func(t *testing.T) { + qc := mocks.NewMockQueryCoordClient(t) + meta := mockrootcoord.NewIMetaTable(t) + + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaCenterCollectInterval.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().QuotaConfig.QuotaCenterCollectInterval.Key) + + qc.ExpectedCalls = nil + // mock query coord stuck for at most 10s + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, gmr *milvuspb.GetMetricsRequest, co ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) { + counter := 0 + for { + select { + case <-ctx.Done(): + return nil, merr.ErrCollectionNotFound + default: + if counter < 10 { + time.Sleep(1 * time.Second) + counter++ + } + } + } + }) + + 
meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + quotaCenter.Start() + time.Sleep(3 * time.Second) + + // assert stop won't stuck more than 5s + start := time.Now() + quotaCenter.stop() + assert.True(t, time.Since(start).Seconds() <= 5) + }) + t.Run("test syncMetrics", func(t *testing.T) { qc := mocks.NewMockQueryCoordClient(t) meta := mockrootcoord.NewIMetaTable(t) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d51aaabb4f..714dbbbaf8 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -697,7 +697,7 @@ func (c *Core) startInternal() error { } if Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { - go c.quotaCenter.run() + c.quotaCenter.Start() } c.scheduler.Start()