diff --git a/internal/proxy/simple_rate_limiter.go b/internal/proxy/simple_rate_limiter.go
index 65fcc80551..dd06057c8f 100644
--- a/internal/proxy/simple_rate_limiter.go
+++ b/internal/proxy/simple_rate_limiter.go
@@ -217,14 +217,20 @@ func (m *SimpleLimiter) SetRates(rootLimiter *proxypb.LimiterNode) error {
 }
 
 func initLimiter(rln *rlinternal.RateLimiterNode, rateLimiterConfigs map[internalpb.RateType]*paramtable.ParamItem) {
-	log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0)
 	for rt, p := range rateLimiterConfigs {
-		limit := ratelimitutil.Limit(p.GetAsFloat())
+		newLimit := ratelimitutil.Limit(p.GetAsFloat())
 		burst := p.GetAsFloat() // use rate as burst, because SimpleLimiter is with punishment mechanism, burst is insignificant.
-		rln.GetLimiters().Insert(rt, ratelimitutil.NewLimiter(limit, burst))
-		log.RatedDebug(30, "RateLimiter register for rateType",
+		old, ok := rln.GetLimiters().Get(rt)
+		if ok {
+			if old.Limit() != newLimit {
+				old.SetLimit(newLimit)
+			}
+		} else {
+			rln.GetLimiters().Insert(rt, ratelimitutil.NewLimiter(newLimit, burst))
+		}
+		log.Debug("RateLimiter register for rateType",
 			zap.String("rateType", internalpb.RateType_name[(int32(rt))]),
-			zap.String("rateLimit", ratelimitutil.Limit(p.GetAsFloat()).String()),
+			zap.String("rateLimit", newLimit.String()),
 			zap.String("burst", fmt.Sprintf("%v", burst)))
 	}
 }
diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go
index 9dd91350c4..a4e2d45046 100644
--- a/internal/rootcoord/quota_center.go
+++ b/internal/rootcoord/quota_center.go
@@ -551,6 +551,7 @@ func (q *QuotaCenter) collectMetrics() error {
 
 // forceDenyWriting sets dml rates to 0 to reject all dml requests.
 func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
+	log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0)
 	if cluster {
 		clusterLimiters := q.rateLimiter.GetRootLimiters()
 		updateLimiter(clusterLimiters, GetEarliestLimiter(), internalpb.RateScope_Cluster, dml)
@@ -1282,7 +1283,8 @@ func (q *QuotaCenter) calculateRates() error {
 }
 
 func (q *QuotaCenter) resetAllCurrentRates() error {
-	q.rateLimiter = rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps))
+	clusterLimiter := newParamLimiterFunc(internalpb.RateScope_Cluster, allOps)()
+	q.rateLimiter = rlinternal.NewRateLimiterTree(clusterLimiter)
 	initLimiters := func(sourceCollections map[int64]map[int64][]int64) {
 		for dbID, collections := range sourceCollections {
 			for collectionID, partitionIDs := range collections {
@@ -1357,6 +1359,7 @@ func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]
 
 // checkDiskQuota checks if disk quota exceeded.
 func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
+	log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
 	q.diskMu.Lock()
 	defer q.diskMu.Unlock()
 	if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
@@ -1370,6 +1373,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
 	totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
 	total := q.dataCoordMetrics.TotalBinlogSize
 	if float64(total) >= totalDiskQuota {
+		log.RatedWarn(10, "cluster disk quota exceeded", zap.Int64("disk usage", total), zap.Float64("disk quota", totalDiskQuota))
 		err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
 		if err != nil {
 			log.Warn("fail to force deny writing", zap.Error(err))
@@ -1431,6 +1435,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
 }
 
 func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
+	log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
 	dbIDs := make([]int64, 0)
 	checkDiskQuota := func(dbID, binlogSize int64, quota float64) {
 		if float64(binlogSize) >= quota {
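
Note on the proxy-side hunk: initLimiter now updates an existing limiter in place via SetLimit instead of inserting a fresh one on every config refresh, so the limiter's accumulated token state survives the refresh. Below is a minimal sketch of that distinction, using golang.org/x/time/rate (which ratelimitutil mirrors) with a plain map standing in for rln.GetLimiters(); refreshLimiter and the "dml.insert" key are illustrative names, not Milvus code.

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// refreshLimiter mirrors the pattern in the initLimiter diff: on a config
// refresh, adjust an existing limiter in place with SetLimit so its token
// state is preserved, and only allocate a new limiter for unseen keys.
func refreshLimiter(limiters map[string]*rate.Limiter, key string, newLimit rate.Limit, burst int) {
	if old, ok := limiters[key]; ok {
		if old.Limit() != newLimit {
			old.SetLimit(newLimit)
		}
		return
	}
	limiters[key] = rate.NewLimiter(newLimit, burst)
}

func main() {
	limiters := map[string]*rate.Limiter{}
	refreshLimiter(limiters, "dml.insert", 5, 5)

	// Drain the bucket so the limiter carries throttling state.
	for limiters["dml.insert"].Allow() {
	}

	// A refresh of the same key now adjusts the rate in place: the drained
	// bucket is kept, so callers stay throttled across the refresh.
	refreshLimiter(limiters, "dml.insert", 10, 10)
	fmt.Println("allowed right after refresh:", limiters["dml.insert"].Allow()) // typically false
}

Re-inserting a new limiter on every refresh would instead hand callers a full burst of tokens each time, undercutting the punishment mechanism the in-code comment says SimpleLimiter relies on.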
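
Note on the quota_center.go hunks: the new disk-quota warning goes through WithRateGroup/RatedWarn, Milvus's throttled logging, so the per-tick quota check cannot flood the log. The following is a rough sketch of the underlying idea only, not the actual internal/log implementation; it assumes the two WithRateGroup numbers are a credit refill rate per second and a maximum credit, and that RatedWarn's first argument is the credit cost of one message.

package main

import (
	"time"

	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

// ratedLogger gates a zap logger with a token bucket, emulating the
// rate-group behavior used by the quota center's logs.
type ratedLogger struct {
	*zap.Logger
	limiter *rate.Limiter
}

func newRatedLogger(l *zap.Logger, creditPerSecond float64, maxCredit int) *ratedLogger {
	return &ratedLogger{Logger: l, limiter: rate.NewLimiter(rate.Limit(creditPerSecond), maxCredit)}
}

// RatedWarn logs only when `cost` credits are available in the bucket,
// silently dropping the message otherwise.
func (r *ratedLogger) RatedWarn(cost int, msg string, fields ...zap.Field) {
	if r.limiter.AllowN(time.Now(), cost) {
		r.Warn(msg, fields...)
	}
}

func main() {
	base, _ := zap.NewDevelopment()
	log := newRatedLogger(base, 1.0, 60)

	// Simulate a hot quota-check loop. The bucket starts full (60 credits),
	// so a few cost-10 warnings pass immediately; after that, the 1 credit/s
	// refill caps output at roughly one warning every 10 seconds.
	for i := 0; i < 20; i++ {
		log.RatedWarn(10, "cluster disk quota exceeded", zap.Int("tick", i))
		time.Sleep(50 * time.Millisecond)
	}
}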