mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Fix confusing quota log (#28262)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
f8aa46419a
commit
c16fc854d1
@ -132,8 +132,9 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoordClie
|
||||
|
||||
// run starts the service of QuotaCenter.
|
||||
func (q *QuotaCenter) run() {
|
||||
log.Info("Start QuotaCenter", zap.Float64("collectInterval/s", Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat()))
|
||||
ticker := time.NewTicker(time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second)))
|
||||
interval := time.Duration(Params.QuotaConfig.QuotaCenterCollectInterval.GetAsFloat() * float64(time.Second))
|
||||
log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval))
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
@ -408,7 +409,7 @@ func (q *QuotaCenter) calculateReadRates() {
|
||||
zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery]))
|
||||
}
|
||||
|
||||
collectionProps := q.getCollectionLimitConfig(collection)
|
||||
collectionProps := q.getCollectionLimitProperties(collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey), internalpb.RateType_DQLSearch, collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey), internalpb.RateType_DQLQuery, collection)
|
||||
}
|
||||
@ -474,7 +475,7 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
||||
q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(factor)
|
||||
}
|
||||
|
||||
collectionProps := q.getCollectionLimitConfig(collection)
|
||||
collectionProps := q.getCollectionLimitProperties(collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey), internalpb.RateType_DMLInsert, collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMinKey), internalpb.RateType_DMLUpsert, collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey), internalpb.RateType_DMLDelete, collection)
|
||||
@ -589,8 +590,9 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("memoryWaterLevel", memoryWaterLevel),
|
||||
zap.Float64("memoryHighWaterLevel", queryNodeMemoryHighWaterLevel))
|
||||
zap.Float64("curWatermark", memoryWaterLevel),
|
||||
zap.Float64("lowWatermark", queryNodeMemoryLowWaterLevel),
|
||||
zap.Float64("highWatermark", queryNodeMemoryHighWaterLevel))
|
||||
updateCollectionFactor(0, metric.Effect.CollectionIDs)
|
||||
continue
|
||||
}
|
||||
@ -601,8 +603,9 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("memoryWaterLevel", memoryWaterLevel),
|
||||
zap.Float64("memoryLowWaterLevel", queryNodeMemoryLowWaterLevel))
|
||||
zap.Float64("curWatermark", memoryWaterLevel),
|
||||
zap.Float64("lowWatermark", queryNodeMemoryLowWaterLevel),
|
||||
zap.Float64("highWatermark", queryNodeMemoryHighWaterLevel))
|
||||
}
|
||||
for nodeID, metric := range q.dataNodeMetrics {
|
||||
memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory)
|
||||
@ -615,8 +618,9 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("memoryWaterLevel", memoryWaterLevel),
|
||||
zap.Float64("memoryHighWaterLevel", dataNodeMemoryHighWaterLevel))
|
||||
zap.Float64("curWatermark", memoryWaterLevel),
|
||||
zap.Float64("lowWatermark", dataNodeMemoryLowWaterLevel),
|
||||
zap.Float64("highWatermark", dataNodeMemoryHighWaterLevel))
|
||||
updateCollectionFactor(0, metric.Effect.CollectionIDs)
|
||||
continue
|
||||
}
|
||||
@ -626,8 +630,9 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
zap.Int64s("collections", metric.Effect.CollectionIDs),
|
||||
zap.Uint64("UsedMem", metric.Hms.MemoryUsage),
|
||||
zap.Uint64("TotalMem", metric.Hms.Memory),
|
||||
zap.Float64("memoryWaterLevel", memoryWaterLevel),
|
||||
zap.Float64("memoryLowWaterLevel", dataNodeMemoryLowWaterLevel))
|
||||
zap.Float64("curWatermark", memoryWaterLevel),
|
||||
zap.Float64("lowWatermark", dataNodeMemoryLowWaterLevel),
|
||||
zap.Float64("highWatermark", dataNodeMemoryHighWaterLevel))
|
||||
updateCollectionFactor(factor, metric.Effect.CollectionIDs)
|
||||
}
|
||||
return collectionFactor
|
||||
@ -713,7 +718,7 @@ func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64)
|
||||
q.quotaStates[collection] = make(map[milvuspb.QuotaState]commonpb.ErrorCode)
|
||||
}
|
||||
|
||||
collectionProps := q.getCollectionLimitConfig(collection)
|
||||
collectionProps := q.getCollectionLimitProperties(collection)
|
||||
switch rt {
|
||||
case internalpb.RateType_DMLInsert:
|
||||
q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey))
|
||||
@ -733,13 +738,13 @@ func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *QuotaCenter) getCollectionLimitConfig(collection int64) map[string]string {
|
||||
func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]string {
|
||||
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
|
||||
|
||||
// dbName can be ignored if ts is max timestamps
|
||||
collectionInfo, err := q.meta.GetCollectionByID(context.TODO(), "", collection, typeutil.MaxTimestamp, false)
|
||||
if err != nil {
|
||||
log.RatedWarn(10, "failed to get collection rate limit config",
|
||||
log.RatedWarn(10, "failed to get rate limit properties from collection meta",
|
||||
zap.Int64("collectionID", collection),
|
||||
zap.Error(err))
|
||||
return make(map[string]string)
|
||||
@ -766,7 +771,7 @@ func (q *QuotaCenter) checkDiskQuota() {
|
||||
collections := typeutil.NewUniqueSet()
|
||||
totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat()
|
||||
for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize {
|
||||
collectionProps := q.getCollectionLimitConfig(collection)
|
||||
collectionProps := q.getCollectionLimitProperties(collection)
|
||||
colDiskQuota := getCollectionRateLimitConfig(collectionProps, common.CollectionDiskQuotaKey)
|
||||
if float64(binlogSize) >= colDiskQuota {
|
||||
log.RatedWarn(10, "collection disk quota exceeded",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user