From 56b3490a980a2ea3d03412e3c5d504bbc68eff89 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 28 Aug 2024 14:13:00 +0800 Subject: [PATCH] fix: Fix rate wasn't limited to the expected value (#35699) Each time the rate is reset, the token bucket is fully refilled, causing the rate not to be limited to the expected value. This PR addresses the issue by preventing the token reset. issue: https://github.com/milvus-io/milvus/issues/35675, https://github.com/milvus-io/milvus/issues/35702 --------- Signed-off-by: bigsheeper --- internal/proxy/simple_rate_limiter.go | 16 +++++++++++----- internal/rootcoord/quota_center.go | 7 ++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/proxy/simple_rate_limiter.go b/internal/proxy/simple_rate_limiter.go index 65fcc80551..dd06057c8f 100644 --- a/internal/proxy/simple_rate_limiter.go +++ b/internal/proxy/simple_rate_limiter.go @@ -217,14 +217,20 @@ func (m *SimpleLimiter) SetRates(rootLimiter *proxypb.LimiterNode) error { } func initLimiter(rln *rlinternal.RateLimiterNode, rateLimiterConfigs map[internalpb.RateType]*paramtable.ParamItem) { - log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0) for rt, p := range rateLimiterConfigs { - limit := ratelimitutil.Limit(p.GetAsFloat()) + newLimit := ratelimitutil.Limit(p.GetAsFloat()) burst := p.GetAsFloat() // use rate as burst, because SimpleLimiter is with punishment mechanism, burst is insignificant. 
- rln.GetLimiters().Insert(rt, ratelimitutil.NewLimiter(limit, burst)) - log.RatedDebug(30, "RateLimiter register for rateType", + old, ok := rln.GetLimiters().Get(rt) + if ok { + if old.Limit() != newLimit { + old.SetLimit(newLimit) + } + } else { + rln.GetLimiters().Insert(rt, ratelimitutil.NewLimiter(newLimit, burst)) + } + log.Debug("RateLimiter register for rateType", zap.String("rateType", internalpb.RateType_name[(int32(rt))]), - zap.String("rateLimit", ratelimitutil.Limit(p.GetAsFloat()).String()), + zap.String("rateLimit", newLimit.String()), zap.String("burst", fmt.Sprintf("%v", burst))) } } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 9dd91350c4..a4e2d45046 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -551,6 +551,7 @@ func (q *QuotaCenter) collectMetrics() error { // forceDenyWriting sets dml rates to 0 to reject all dml requests. func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error { + log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0) if cluster { clusterLimiters := q.rateLimiter.GetRootLimiters() updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml) @@ -1282,7 +1283,8 @@ func (q *QuotaCenter) calculateRates() error { } func (q *QuotaCenter) resetAllCurrentRates() error { - q.rateLimiter = rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps)) + clusterLimiter := newParamLimiterFunc(internalpb.RateScope_Cluster, allOps)() + q.rateLimiter = rlinternal.NewRateLimiterTree(clusterLimiter) initLimiters := func(sourceCollections map[int64]map[int64][]int64) { for dbID, collections := range sourceCollections { for collectionID, partitionIDs := range collections { @@ -1357,6 +1359,7 @@ func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string] // 
checkDiskQuota checks if disk quota exceeded. func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error { + log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) q.diskMu.Lock() defer q.diskMu.Unlock() if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() { @@ -1370,6 +1373,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error { totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat() total := q.dataCoordMetrics.TotalBinlogSize if float64(total) >= totalDiskQuota { + log.RatedWarn(10, "cluster disk quota exceeded", zap.Int64("disk usage", total), zap.Float64("disk quota", totalDiskQuota)) err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil) if err != nil { log.Warn("fail to force deny writing", zap.Error(err)) @@ -1431,6 +1435,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error { } func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 { + log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) dbIDs := make([]int64, 0) checkDiskQuota := func(dbID, binlogSize int64, quota float64) { if float64(binlogSize) >= quota {