mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 14:35:27 +08:00
enhance: Optimize QuotaCenter CPU usage (#46388)
issue: https://github.com/milvus-io/milvus/issues/46387 --------- Signed-off-by: sijie-ni-0214 <sijie.ni@zilliz.com>
This commit is contained in:
parent
6f94d8c41a
commit
fc45905ee0
@ -1065,6 +1065,10 @@ quotaAndLimits:
|
||||
# collects metrics from Proxies, Query cluster and Data cluster.
|
||||
# seconds, (0 ~ 65536)
|
||||
quotaCenterCollectInterval: 3
|
||||
# FactorChangeThreshold defines the minimum relative change in factor to trigger an update.
|
||||
# If the factor change is less than this threshold (e.g., 5%), the update is skipped
|
||||
# to reduce unnecessary proxy updates. Range: (0, 1]
|
||||
factorChangeThreshold: 0.05
|
||||
forceDenyAllDDL: false # true to force deny all DDL requests, false to allow.
|
||||
limits:
|
||||
allocRetryTimes: 15 # retry times when delete alloc forward data from rate limit failed
|
||||
@ -1137,14 +1141,6 @@ quotaAndLimits:
|
||||
max: -1
|
||||
partition:
|
||||
max: -1 # MB/s, default no limit
|
||||
upsertRate:
|
||||
max: -1 # MB/s, default no limit
|
||||
db:
|
||||
max: -1 # MB/s, default no limit
|
||||
collection:
|
||||
max: -1 # MB/s, default no limit
|
||||
partition:
|
||||
max: -1 # MB/s, default no limit
|
||||
deleteRate:
|
||||
# Highest data deletion rate per second.
|
||||
# Setting this item to 0.1 indicates that Milvus only allows data deletion at the rate of 0.1 MB/s.
|
||||
|
||||
@ -510,7 +510,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
|
||||
|
||||
coll, ok := m.collections.Get(segment.GetCollectionID())
|
||||
if ok {
|
||||
collIDStr := fmt.Sprint(segment.GetCollectionID())
|
||||
collIDStr := strconv.FormatInt(segment.GetCollectionID(), 10)
|
||||
coll2DbName[collIDStr] = coll.DatabaseName
|
||||
if _, ok := storedBinlogSize[collIDStr]; !ok {
|
||||
storedBinlogSize[collIDStr] = make(map[string]int64)
|
||||
@ -551,7 +551,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
|
||||
coll, ok := m.collections.Get(collectionID)
|
||||
if ok {
|
||||
for state, rows := range statesRows {
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), coll.Schema.GetName(), state.String()).Set(float64(rows))
|
||||
metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, strconv.FormatInt(collectionID, 10), coll.Schema.GetName(), state.String()).Set(float64(rows))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -560,7 +560,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
|
||||
for collectionID, entriesNum := range collectionL0RowCounts {
|
||||
coll, ok := m.collections.Get(collectionID)
|
||||
if ok {
|
||||
metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID)).Set(float64(entriesNum))
|
||||
metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, strconv.FormatInt(collectionID, 10)).Set(float64(entriesNum))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2880,7 +2880,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v))
|
||||
}
|
||||
|
||||
rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size()))
|
||||
rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size()))
|
||||
if merr.Ok(it.result.GetStatus()) {
|
||||
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v))
|
||||
}
|
||||
|
||||
@ -67,7 +67,6 @@ func getQuotaMetrics(node *Proxy) (*metricsinfo.ProxyQuotaMetrics, error) {
|
||||
}
|
||||
}
|
||||
getRateMetric(internalpb.RateType_DMLInsert.String())
|
||||
getRateMetric(internalpb.RateType_DMLUpsert.String())
|
||||
getRateMetric(internalpb.RateType_DMLDelete.String())
|
||||
getRateMetric(internalpb.RateType_DQLSearch.String())
|
||||
getSubLabelRateMetric(internalpb.RateType_DQLSearch.String())
|
||||
|
||||
@ -180,7 +180,6 @@ func (node *Proxy) initRateCollector() error {
|
||||
return err
|
||||
}
|
||||
rateCol.Register(internalpb.RateType_DMLInsert.String())
|
||||
rateCol.Register(internalpb.RateType_DMLUpsert.String())
|
||||
rateCol.Register(internalpb.RateType_DMLDelete.String())
|
||||
// TODO: add bulkLoad rate
|
||||
rateCol.Register(internalpb.RateType_DQLSearch.String())
|
||||
|
||||
@ -271,7 +271,7 @@ func TestRateLimitInterceptor(t *testing.T) {
|
||||
}
|
||||
|
||||
testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceQuotaExceeded, "delete")
|
||||
testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceQuotaExceeded, "upsert")
|
||||
testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLInsert, merr.ErrServiceQuotaExceeded, "upsert")
|
||||
testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, merr.ErrServiceMemoryLimitExceeded, "import")
|
||||
testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, merr.ErrServiceDiskLimitExceeded, "search")
|
||||
testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceQuotaExceeded, "query")
|
||||
|
||||
@ -101,7 +101,6 @@ var ddlRateTypes = typeutil.NewSet(
|
||||
|
||||
var dmlRateTypes = typeutil.NewSet(
|
||||
internalpb.RateType_DMLInsert,
|
||||
internalpb.RateType_DMLUpsert,
|
||||
internalpb.RateType_DMLDelete,
|
||||
internalpb.RateType_DMLBulkLoad,
|
||||
)
|
||||
@ -164,12 +163,19 @@ type QuotaCenter struct {
|
||||
// TODO many metrics information only have collection id currently, it can be removed after db id add into all metrics.
|
||||
collectionIDToDBID *typeutil.ConcurrentMap[int64, int64] // collection id -> db id
|
||||
|
||||
collectionProps map[int64]map[string]string // collection id -> collection properties
|
||||
|
||||
rateLimiter *rlinternal.RateLimiterTree
|
||||
|
||||
tsoAllocator tso.Allocator
|
||||
|
||||
rateAllocateStrategy RateAllocateStrategy
|
||||
|
||||
// Cache previous collection rates (baseLimit * factor) to avoid unnecessary updates
|
||||
// If rate change is less than FactorChangeThreshold, skip SetLimit to reduce proxy updates
|
||||
// Key format: "collectionID-rateType"
|
||||
prevRates map[string]float64
|
||||
|
||||
stopOnce sync.Once
|
||||
stopChan chan struct{}
|
||||
wg sync.WaitGroup
|
||||
@ -190,8 +196,10 @@ func NewQuotaCenter(proxies proxyutil.ProxyClientManagerInterface, mixCoord type
|
||||
meta: meta,
|
||||
readableCollections: make(map[int64]map[int64][]int64, 0),
|
||||
writableCollections: make(map[int64]map[int64][]int64, 0),
|
||||
collectionProps: make(map[int64]map[string]string),
|
||||
rateLimiter: rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps)),
|
||||
rateAllocateStrategy: DefaultRateAllocateStrategy,
|
||||
prevRates: make(map[string]float64),
|
||||
stopChan: make(chan struct{}),
|
||||
}
|
||||
q.clearMetrics()
|
||||
@ -373,6 +381,7 @@ func (q *QuotaCenter) clearMetrics() {
|
||||
q.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]()
|
||||
q.collections = typeutil.NewConcurrentMap[string, int64]()
|
||||
q.dbs = typeutil.NewConcurrentMap[string, int64]()
|
||||
q.collectionProps = make(map[int64]map[string]string)
|
||||
}
|
||||
|
||||
func updateNumEntitiesLoaded(current map[int64]int64, qn *metricsinfo.QueryNodeCollectionMetrics) map[int64]int64 {
|
||||
@ -860,8 +869,8 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
||||
collectionFactors := make(map[int64]float64)
|
||||
updateCollectionFactor := func(factors map[int64]float64) {
|
||||
for collection, factor := range factors {
|
||||
_, ok := collectionFactors[collection]
|
||||
if !ok || collectionFactors[collection] > factor {
|
||||
currentFactor, ok := collectionFactors[collection]
|
||||
if !ok || currentFactor > factor {
|
||||
collectionFactors[collection] = factor
|
||||
}
|
||||
}
|
||||
@ -883,8 +892,11 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
||||
ttCollections := make([]int64, 0)
|
||||
memoryCollections := make([]int64, 0)
|
||||
|
||||
// Get factorChangeThreshold
|
||||
factorChangeThreshold := Params.QuotaConfig.FactorChangeThreshold.GetAsFloat()
|
||||
|
||||
for collection, factor := range collectionFactors {
|
||||
metrics.RootCoordRateLimitRatio.WithLabelValues(fmt.Sprint(collection)).Set(1 - factor)
|
||||
metrics.RootCoordRateLimitRatio.WithLabelValues(strconv.FormatInt(collection, 10)).Set(1 - factor)
|
||||
if factor <= 0 {
|
||||
if _, ok := ttFactors[collection]; ok && factor == ttFactors[collection] {
|
||||
// factor comes from ttFactor
|
||||
@ -907,22 +919,32 @@ func (q *QuotaCenter) calculateWriteRates() error {
|
||||
limiter := collectionLimiter.GetLimiters()
|
||||
for _, rt := range []internalpb.RateType{
|
||||
internalpb.RateType_DMLInsert,
|
||||
internalpb.RateType_DMLUpsert,
|
||||
internalpb.RateType_DMLDelete,
|
||||
} {
|
||||
v, ok := limiter.Get(rt)
|
||||
if ok {
|
||||
if v.Limit() != Inf {
|
||||
v.SetLimit(v.Limit() * Limit(factor))
|
||||
if !ok || v.Limit() == Inf {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if rate change is significant enough to trigger an update
|
||||
// Calculate newRate = baseLimit * factor for each rate type and compare with previous rate
|
||||
// If the rate change is less than FactorChangeThreshold, skip SetLimit to reduce proxy updates
|
||||
newRate := float64(v.Limit() * Limit(factor))
|
||||
rateKey := strconv.FormatInt(collection, 10) + "-" + strconv.FormatInt(int64(rt), 10)
|
||||
prevRate, ok := q.prevRates[rateKey]
|
||||
if ok && prevRate > 0 {
|
||||
relativeChange := math.Abs(newRate-prevRate) / prevRate
|
||||
if relativeChange < factorChangeThreshold {
|
||||
continue
|
||||
}
|
||||
}
|
||||
v.SetLimit(Limit(newRate))
|
||||
q.prevRates[rateKey] = newRate
|
||||
}
|
||||
|
||||
collectionProps := q.getCollectionLimitProperties(collection)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey),
|
||||
internalpb.RateType_DMLInsert, collectionLimiter)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMinKey),
|
||||
internalpb.RateType_DMLUpsert, collectionLimiter)
|
||||
q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey),
|
||||
internalpb.RateType_DMLDelete, collectionLimiter)
|
||||
if factor < 1.0 {
|
||||
@ -963,8 +985,8 @@ func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) map[int64]float64 {
|
||||
collectionsMaxDelay := make(map[int64]time.Duration)
|
||||
updateCollectionDelay := func(delay time.Duration, collections []int64) {
|
||||
for _, collection := range collections {
|
||||
_, ok := collectionsMaxDelay[collection]
|
||||
if !ok || collectionsMaxDelay[collection] < delay {
|
||||
currentDelay, ok := collectionsMaxDelay[collection]
|
||||
if !ok || currentDelay < delay {
|
||||
collectionsMaxDelay[collection] = delay
|
||||
}
|
||||
}
|
||||
@ -1053,8 +1075,8 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 {
|
||||
collectionFactor := make(map[int64]float64)
|
||||
updateCollectionFactor := func(factor float64, collections []int64) {
|
||||
for _, collection := range collections {
|
||||
_, ok := collectionFactor[collection]
|
||||
if !ok || collectionFactor[collection] > factor {
|
||||
currentFactor, ok := collectionFactor[collection]
|
||||
if !ok || currentFactor > factor {
|
||||
collectionFactor[collection] = factor
|
||||
}
|
||||
}
|
||||
@ -1130,8 +1152,8 @@ func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 {
|
||||
collectionFactor := make(map[int64]float64)
|
||||
updateCollectionFactor := func(factor float64, collections []int64) {
|
||||
for _, collection := range collections {
|
||||
_, ok := collectionFactor[collection]
|
||||
if !ok || collectionFactor[collection] > factor {
|
||||
currentFactor, ok := collectionFactor[collection]
|
||||
if !ok || currentFactor > factor {
|
||||
collectionFactor[collection] = factor
|
||||
}
|
||||
}
|
||||
@ -1285,15 +1307,22 @@ func (q *QuotaCenter) resetAllCurrentRates() error {
|
||||
}
|
||||
}
|
||||
|
||||
collectionRateTypes := getRateTypes(internalpb.RateScope_Collection, allOps)
|
||||
initLimiters := func(sourceCollections map[int64]map[int64][]int64) {
|
||||
for dbID, collections := range sourceCollections {
|
||||
for collectionID, partitionIDs := range collections {
|
||||
getCollectionLimitVal := func(rateType internalpb.RateType) Limit {
|
||||
limitVal, err := q.getCollectionMaxLimit(rateType, collectionID)
|
||||
collectionLimitVals := make(map[internalpb.RateType]Limit, collectionRateTypes.Len())
|
||||
collectionRateTypes.Range(func(rt internalpb.RateType) bool {
|
||||
limitVal, err := q.getCollectionMaxLimit(rt, collectionID)
|
||||
if err != nil {
|
||||
return Limit(quota.GetQuotaValue(internalpb.RateScope_Collection, rateType, Params))
|
||||
limitVal = Limit(quota.GetQuotaValue(internalpb.RateScope_Collection, rt, Params))
|
||||
}
|
||||
return limitVal
|
||||
collectionLimitVals[rt] = limitVal
|
||||
return true
|
||||
})
|
||||
|
||||
getCollectionLimitVal := func(rateType internalpb.RateType) Limit {
|
||||
return collectionLimitVals[rateType]
|
||||
}
|
||||
q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID,
|
||||
newParamLimiterFunc(internalpb.RateScope_Database, allOps),
|
||||
@ -1325,8 +1354,6 @@ func (q *QuotaCenter) getCollectionMaxLimit(rt internalpb.RateType, collectionID
|
||||
switch rt {
|
||||
case internalpb.RateType_DMLInsert:
|
||||
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey)), nil
|
||||
case internalpb.RateType_DMLUpsert:
|
||||
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMaxKey)), nil
|
||||
case internalpb.RateType_DMLDelete:
|
||||
return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMaxKey)), nil
|
||||
case internalpb.RateType_DMLBulkLoad:
|
||||
@ -1342,6 +1369,11 @@ func (q *QuotaCenter) getCollectionMaxLimit(rt internalpb.RateType, collectionID
|
||||
|
||||
func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]string {
|
||||
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
|
||||
|
||||
if props, ok := q.collectionProps[collection]; ok {
|
||||
return props
|
||||
}
|
||||
|
||||
collectionInfo, err := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collection)
|
||||
if err != nil {
|
||||
log.RatedWarn(10, "failed to get rate limit properties from collection meta",
|
||||
@ -1355,6 +1387,8 @@ func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]
|
||||
properties[pair.GetKey()] = pair.GetValue()
|
||||
}
|
||||
|
||||
q.collectionProps[collection] = properties
|
||||
|
||||
return properties
|
||||
}
|
||||
|
||||
@ -1600,10 +1634,11 @@ func (q *QuotaCenter) recordMetrics() {
|
||||
return true
|
||||
})
|
||||
|
||||
record := func(errorCode commonpb.ErrorCode) {
|
||||
rlinternal.TraverseRateLimiterTree(q.rateLimiter.GetRootLimiters(), nil,
|
||||
func(node *rlinternal.RateLimiterNode, state milvuspb.QuotaState, errCode commonpb.ErrorCode) bool {
|
||||
if errCode == errorCode {
|
||||
if errCode == commonpb.ErrorCode_MemoryQuotaExhausted ||
|
||||
errCode == commonpb.ErrorCode_DiskQuotaExhausted ||
|
||||
errCode == commonpb.ErrorCode_TimeTickLongDelay {
|
||||
var name string
|
||||
switch node.Level() {
|
||||
case internalpb.RateScope_Cluster:
|
||||
@ -1615,16 +1650,12 @@ func (q *QuotaCenter) recordMetrics() {
|
||||
default:
|
||||
return false
|
||||
}
|
||||
metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String(), name).Set(1.0)
|
||||
metrics.RootCoordQuotaStates.WithLabelValues(errCode.String(), name).Set(1.0)
|
||||
metrics.RootCoordForceDenyWritingCounter.Inc()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
record(commonpb.ErrorCode_MemoryQuotaExhausted)
|
||||
record(commonpb.ErrorCode_DiskQuotaExhausted)
|
||||
record(commonpb.ErrorCode_TimeTickLongDelay)
|
||||
}
|
||||
|
||||
func (q *QuotaCenter) diskAllowance(collection UniqueID) float64 {
|
||||
|
||||
@ -354,7 +354,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
|
||||
for _, rt := range []internalpb.RateType{
|
||||
internalpb.RateType_DMLInsert,
|
||||
internalpb.RateType_DMLUpsert,
|
||||
internalpb.RateType_DMLDelete,
|
||||
internalpb.RateType_DMLBulkLoad,
|
||||
} {
|
||||
@ -372,7 +371,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
assert.NotNil(t, limiters)
|
||||
for _, rt := range []internalpb.RateType{
|
||||
internalpb.RateType_DMLInsert,
|
||||
internalpb.RateType_DMLUpsert,
|
||||
internalpb.RateType_DMLDelete,
|
||||
internalpb.RateType_DMLBulkLoad,
|
||||
} {
|
||||
@ -539,8 +537,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
paramtable.Get().Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "10.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "100.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "0.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "100.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "0.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "100.0")
|
||||
paramtable.Get().Save(Params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "0.0")
|
||||
forceBak := Params.QuotaConfig.ForceDenyWriting.GetValue()
|
||||
@ -687,8 +683,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
limiters := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters()
|
||||
a, _ := limiters.Get(internalpb.RateType_DMLInsert)
|
||||
assert.Equal(t, Limit(0), a.Limit())
|
||||
b, _ := limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.Equal(t, Limit(0), b.Limit())
|
||||
c, _ := limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.Equal(t, Limit(0), c.Limit())
|
||||
|
||||
@ -718,8 +712,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
limiters = rln.GetLimiters()
|
||||
a, _ = limiters.Get(internalpb.RateType_DMLInsert)
|
||||
assert.NotEqual(t, Limit(0), a.Limit())
|
||||
b, _ = limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.NotEqual(t, Limit(0), b.Limit())
|
||||
c, _ = limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.NotEqual(t, Limit(0), c.Limit())
|
||||
|
||||
@ -727,8 +719,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
limiters = rln.GetLimiters()
|
||||
a, _ = limiters.Get(internalpb.RateType_DMLInsert)
|
||||
assert.Equal(t, Limit(0), a.Limit())
|
||||
b, _ = limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.Equal(t, Limit(0), b.Limit())
|
||||
c, _ = limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.Equal(t, Limit(0), c.Limit())
|
||||
|
||||
@ -891,15 +881,11 @@ func TestQuotaCenter(t *testing.T) {
|
||||
if lo.Contains(notEquals, collection) {
|
||||
a, _ := limiters.Get(internalpb.RateType_DMLInsert)
|
||||
assert.NotEqual(t, Limit(0), a.Limit())
|
||||
b, _ := limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.NotEqual(t, Limit(0), b.Limit())
|
||||
c, _ := limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.NotEqual(t, Limit(0), c.Limit())
|
||||
} else {
|
||||
a, _ := limiters.Get(internalpb.RateType_DMLInsert)
|
||||
assert.Equal(t, Limit(0), a.Limit())
|
||||
b, _ := limiters.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.Equal(t, Limit(0), b.Limit())
|
||||
c, _ := limiters.Get(internalpb.RateType_DMLDelete)
|
||||
assert.NotEqual(t, Limit(0), c.Limit())
|
||||
}
|
||||
@ -911,8 +897,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
root := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters()
|
||||
a, _ := root.Get(internalpb.RateType_DMLInsert)
|
||||
assert.Equal(t, Limit(0), a.Limit())
|
||||
b, _ := root.Get(internalpb.RateType_DMLUpsert)
|
||||
assert.Equal(t, Limit(0), b.Limit())
|
||||
c, _ := root.Get(internalpb.RateType_DMLDelete)
|
||||
assert.NotEqual(t, Limit(0), c.Limit())
|
||||
}
|
||||
@ -1091,7 +1075,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
}
|
||||
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLInsert), Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLUpsert), Params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLDelete), Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLBulkLoad), Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DQLSearch), Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat())
|
||||
@ -1125,10 +1108,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
Key: common.CollectionSearchRateMaxKey,
|
||||
Value: "5",
|
||||
},
|
||||
{
|
||||
Key: common.CollectionUpsertRateMaxKey,
|
||||
Value: "6",
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
quotaCenter.resetAllCurrentRates()
|
||||
@ -1138,7 +1117,6 @@ func TestQuotaCenter(t *testing.T) {
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLBulkLoad), float64(3*1024*1024))
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DQLQuery), float64(4))
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DQLSearch), float64(5))
|
||||
assert.Equal(t, getRate(limiters, internalpb.RateType_DMLUpsert), float64(6*1024*1024))
|
||||
})
|
||||
}
|
||||
|
||||
@ -1579,7 +1557,7 @@ func TestGetRateType(t *testing.T) {
|
||||
|
||||
t.Run("ddl cluster scope", func(t *testing.T) {
|
||||
a := getRateTypes(internalpb.RateScope_Cluster, allOps)
|
||||
assert.Equal(t, 12, a.Len())
|
||||
assert.Equal(t, 11, a.Len())
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -126,10 +126,6 @@ func getCollectionRateLimitConfigDefaultValue(configKey string) float64 {
|
||||
return Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()
|
||||
case common.CollectionInsertRateMinKey:
|
||||
return Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat()
|
||||
case common.CollectionUpsertRateMaxKey:
|
||||
return Params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat()
|
||||
case common.CollectionUpsertRateMinKey:
|
||||
return Params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat()
|
||||
case common.CollectionDeleteRateMaxKey:
|
||||
return Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()
|
||||
case common.CollectionDeleteRateMinKey:
|
||||
@ -167,10 +163,6 @@ func getRateLimitConfig(properties map[string]string, configKey string, configVa
|
||||
return megaBytes2Bytes(rate)
|
||||
case common.CollectionInsertRateMinKey:
|
||||
return megaBytes2Bytes(rate)
|
||||
case common.CollectionUpsertRateMaxKey:
|
||||
return megaBytes2Bytes(rate)
|
||||
case common.CollectionUpsertRateMinKey:
|
||||
return megaBytes2Bytes(rate)
|
||||
case common.CollectionDeleteRateMaxKey:
|
||||
return megaBytes2Bytes(rate)
|
||||
case common.CollectionDeleteRateMinKey:
|
||||
|
||||
@ -47,7 +47,6 @@ func initLimitConfigMaps() {
|
||||
internalpb.RateType_DDLFlush: "aConfig.MaxFlushRate,
|
||||
internalpb.RateType_DDLCompaction: "aConfig.MaxCompactionRate,
|
||||
internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRate,
|
||||
internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRate,
|
||||
internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRate,
|
||||
internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRate,
|
||||
internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRate,
|
||||
@ -61,7 +60,6 @@ func initLimitConfigMaps() {
|
||||
internalpb.RateType_DDLFlush: "aConfig.MaxFlushRatePerDB,
|
||||
internalpb.RateType_DDLCompaction: "aConfig.MaxCompactionRatePerDB,
|
||||
internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerDB,
|
||||
internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerDB,
|
||||
internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerDB,
|
||||
internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerDB,
|
||||
internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerDB,
|
||||
@ -69,7 +67,6 @@ func initLimitConfigMaps() {
|
||||
},
|
||||
internalpb.RateScope_Collection: {
|
||||
internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerCollection,
|
||||
internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerCollection,
|
||||
internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerCollection,
|
||||
internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerCollection,
|
||||
internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerCollection,
|
||||
@ -78,7 +75,6 @@ func initLimitConfigMaps() {
|
||||
},
|
||||
internalpb.RateScope_Partition: {
|
||||
internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerPartition,
|
||||
internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerPartition,
|
||||
internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerPartition,
|
||||
internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerPartition,
|
||||
internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerPartition,
|
||||
|
||||
@ -32,19 +32,19 @@ func TestGetQuotaConfigMap(t *testing.T) {
|
||||
paramtable.Init()
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope_Cluster)
|
||||
assert.Equal(t, 12, len(m))
|
||||
}
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope_Database)
|
||||
assert.Equal(t, 11, len(m))
|
||||
}
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope_Database)
|
||||
assert.Equal(t, 10, len(m))
|
||||
}
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope_Collection)
|
||||
assert.Equal(t, 7, len(m))
|
||||
assert.Equal(t, 6, len(m))
|
||||
}
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope_Partition)
|
||||
assert.Equal(t, 6, len(m))
|
||||
assert.Equal(t, 5, len(m))
|
||||
}
|
||||
{
|
||||
m := GetQuotaConfigMap(internalpb.RateScope(1000))
|
||||
|
||||
@ -89,7 +89,7 @@ func (rln *RateLimiterNode) Check(rt internalpb.RateType, n int) error {
|
||||
|
||||
func (rln *RateLimiterNode) GetQuotaExceededError(rt internalpb.RateType) error {
|
||||
switch rt {
|
||||
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
|
||||
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
|
||||
if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToWrite); ok {
|
||||
return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode))
|
||||
}
|
||||
|
||||
@ -201,8 +201,6 @@ const (
|
||||
// rate limit
|
||||
CollectionInsertRateMaxKey = "collection.insertRate.max.mb"
|
||||
CollectionInsertRateMinKey = "collection.insertRate.min.mb"
|
||||
CollectionUpsertRateMaxKey = "collection.upsertRate.max.mb"
|
||||
CollectionUpsertRateMinKey = "collection.upsertRate.min.mb"
|
||||
CollectionDeleteRateMaxKey = "collection.deleteRate.max.mb"
|
||||
CollectionDeleteRateMinKey = "collection.deleteRate.min.mb"
|
||||
CollectionBulkLoadRateMaxKey = "collection.bulkLoadRate.max.mb"
|
||||
|
||||
@ -310,7 +310,7 @@ enum RateType {
|
||||
DMLBulkLoad = 7;
|
||||
DQLSearch = 8;
|
||||
DQLQuery = 9;
|
||||
DMLUpsert = 10;
|
||||
DMLUpsert = 10 [deprecated = true]; // UpsertRequest uses DMLInsert for rate limiting
|
||||
DDLDB = 11;
|
||||
}
|
||||
|
||||
|
||||
@ -88,7 +88,8 @@ const (
|
||||
RateType_DMLBulkLoad RateType = 7
|
||||
RateType_DQLSearch RateType = 8
|
||||
RateType_DQLQuery RateType = 9
|
||||
RateType_DMLUpsert RateType = 10
|
||||
// Deprecated: Marked as deprecated in internal.proto.
|
||||
RateType_DMLUpsert RateType = 10 // UpsertRequest uses DMLInsert for rate limiting
|
||||
RateType_DDLDB RateType = 11
|
||||
)
|
||||
|
||||
@ -5003,7 +5004,7 @@ var file_internal_proto_rawDesc = []byte{
|
||||
0x63, 0x6f, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x10,
|
||||
0x00, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x10, 0x01, 0x12,
|
||||
0x0e, 0x0a, 0x0a, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x02, 0x12,
|
||||
0x0d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x03, 0x2a, 0xc4,
|
||||
0x0d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x03, 0x2a, 0xc8,
|
||||
0x01, 0x0a, 0x08, 0x52, 0x61, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x44,
|
||||
0x44, 0x4c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x00, 0x12, 0x10,
|
||||
0x0a, 0x0c, 0x44, 0x44, 0x4c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x01,
|
||||
@ -5014,21 +5015,21 @@ var file_internal_proto_rawDesc = []byte{
|
||||
0x0a, 0x09, 0x44, 0x4d, 0x4c, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x10, 0x06, 0x12, 0x0f, 0x0a,
|
||||
0x0b, 0x44, 0x4d, 0x4c, 0x42, 0x75, 0x6c, 0x6b, 0x4c, 0x6f, 0x61, 0x64, 0x10, 0x07, 0x12, 0x0d,
|
||||
0x0a, 0x09, 0x44, 0x51, 0x4c, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x10, 0x08, 0x12, 0x0c, 0x0a,
|
||||
0x08, 0x44, 0x51, 0x4c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x10, 0x09, 0x12, 0x0d, 0x0a, 0x09, 0x44,
|
||||
0x4d, 0x4c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x10, 0x0a, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x44,
|
||||
0x4c, 0x44, 0x42, 0x10, 0x0b, 0x2a, 0x83, 0x01, 0x0a, 0x0e, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74,
|
||||
0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x6f, 0x6e, 0x65,
|
||||
0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x10, 0x01, 0x12,
|
||||
0x10, 0x0a, 0x0c, 0x50, 0x72, 0x65, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10,
|
||||
0x02, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x03,
|
||||
0x12, 0x0a, 0x0a, 0x06, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09,
|
||||
0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x10, 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49,
|
||||
0x6e, 0x64, 0x65, 0x78, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, 0x10, 0x06, 0x12, 0x0b,
|
||||
0x0a, 0x07, 0x53, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x07, 0x42, 0x35, 0x5a, 0x33, 0x67,
|
||||
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73,
|
||||
0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x76,
|
||||
0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c,
|
||||
0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x08, 0x44, 0x51, 0x4c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x10, 0x09, 0x12, 0x11, 0x0a, 0x09, 0x44,
|
||||
0x4d, 0x4c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x10, 0x0a, 0x1a, 0x02, 0x08, 0x01, 0x12, 0x09,
|
||||
0x0a, 0x05, 0x44, 0x44, 0x4c, 0x44, 0x42, 0x10, 0x0b, 0x2a, 0x83, 0x01, 0x0a, 0x0e, 0x49, 0x6d,
|
||||
0x70, 0x6f, 0x72, 0x74, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x08, 0x0a, 0x04,
|
||||
0x4e, 0x6f, 0x6e, 0x65, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e,
|
||||
0x67, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x72, 0x65, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74,
|
||||
0x69, 0x6e, 0x67, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69,
|
||||
0x6e, 0x67, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x10, 0x04,
|
||||
0x12, 0x0d, 0x0a, 0x09, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x10, 0x05, 0x12,
|
||||
0x11, 0x0a, 0x0d, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67,
|
||||
0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x07, 0x42,
|
||||
0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69,
|
||||
0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2f, 0x70,
|
||||
0x6b, 0x67, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65,
|
||||
0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@ -44,6 +44,7 @@ const (
|
||||
type quotaConfig struct {
|
||||
QuotaAndLimitsEnabled ParamItem `refreshable:"false"`
|
||||
QuotaCenterCollectInterval ParamItem `refreshable:"false"`
|
||||
FactorChangeThreshold ParamItem `refreshable:"true"`
|
||||
ForceDenyAllDDL ParamItem `refreshable:"true"`
|
||||
AllocRetryTimes ParamItem `refreshable:"false"`
|
||||
AllocWaitInterval ParamItem `refreshable:"false"`
|
||||
@ -77,32 +78,24 @@ type quotaConfig struct {
|
||||
DMLLimitEnabled ParamItem `refreshable:"true"`
|
||||
DMLMaxInsertRate ParamItem `refreshable:"true"`
|
||||
DMLMinInsertRate ParamItem `refreshable:"true"`
|
||||
DMLMaxUpsertRate ParamItem `refreshable:"true"`
|
||||
DMLMinUpsertRate ParamItem `refreshable:"true"`
|
||||
DMLMaxDeleteRate ParamItem `refreshable:"true"`
|
||||
DMLMinDeleteRate ParamItem `refreshable:"true"`
|
||||
DMLMaxBulkLoadRate ParamItem `refreshable:"true"`
|
||||
DMLMinBulkLoadRate ParamItem `refreshable:"true"`
|
||||
DMLMaxInsertRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMinInsertRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMaxUpsertRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMinUpsertRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMaxDeleteRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMinDeleteRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMaxBulkLoadRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMinBulkLoadRatePerDB ParamItem `refreshable:"true"`
|
||||
DMLMaxInsertRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMinInsertRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMaxUpsertRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMinUpsertRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMaxDeleteRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMinDeleteRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMaxBulkLoadRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMinBulkLoadRatePerCollection ParamItem `refreshable:"true"`
|
||||
DMLMaxInsertRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMinInsertRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMaxUpsertRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMinUpsertRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMaxDeleteRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMinDeleteRatePerPartition ParamItem `refreshable:"true"`
|
||||
DMLMaxBulkLoadRatePerPartition ParamItem `refreshable:"true"`
|
||||
@ -200,6 +193,26 @@ seconds, (0 ~ 65536)`,
|
||||
}
|
||||
p.QuotaCenterCollectInterval.Init(base.mgr)
|
||||
|
||||
const defaultFactorChangeThreshold = "0.05"
|
||||
p.FactorChangeThreshold = ParamItem{
|
||||
Key: "quotaAndLimits.factorChangeThreshold",
|
||||
Version: "2.6.7",
|
||||
DefaultValue: defaultFactorChangeThreshold,
|
||||
Formatter: func(v string) string {
|
||||
threshold := getAsFloat(v)
|
||||
// (0, 1]
|
||||
if threshold <= 0 || threshold > 1 {
|
||||
return defaultFactorChangeThreshold
|
||||
}
|
||||
return v
|
||||
},
|
||||
Doc: `FactorChangeThreshold defines the minimum relative change in factor to trigger an update.
|
||||
If the factor change is less than this threshold (e.g., 5%), the update is skipped
|
||||
to reduce unnecessary proxy updates. Range: (0, 1]`,
|
||||
Export: true,
|
||||
}
|
||||
p.FactorChangeThreshold.Init(base.mgr)
|
||||
|
||||
p.ForceDenyAllDDL = ParamItem{
|
||||
Key: "quotaAndLimits.forceDenyAllDDL",
|
||||
Version: "2.5.8",
|
||||
@ -716,182 +729,6 @@ To use this setting, set quotaAndLimits.dml.enabled to true at the same time.`,
|
||||
}
|
||||
p.DMLMinInsertRatePerPartition.Init(base.mgr)
|
||||
|
||||
p.DMLMaxUpsertRate = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.max",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: max,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return max
|
||||
}
|
||||
rate := getAsFloat(v)
|
||||
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
rate = megaBytes2Bytes(rate)
|
||||
}
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return max
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
Doc: "MB/s, default no limit",
|
||||
Export: true,
|
||||
}
|
||||
p.DMLMaxUpsertRate.Init(base.mgr)
|
||||
|
||||
p.DMLMinUpsertRate = ParamItem{
|
||||
Key: "quotaAndLimits.dml.UpsertRate.min",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: min,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return min
|
||||
}
|
||||
rate := megaBytes2Bytes(getAsFloat(v))
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return min
|
||||
}
|
||||
if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRate.GetAsFloat()) {
|
||||
return min
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
}
|
||||
p.DMLMinUpsertRate.Init(base.mgr)
|
||||
|
||||
p.DMLMaxUpsertRatePerDB = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.db.max",
|
||||
Version: "2.4.1",
|
||||
DefaultValue: max,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return max
|
||||
}
|
||||
rate := getAsFloat(v)
|
||||
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
rate = megaBytes2Bytes(rate)
|
||||
}
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return p.DMLMaxUpsertRate.GetValue()
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
Doc: "MB/s, default no limit",
|
||||
Export: true,
|
||||
}
|
||||
p.DMLMaxUpsertRatePerDB.Init(base.mgr)
|
||||
|
||||
p.DMLMinUpsertRatePerDB = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.db.min",
|
||||
Version: "2.4.1",
|
||||
DefaultValue: min,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return min
|
||||
}
|
||||
rate := megaBytes2Bytes(getAsFloat(v))
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return min
|
||||
}
|
||||
if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerDB.GetAsFloat()) {
|
||||
return min
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
}
|
||||
p.DMLMinUpsertRatePerDB.Init(base.mgr)
|
||||
|
||||
p.DMLMaxUpsertRatePerCollection = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.collection.max",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: max,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return max
|
||||
}
|
||||
rate := getAsFloat(v)
|
||||
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
rate = megaBytes2Bytes(rate)
|
||||
}
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return p.DMLMaxUpsertRate.GetValue()
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
Doc: "MB/s, default no limit",
|
||||
Export: true,
|
||||
}
|
||||
p.DMLMaxUpsertRatePerCollection.Init(base.mgr)
|
||||
|
||||
p.DMLMinUpsertRatePerCollection = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.collection.min",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: min,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return min
|
||||
}
|
||||
rate := megaBytes2Bytes(getAsFloat(v))
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return min
|
||||
}
|
||||
if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerCollection.GetAsFloat()) {
|
||||
return min
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
}
|
||||
p.DMLMinUpsertRatePerCollection.Init(base.mgr)
|
||||
|
||||
p.DMLMaxUpsertRatePerPartition = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.partition.max",
|
||||
Version: "2.4.1",
|
||||
DefaultValue: max,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return max
|
||||
}
|
||||
rate := getAsFloat(v)
|
||||
if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax
|
||||
rate = megaBytes2Bytes(rate)
|
||||
}
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return p.DMLMaxUpsertRate.GetValue()
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
Doc: "MB/s, default no limit",
|
||||
Export: true,
|
||||
}
|
||||
p.DMLMaxUpsertRatePerPartition.Init(base.mgr)
|
||||
|
||||
p.DMLMinUpsertRatePerPartition = ParamItem{
|
||||
Key: "quotaAndLimits.dml.upsertRate.partition.min",
|
||||
Version: "2.4.1",
|
||||
DefaultValue: min,
|
||||
Formatter: func(v string) string {
|
||||
if !p.DMLLimitEnabled.GetAsBool() {
|
||||
return min
|
||||
}
|
||||
rate := megaBytes2Bytes(getAsFloat(v))
|
||||
// [0, inf)
|
||||
if rate < 0 {
|
||||
return min
|
||||
}
|
||||
if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerPartition.GetAsFloat()) {
|
||||
return min
|
||||
}
|
||||
return fmt.Sprintf("%f", rate)
|
||||
},
|
||||
}
|
||||
p.DMLMinUpsertRatePerPartition.Init(base.mgr)
|
||||
|
||||
p.DMLMaxDeleteRate = ParamItem{
|
||||
Key: "quotaAndLimits.dml.deleteRate.max",
|
||||
Version: "2.2.0",
|
||||
|
||||
@ -70,8 +70,6 @@ func TestQuotaParam(t *testing.T) {
|
||||
params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true")
|
||||
params.Save(params.QuotaConfig.DMLMaxInsertRate.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinInsertRate.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxUpsertRate.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinUpsertRate.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxDeleteRate.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinDeleteRate.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxBulkLoadRate.Key, "10")
|
||||
@ -79,8 +77,6 @@ func TestQuotaParam(t *testing.T) {
|
||||
assert.Equal(t, true, params.QuotaConfig.DMLLimitEnabled.GetAsBool())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxInsertRate.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinInsertRate.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxUpsertRate.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinUpsertRate.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinDeleteRate.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat())
|
||||
@ -92,8 +88,6 @@ func TestQuotaParam(t *testing.T) {
|
||||
params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true")
|
||||
params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "10")
|
||||
params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "10")
|
||||
@ -101,8 +95,6 @@ func TestQuotaParam(t *testing.T) {
|
||||
assert.Equal(t, true, params.QuotaConfig.DMLLimitEnabled.GetAsBool())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
|
||||
@ -111,16 +103,12 @@ func TestQuotaParam(t *testing.T) {
|
||||
// test only set global rate limit
|
||||
params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "-1")
|
||||
params.Save(params.QuotaConfig.DMLMinBulkLoadRatePerCollection.Key, "-1")
|
||||
assert.Equal(t, params.QuotaConfig.DMLMaxInsertRate.GetAsFloat(), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, params.QuotaConfig.DMLMaxUpsertRate.GetAsFloat(), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat(), params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat(), params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
|
||||
@ -129,16 +117,12 @@ func TestQuotaParam(t *testing.T) {
|
||||
// test invalid config value
|
||||
params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "5")
|
||||
params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "5")
|
||||
params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "5")
|
||||
params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "1")
|
||||
params.Save(params.QuotaConfig.DMLMinBulkLoadRatePerCollection.Key, "5")
|
||||
assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(0), params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat())
|
||||
assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user