mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Improve rate limiter log (#24459)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
0983d1fac7
commit
0fb9131973
@ -207,7 +207,7 @@ func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error {
|
|||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String())
|
return fmt.Errorf("unregister rateLimiter for rateType %s", r.GetRt().String())
|
||||||
}
|
}
|
||||||
log.RatedInfo(30, "current collection rates in proxy",
|
log.RatedDebug(30, "current collection rates in proxy",
|
||||||
zap.String("rateType", r.Rt.String()),
|
zap.String("rateType", r.Rt.String()),
|
||||||
zap.String("rateLimit", ratelimitutil.Limit(r.GetR()).String()),
|
zap.String("rateLimit", ratelimitutil.Limit(r.GetR()).String()),
|
||||||
)
|
)
|
||||||
@ -339,7 +339,7 @@ func (rl *rateLimiter) registerLimiters(globalLevel bool) {
|
|||||||
}
|
}
|
||||||
}(internalpb.RateType(rt))
|
}(internalpb.RateType(rt))
|
||||||
paramtable.Get().Watch(r.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent))
|
paramtable.Get().Watch(r.Key, config.NewHandler(fmt.Sprintf("rateLimiter-%d", rt), onEvent))
|
||||||
log.RatedInfo(30, "RateLimiter register for rateType",
|
log.RatedDebug(30, "RateLimiter register for rateType",
|
||||||
zap.String("rateType", internalpb.RateType_name[rt]),
|
zap.String("rateType", internalpb.RateType_name[rt]),
|
||||||
zap.String("rateLimit", ratelimitutil.Limit(r.GetAsFloat()).String()),
|
zap.String("rateLimit", ratelimitutil.Limit(r.GetAsFloat()).String()),
|
||||||
zap.String("burst", fmt.Sprintf("%v", burst)))
|
zap.String("burst", fmt.Sprintf("%v", burst)))
|
||||||
|
|||||||
@ -285,7 +285,7 @@ func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, collections
|
|||||||
q.currentRates[collection][internalpb.RateType_DMLBulkLoad] = 0
|
q.currentRates[collection][internalpb.RateType_DMLBulkLoad] = 0
|
||||||
q.quotaStates[collection][milvuspb.QuotaState_DenyToWrite] = errorCode
|
q.quotaStates[collection][milvuspb.QuotaState_DenyToWrite] = errorCode
|
||||||
}
|
}
|
||||||
log.Warn("QuotaCenter force to deny writing",
|
log.RatedWarn(10, "QuotaCenter force to deny writing",
|
||||||
zap.Int64s("collectionIDs", collections),
|
zap.Int64s("collectionIDs", collections),
|
||||||
zap.String("reason", errorCode.String()))
|
zap.String("reason", errorCode.String()))
|
||||||
}
|
}
|
||||||
@ -334,6 +334,7 @@ func (q *QuotaCenter) guaranteeMinRate(minRate float64, rateType internalpb.Rate
|
|||||||
|
|
||||||
// calculateReadRates calculates and sets dql rates.
|
// calculateReadRates calculates and sets dql rates.
|
||||||
func (q *QuotaCenter) calculateReadRates() {
|
func (q *QuotaCenter) calculateReadRates() {
|
||||||
|
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
|
||||||
if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
|
if Params.QuotaConfig.ForceDenyReading.GetAsBool() {
|
||||||
q.forceDenyReading(commonpb.ErrorCode_ForceDeny)
|
q.forceDenyReading(commonpb.ErrorCode_ForceDeny)
|
||||||
return
|
return
|
||||||
@ -393,13 +394,13 @@ func (q *QuotaCenter) calculateReadRates() {
|
|||||||
for _, collection := range collections {
|
for _, collection := range collections {
|
||||||
if q.currentRates[collection][internalpb.RateType_DQLSearch] != Inf && realTimeSearchRate > 0 {
|
if q.currentRates[collection][internalpb.RateType_DQLSearch] != Inf && realTimeSearchRate > 0 {
|
||||||
q.currentRates[collection][internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed)
|
q.currentRates[collection][internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed)
|
||||||
log.Warn("QuotaCenter cool read rates off done",
|
log.RatedWarn(10, "QuotaCenter cool read rates off done",
|
||||||
zap.Int64("collectionID", collection),
|
zap.Int64("collectionID", collection),
|
||||||
zap.Any("searchRate", q.currentRates[collection][internalpb.RateType_DQLSearch]))
|
zap.Any("searchRate", q.currentRates[collection][internalpb.RateType_DQLSearch]))
|
||||||
}
|
}
|
||||||
if q.currentRates[collection][internalpb.RateType_DQLQuery] != Inf && realTimeQueryRate > 0 {
|
if q.currentRates[collection][internalpb.RateType_DQLQuery] != Inf && realTimeQueryRate > 0 {
|
||||||
q.currentRates[collection][internalpb.RateType_DQLQuery] = Limit(realTimeQueryRate * coolOffSpeed)
|
q.currentRates[collection][internalpb.RateType_DQLQuery] = Limit(realTimeQueryRate * coolOffSpeed)
|
||||||
log.Warn("QuotaCenter cool read rates off done",
|
log.RatedWarn(10, "QuotaCenter cool read rates off done",
|
||||||
zap.Int64("collectionID", collection),
|
zap.Int64("collectionID", collection),
|
||||||
zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery]))
|
zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery]))
|
||||||
}
|
}
|
||||||
@ -407,8 +408,6 @@ func (q *QuotaCenter) calculateReadRates() {
|
|||||||
|
|
||||||
q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat(), internalpb.RateType_DQLSearch, collections...)
|
q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat(), internalpb.RateType_DQLSearch, collections...)
|
||||||
q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat(), internalpb.RateType_DQLQuery, collections...)
|
q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat(), internalpb.RateType_DQLQuery, collections...)
|
||||||
log.Info("QueryNodeMetrics when cool-off",
|
|
||||||
zap.Any("metrics", q.queryNodeMetrics))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: unify search and query?
|
// TODO: unify search and query?
|
||||||
@ -419,7 +418,6 @@ func (q *QuotaCenter) calculateReadRates() {
|
|||||||
|
|
||||||
// calculateWriteRates calculates and sets dml rates.
|
// calculateWriteRates calculates and sets dml rates.
|
||||||
func (q *QuotaCenter) calculateWriteRates() error {
|
func (q *QuotaCenter) calculateWriteRates() error {
|
||||||
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
|
|
||||||
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
|
if Params.QuotaConfig.ForceDenyWriting.GetAsBool() {
|
||||||
q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
|
q.forceDenyWriting(commonpb.ErrorCode_ForceDeny)
|
||||||
return nil
|
return nil
|
||||||
@ -468,9 +466,6 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
|||||||
}
|
}
|
||||||
q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat(), internalpb.RateType_DMLInsert)
|
q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat(), internalpb.RateType_DMLInsert)
|
||||||
q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat(), internalpb.RateType_DMLDelete)
|
q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat(), internalpb.RateType_DMLDelete)
|
||||||
log.RatedDebug(10, "QuotaCenter cool write rates off done",
|
|
||||||
zap.Int64("collectionID", collection),
|
|
||||||
zap.Float64("factor", factor))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -738,7 +733,7 @@ func (q *QuotaCenter) checkDiskQuota() {
|
|||||||
colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
|
colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
|
||||||
for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
|
for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
|
||||||
if float64(binlogSize) >= colDiskQuota {
|
if float64(binlogSize) >= colDiskQuota {
|
||||||
log.Warn("collection disk quota exceeded",
|
log.RatedWarn(10, "collection disk quota exceeded",
|
||||||
zap.Int64("collection", collection),
|
zap.Int64("collection", collection),
|
||||||
zap.Int64("coll disk usage", binlogSize),
|
zap.Int64("coll disk usage", binlogSize),
|
||||||
zap.Float64("coll disk quota", colDiskQuota))
|
zap.Float64("coll disk quota", colDiskQuota))
|
||||||
@ -750,7 +745,7 @@ func (q *QuotaCenter) checkDiskQuota() {
|
|||||||
}
|
}
|
||||||
total := q.dataCoordMetrics.TotalBinlogSize
|
total := q.dataCoordMetrics.TotalBinlogSize
|
||||||
if float64(total) >= totalDiskQuota {
|
if float64(total) >= totalDiskQuota {
|
||||||
log.Warn("total disk quota exceeded",
|
log.RatedWarn(10, "total disk quota exceeded",
|
||||||
zap.Int64("total disk usage", total),
|
zap.Int64("total disk usage", total),
|
||||||
zap.Float64("total disk quota", totalDiskQuota))
|
zap.Float64("total disk quota", totalDiskQuota))
|
||||||
q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted)
|
q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user