From dc66f3fa0460edcafaad747546be3c7f80e7d47f Mon Sep 17 00:00:00 2001 From: Guanghui Huang <65161555+guanghuihuang88@users.noreply.github.com> Date: Tue, 6 Jan 2026 10:21:24 +0800 Subject: [PATCH] fix: Fix rate limit not work when set collection rate limit properties (#46714) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue: https://github.com/milvus-io/milvus/issues/46713 /kind bug - Core invariant: a Limiter's hasUpdated flag controls whether its non-Infinite limit is exported to proxies via toRequestLimiter(); only limiters with hasUpdated==true and non-Inf rates produce rate updates delivered to proxies (pkg/util/ratelimitutil/limiter.go: HasUpdated / toRequestLimiter behavior unchanged). - Exact bug and fix (issue #46713): collection-level limiters created from configured collection/partition/database properties were constructed with correct limits but left hasUpdated==false, so they were skipped by the existing !HasUpdated() check and never sent to proxies. Fix: add Limiter.SetHasUpdated(updated bool) and call a new updateLimiterHasUpdated helper immediately after creating limiter nodes during initialization/reset (internal/rootcoord/quota_center.go) to mark non-Inf newly-created limiters as updated so they are included in toRequestLimiter exports. - Logic simplified / redundancy removed: initialization now explicitly sets limiter initialization state (hasUpdated) for newly-created non-Infinite limiters instead of relying on implicit later side-effects to toggle the flag; this removes the implicit gap between creation and the expectation that a configured limiter should be published. - No data-loss or behavior regression: the change only mutates the in-memory hasUpdated flag for freshly created limiter instances (pkg/util/ratelimitutil/limiter.go: SetHasUpdated) and sets it in the limiter initialization path (internal/rootcoord/quota_center.go). It does not alter token accounting (advance, AllowN, Cancel), rate computation, SetLimit semantics, persistence, or proxy filtering logic—only ensures intended collection-level rates are delivered to proxies—so no persisted data or runtime rate behavior is removed or degraded. Signed-off-by: guanghuihuang Co-authored-by: guanghuihuang --- internal/rootcoord/quota_center.go | 24 +++++++++++++++++++++--- pkg/util/ratelimitutil/limiter.go | 7 +++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index c717afd622..6c7cf0a4ac 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -1307,6 +1307,20 @@ func (q *QuotaCenter) resetAllCurrentRates() error { } } + // updateLimiterHasUpdated checks all limiters in a RateLimiterNode and sets hasUpdated to true + // for those with non-Inf values + updateLimiterHasUpdated := func(node *rlinternal.RateLimiterNode) { + if node == nil { + return + } + node.GetLimiters().Range(func(rateType internalpb.RateType, limiter *ratelimitutil.Limiter) bool { + if limiter.Limit() != Inf { + limiter.SetHasUpdated(true) + } + return true + }) + } + collectionRateTypes := getRateTypes(internalpb.RateScope_Collection, allOps) initLimiters := func(sourceCollections map[int64]map[int64][]int64) { for dbID, collections := range sourceCollections { @@ -1324,22 +1338,26 @@ func (q *QuotaCenter) resetAllCurrentRates() error { getCollectionLimitVal := func(rateType internalpb.RateType) Limit { return collectionLimitVals[rateType] } - q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, + + collectionLimiter := q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal)) + updateLimiterHasUpdated(collectionLimiter) if !enablePartitionRateLimit { continue } for _, partitionID := range partitionIDs { - q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID, + partitionLimiter := q.rateLimiter.GetOrCreatePartitionLimiters(dbID, collectionID, partitionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), newParamLimiterFuncWithLimitFunc(internalpb.RateScope_Collection, allOps, getCollectionLimitVal), newParamLimiterFunc(internalpb.RateScope_Partition, allOps)) + updateLimiterHasUpdated(partitionLimiter) } } if len(collections) == 0 { - q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + dbLimiter := q.rateLimiter.GetOrCreateDatabaseLimiters(dbID, newParamLimiterFunc(internalpb.RateScope_Database, allOps)) + updateLimiterHasUpdated(dbLimiter) } } } diff --git a/pkg/util/ratelimitutil/limiter.go b/pkg/util/ratelimitutil/limiter.go index d2f95b31b6..8960efc59d 100644 --- a/pkg/util/ratelimitutil/limiter.go +++ b/pkg/util/ratelimitutil/limiter.go @@ -143,6 +143,13 @@ func (lim *Limiter) HasUpdated() bool { return lim.hasUpdated } +// SetHasUpdated sets the hasUpdated flag to the specified value. +func (lim *Limiter) SetHasUpdated(updated bool) { + lim.mu.Lock() + defer lim.mu.Unlock() + lim.hasUpdated = updated +} + // advance calculates and returns an updated state for lim resulting from the passage of time. // lim is not changed. advance requires that lim.mu is held. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {