diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index c717afd622..6c7cf0a4ac 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -1307,6 +1307,20 @@ func (q *QuotaCenter) resetAllCurrentRates() error { } } + // updateLimiterHasUpdated checks all limiters in a RateLimiterNode and sets hasUpdated to true + // for those with non-Inf values + updateLimiterHasUpdated := func(node *rlinternal.RateLimiterNode) { + if node == nil { + return + } + node.GetLimiters().Range(func(rateType internalpb.RateType, limiter *ratelimitutil.Limiter) bool { + if limiter.Limit() != Inf { + limiter.SetHasUpdated(true) + } + return true + }) + } + collectionRateTypes := getRateTypes(internalpb.RateScope_Collection, allOps) initLimiters := func(sourceCollections map[int64]map[int64][]int64) { for dbID, collections := range sourceCollections { @@ -1324,22 +1338,26 @@ func (q *QuotaCenter) resetAllCurrentRates() error { getCollectionLimitVal := func(rateType internalpb.RateType) Limit { return collectionLimitVals[rateType] } - q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, + + collectionLimiter := q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal)) + updateLimiterHasUpdated(collectionLimiter) if !enablePartitionRateLimit { continue } for _, partitionID := range partitionIDs { - q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID, + partitionLimiter := q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal), newParamLimiterFunc(internalpb.RateScope_Partition, allOps)) + updateLimiterHasUpdated(partitionLimiter) } } if len(collections) == 0 { - q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + dbLimiter := q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + updateLimiterHasUpdated(dbLimiter) } } } diff --git a/pkg/util/ratelimitutil/limiter.go b/pkg/util/ratelimitutil/limiter.go index d2f95b31b6..8960efc59d 100644 --- a/pkg/util/ratelimitutil/limiter.go +++ b/pkg/util/ratelimitutil/limiter.go @@ -143,6 +143,13 @@ func (lim *Limiter) HasUpdated() bool { return lim.hasUpdated } +// SetHasUpdated sets the hasUpdated flag to the specified value. +func (lim *Limiter) SetHasUpdated(updated bool) { + lim.mu.Lock() + defer lim.mu.Unlock() + lim.hasUpdated = updated +} + // advance calculates and returns an updated state for lim resulting from the passage of time. // lim is not changed. advance requires that lim.mu is held. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {