diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8b227c9f1c..145cf1a045 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1065,6 +1065,10 @@ quotaAndLimits: # collects metrics from Proxies, Query cluster and Data cluster. # seconds, (0 ~ 65536) quotaCenterCollectInterval: 3 + # FactorChangeThreshold defines the minimum relative change in factor to trigger an update. + # If the factor change is less than this threshold (e.g., 5%), the update is skipped + # to reduce unnecessary proxy updates. Range: (0, 1] + factorChangeThreshold: 0.05 forceDenyAllDDL: false # true to force deny all DDL requests, false to allow. limits: allocRetryTimes: 15 # retry times when delete alloc forward data from rate limit failed @@ -1137,14 +1141,6 @@ quotaAndLimits: max: -1 partition: max: -1 # MB/s, default no limit - upsertRate: - max: -1 # MB/s, default no limit - db: - max: -1 # MB/s, default no limit - collection: - max: -1 # MB/s, default no limit - partition: - max: -1 # MB/s, default no limit deleteRate: # Highest data deletion rate per second. # Setting this item to 0.1 indicates that Milvus only allows data deletion at the rate of 0.1 MB/s. diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 39ff416f5f..736fc57fa4 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -510,7 +510,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { coll, ok := m.collections.Get(segment.GetCollectionID()) if ok { - collIDStr := fmt.Sprint(segment.GetCollectionID()) + collIDStr := strconv.FormatInt(segment.GetCollectionID(), 10) coll2DbName[collIDStr] = coll.DatabaseName if _, ok := storedBinlogSize[collIDStr]; !ok { storedBinlogSize[collIDStr] = make(map[string]int64) @@ -551,7 +551,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { coll, ok := m.collections.Get(collectionID) if ok { for state, rows := range statesRows { - metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID), coll.Schema.GetName(), state.String()).Set(float64(rows)) + metrics.DataCoordNumStoredRows.WithLabelValues(coll.DatabaseName, strconv.FormatInt(collectionID, 10), coll.Schema.GetName(), state.String()).Set(float64(rows)) } } } @@ -560,7 +560,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics { for collectionID, entriesNum := range collectionL0RowCounts { coll, ok := m.collections.Get(collectionID) if ok { - metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, fmt.Sprint(collectionID)).Set(float64(entriesNum)) + metrics.DataCoordL0DeleteEntriesNum.WithLabelValues(coll.DatabaseName, strconv.FormatInt(collectionID, 10)).Set(float64(entriesNum)) } } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index bf02ab9391..1b09f377c4 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2880,7 +2880,7 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v)) } - rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size())) + rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size())) if merr.Ok(it.result.GetStatus()) { metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v)) } diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index 7a2a6685df..2306019961 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -67,7 +67,6 @@ func getQuotaMetrics(node *Proxy) (*metricsinfo.ProxyQuotaMetrics, error) { } } getRateMetric(internalpb.RateType_DMLInsert.String()) - getRateMetric(internalpb.RateType_DMLUpsert.String()) getRateMetric(internalpb.RateType_DMLDelete.String()) getRateMetric(internalpb.RateType_DQLSearch.String()) getSubLabelRateMetric(internalpb.RateType_DQLSearch.String()) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index e3a00774ea..ffbc80c5dc 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -180,7 +180,6 @@ func (node *Proxy) initRateCollector() error { return err } rateCol.Register(internalpb.RateType_DMLInsert.String()) - rateCol.Register(internalpb.RateType_DMLUpsert.String()) rateCol.Register(internalpb.RateType_DMLDelete.String()) // TODO: add bulkLoad rate rateCol.Register(internalpb.RateType_DQLSearch.String()) diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 9658ffec79..435696071c 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -271,7 +271,7 @@ func TestRateLimitInterceptor(t *testing.T) { } testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceQuotaExceeded, "delete") - testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceQuotaExceeded, "upsert") + testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLInsert, merr.ErrServiceQuotaExceeded, "upsert") testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, merr.ErrServiceMemoryLimitExceeded, "import") testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, merr.ErrServiceDiskLimitExceeded, "search") testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceQuotaExceeded, "query") diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 04ed28bb5e..c717afd622 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -101,7 +101,6 @@ var ddlRateTypes = typeutil.NewSet( var dmlRateTypes = typeutil.NewSet( internalpb.RateType_DMLInsert, - internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad, ) @@ -164,12 +163,19 @@ type QuotaCenter struct { // TODO many metrics information only have collection id currently, it can be removed after db id add into all metrics. collectionIDToDBID *typeutil.ConcurrentMap[int64, int64] // collection id -> db id + collectionProps map[int64]map[string]string // collection id -> collection properties + rateLimiter *rlinternal.RateLimiterTree tsoAllocator tso.Allocator rateAllocateStrategy RateAllocateStrategy + // Cache previous collection rates (baseLimit * factor) to avoid unnecessary updates + // If rate change is less than FactorChangeThreshold, skip SetLimit to reduce proxy updates + // Key format: "collectionID-rateType" + prevRates map[string]float64 + stopOnce sync.Once stopChan chan struct{} wg sync.WaitGroup @@ -190,8 +196,10 @@ func NewQuotaCenter(proxies proxyutil.ProxyClientManagerInterface, mixCoord type meta: meta, readableCollections: make(map[int64]map[int64][]int64, 0), writableCollections: make(map[int64]map[int64][]int64, 0), + collectionProps: make(map[int64]map[string]string), rateLimiter: rlinternal.NewRateLimiterTree(initInfLimiter(internalpb.RateScope_Cluster, allOps)), rateAllocateStrategy: DefaultRateAllocateStrategy, + prevRates: make(map[string]float64), stopChan: make(chan struct{}), } q.clearMetrics() @@ -373,6 +381,7 @@ func (q *QuotaCenter) clearMetrics() { q.collectionIDToDBID = typeutil.NewConcurrentMap[int64, int64]() q.collections = typeutil.NewConcurrentMap[string, int64]() q.dbs = typeutil.NewConcurrentMap[string, int64]() + q.collectionProps = make(map[int64]map[string]string) } func updateNumEntitiesLoaded(current map[int64]int64, qn *metricsinfo.QueryNodeCollectionMetrics) map[int64]int64 { @@ -860,8 +869,8 @@ func (q *QuotaCenter) calculateWriteRates() error { collectionFactors := make(map[int64]float64) updateCollectionFactor := func(factors map[int64]float64) { for collection, factor := range factors { - _, ok := collectionFactors[collection] - if !ok || collectionFactors[collection] > factor { + currentFactor, ok := collectionFactors[collection] + if !ok || currentFactor > factor { collectionFactors[collection] = factor } } @@ -883,8 +892,11 @@ func (q *QuotaCenter) calculateWriteRates() error { ttCollections := make([]int64, 0) memoryCollections := make([]int64, 0) + // Get factorChangeThreshold + factorChangeThreshold := Params.QuotaConfig.FactorChangeThreshold.GetAsFloat() + for collection, factor := range collectionFactors { - metrics.RootCoordRateLimitRatio.WithLabelValues(fmt.Sprint(collection)).Set(1 - factor) + metrics.RootCoordRateLimitRatio.WithLabelValues(strconv.FormatInt(collection, 10)).Set(1 - factor) if factor <= 0 { if _, ok := ttFactors[collection]; ok && factor == ttFactors[collection] { // factor comes from ttFactor @@ -907,22 +919,32 @@ func (q *QuotaCenter) calculateWriteRates() error { limiter := collectionLimiter.GetLimiters() for _, rt := range []internalpb.RateType{ internalpb.RateType_DMLInsert, - internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, } { v, ok := limiter.Get(rt) - if ok { - if v.Limit() != Inf { - v.SetLimit(v.Limit() * Limit(factor)) + if !ok || v.Limit() == Inf { + continue + } + + // Check if rate change is significant enough to trigger an update + // Calculate newRate = baseLimit * factor for each rate type and compare with previous rate + // If the rate change is less than FactorChangeThreshold, skip SetLimit to reduce proxy updates + newRate := float64(v.Limit() * Limit(factor)) + rateKey := strconv.FormatInt(collection, 10) + "-" + strconv.FormatInt(int64(rt), 10) + prevRate, ok := q.prevRates[rateKey] + if ok && prevRate > 0 { + relativeChange := math.Abs(newRate-prevRate) / prevRate + if relativeChange < factorChangeThreshold { + continue } } + v.SetLimit(Limit(newRate)) + q.prevRates[rateKey] = newRate } collectionProps := q.getCollectionLimitProperties(collection) q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey), internalpb.RateType_DMLInsert, collectionLimiter) - q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMinKey), - internalpb.RateType_DMLUpsert, collectionLimiter) q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey), internalpb.RateType_DMLDelete, collectionLimiter) if factor < 1.0 { @@ -963,8 +985,8 @@ func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) map[int64]float64 { collectionsMaxDelay := make(map[int64]time.Duration) updateCollectionDelay := func(delay time.Duration, collections []int64) { for _, collection := range collections { - _, ok := collectionsMaxDelay[collection] - if !ok || collectionsMaxDelay[collection] < delay { + currentDelay, ok := collectionsMaxDelay[collection] + if !ok || currentDelay < delay { collectionsMaxDelay[collection] = delay } } @@ -1053,8 +1075,8 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 { collectionFactor := make(map[int64]float64) updateCollectionFactor := func(factor float64, collections []int64) { for _, collection := range collections { - _, ok := collectionFactor[collection] - if !ok || collectionFactor[collection] > factor { + currentFactor, ok := collectionFactor[collection] + if !ok || currentFactor > factor { collectionFactor[collection] = factor } } @@ -1130,8 +1152,8 @@ func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 { collectionFactor := make(map[int64]float64) updateCollectionFactor := func(factor float64, collections []int64) { for _, collection := range collections { - _, ok := collectionFactor[collection] - if !ok || collectionFactor[collection] > factor { + currentFactor, ok := collectionFactor[collection] + if !ok || currentFactor > factor { collectionFactor[collection] = factor } } @@ -1285,15 +1307,22 @@ func (q *QuotaCenter) resetAllCurrentRates() error { } } + collectionRateTypes := getRateTypes(internalpb.RateScope_Collection, allOps) initLimiters := func(sourceCollections map[int64]map[int64][]int64) { for dbID, collections := range sourceCollections { for collectionID, partitionIDs := range collections { - getCollectionLimitVal := func(rateType internalpb.RateType) Limit { - limitVal, err := q.getCollectionMaxLimit(rateType, collectionID) + collectionLimitVals := make(map[internalpb.RateType]Limit, collectionRateTypes.Len()) + collectionRateTypes.Range(func(rt internalpb.RateType) bool { + limitVal, err := q.getCollectionMaxLimit(rt, collectionID) if err != nil { - return Limit(quota.GetQuotaValue(internalpb.RateScope_Collection, rateType, Params)) + limitVal = Limit(quota.GetQuotaValue(internalpb.RateScope_Collection, rt, Params)) } - return limitVal + collectionLimitVals[rt] = limitVal + return true + }) + + getCollectionLimitVal := func(rateType internalpb.RateType) Limit { + return collectionLimitVals[rateType] } q.rateLimiter.GetOrCreateCollectionLimiters(dbID, collectionID, newParamLimiterFunc(internalpb.RateScope_Database, allOps), @@ -1325,8 +1354,6 @@ func (q *QuotaCenter) getCollectionMaxLimit(rt internalpb.RateType, collectionID switch rt { case internalpb.RateType_DMLInsert: return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey)), nil - case internalpb.RateType_DMLUpsert: - return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionUpsertRateMaxKey)), nil case internalpb.RateType_DMLDelete: return Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMaxKey)), nil case internalpb.RateType_DMLBulkLoad: @@ -1342,6 +1369,11 @@ func (q *QuotaCenter) getCollectionMaxLimit(rt internalpb.RateType, collectionID func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string]string { log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) + + if props, ok := q.collectionProps[collection]; ok { + return props + } + collectionInfo, err := q.meta.GetCollectionByIDWithMaxTs(context.TODO(), collection) if err != nil { log.RatedWarn(10, "failed to get rate limit properties from collection meta", @@ -1355,6 +1387,8 @@ func (q *QuotaCenter) getCollectionLimitProperties(collection int64) map[string] properties[pair.GetKey()] = pair.GetValue() } + q.collectionProps[collection] = properties + return properties } @@ -1600,31 +1634,28 @@ func (q *QuotaCenter) recordMetrics() { return true }) - record := func(errorCode commonpb.ErrorCode) { - rlinternal.TraverseRateLimiterTree(q.rateLimiter.GetRootLimiters(), nil, - func(node *rlinternal.RateLimiterNode, state milvuspb.QuotaState, errCode commonpb.ErrorCode) bool { - if errCode == errorCode { - var name string - switch node.Level() { - case internalpb.RateScope_Cluster: - name = "cluster" - case internalpb.RateScope_Database: - name = "db_" + dbIDs[node.GetID()] - case internalpb.RateScope_Collection: - name = "collection_" + collectionIDs[node.GetID()] - default: - return false - } - metrics.RootCoordQuotaStates.WithLabelValues(errorCode.String(), name).Set(1.0) - metrics.RootCoordForceDenyWritingCounter.Inc() + rlinternal.TraverseRateLimiterTree(q.rateLimiter.GetRootLimiters(), nil, + func(node *rlinternal.RateLimiterNode, state milvuspb.QuotaState, errCode commonpb.ErrorCode) bool { + if errCode == commonpb.ErrorCode_MemoryQuotaExhausted || + errCode == commonpb.ErrorCode_DiskQuotaExhausted || + errCode == commonpb.ErrorCode_TimeTickLongDelay { + var name string + switch node.Level() { + case internalpb.RateScope_Cluster: + name = "cluster" + case internalpb.RateScope_Database: + name = "db_" + dbIDs[node.GetID()] + case internalpb.RateScope_Collection: + name = "collection_" + collectionIDs[node.GetID()] + default: return false } - return true - }) - } - record(commonpb.ErrorCode_MemoryQuotaExhausted) - record(commonpb.ErrorCode_DiskQuotaExhausted) - record(commonpb.ErrorCode_TimeTickLongDelay) + metrics.RootCoordQuotaStates.WithLabelValues(errCode.String(), name).Set(1.0) + metrics.RootCoordForceDenyWritingCounter.Inc() + return false + } + return true + }) } func (q *QuotaCenter) diskAllowance(collection UniqueID) float64 { diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index ccfc299352..8faabcde17 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -354,7 +354,6 @@ func TestQuotaCenter(t *testing.T) { for _, rt := range []internalpb.RateType{ internalpb.RateType_DMLInsert, - internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad, } { @@ -372,7 +371,6 @@ func TestQuotaCenter(t *testing.T) { assert.NotNil(t, limiters) for _, rt := range []internalpb.RateType{ internalpb.RateType_DMLInsert, - internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad, } { @@ -539,8 +537,6 @@ func TestQuotaCenter(t *testing.T) { paramtable.Get().Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "10.0") paramtable.Get().Save(Params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "100.0") paramtable.Get().Save(Params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "0.0") - paramtable.Get().Save(Params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "100.0") - paramtable.Get().Save(Params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "0.0") paramtable.Get().Save(Params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "100.0") paramtable.Get().Save(Params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "0.0") forceBak := Params.QuotaConfig.ForceDenyWriting.GetValue() @@ -687,8 +683,6 @@ func TestQuotaCenter(t *testing.T) { limiters := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters() a, _ := limiters.Get(internalpb.RateType_DMLInsert) assert.Equal(t, Limit(0), a.Limit()) - b, _ := limiters.Get(internalpb.RateType_DMLUpsert) - assert.Equal(t, Limit(0), b.Limit()) c, _ := limiters.Get(internalpb.RateType_DMLDelete) assert.Equal(t, Limit(0), c.Limit()) @@ -718,8 +712,6 @@ func TestQuotaCenter(t *testing.T) { limiters = rln.GetLimiters() a, _ = limiters.Get(internalpb.RateType_DMLInsert) assert.NotEqual(t, Limit(0), a.Limit()) - b, _ = limiters.Get(internalpb.RateType_DMLUpsert) - assert.NotEqual(t, Limit(0), b.Limit()) c, _ = limiters.Get(internalpb.RateType_DMLDelete) assert.NotEqual(t, Limit(0), c.Limit()) @@ -727,8 +719,6 @@ func TestQuotaCenter(t *testing.T) { limiters = rln.GetLimiters() a, _ = limiters.Get(internalpb.RateType_DMLInsert) assert.Equal(t, Limit(0), a.Limit()) - b, _ = limiters.Get(internalpb.RateType_DMLUpsert) - assert.Equal(t, Limit(0), b.Limit()) c, _ = limiters.Get(internalpb.RateType_DMLDelete) assert.Equal(t, Limit(0), c.Limit()) @@ -891,15 +881,11 @@ func TestQuotaCenter(t *testing.T) { if lo.Contains(notEquals, collection) { a, _ := limiters.Get(internalpb.RateType_DMLInsert) assert.NotEqual(t, Limit(0), a.Limit()) - b, _ := limiters.Get(internalpb.RateType_DMLUpsert) - assert.NotEqual(t, Limit(0), b.Limit()) c, _ := limiters.Get(internalpb.RateType_DMLDelete) assert.NotEqual(t, Limit(0), c.Limit()) } else { a, _ := limiters.Get(internalpb.RateType_DMLInsert) assert.Equal(t, Limit(0), a.Limit()) - b, _ := limiters.Get(internalpb.RateType_DMLUpsert) - assert.Equal(t, Limit(0), b.Limit()) c, _ := limiters.Get(internalpb.RateType_DMLDelete) assert.NotEqual(t, Limit(0), c.Limit()) } @@ -911,8 +897,6 @@ func TestQuotaCenter(t *testing.T) { root := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters() a, _ := root.Get(internalpb.RateType_DMLInsert) assert.Equal(t, Limit(0), a.Limit()) - b, _ := root.Get(internalpb.RateType_DMLUpsert) - assert.Equal(t, Limit(0), b.Limit()) c, _ := root.Get(internalpb.RateType_DMLDelete) assert.NotEqual(t, Limit(0), c.Limit()) } @@ -1091,7 +1075,6 @@ func TestQuotaCenter(t *testing.T) { } assert.Equal(t, getRate(limiters, internalpb.RateType_DMLInsert), Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) - assert.Equal(t, getRate(limiters, internalpb.RateType_DMLUpsert), Params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat()) assert.Equal(t, getRate(limiters, internalpb.RateType_DMLDelete), Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, getRate(limiters, internalpb.RateType_DMLBulkLoad), Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()) assert.Equal(t, getRate(limiters, internalpb.RateType_DQLSearch), Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat()) @@ -1125,10 +1108,6 @@ func TestQuotaCenter(t *testing.T) { Key: common.CollectionSearchRateMaxKey, Value: "5", }, - { - Key: common.CollectionUpsertRateMaxKey, - Value: "6", - }, }, }, nil) quotaCenter.resetAllCurrentRates() @@ -1138,7 +1117,6 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, getRate(limiters, internalpb.RateType_DMLBulkLoad), float64(3*1024*1024)) assert.Equal(t, getRate(limiters, internalpb.RateType_DQLQuery), float64(4)) assert.Equal(t, getRate(limiters, internalpb.RateType_DQLSearch), float64(5)) - assert.Equal(t, getRate(limiters, internalpb.RateType_DMLUpsert), float64(6*1024*1024)) }) } @@ -1579,7 +1557,7 @@ func TestGetRateType(t *testing.T) { t.Run("ddl cluster scope", func(t *testing.T) { a := getRateTypes(internalpb.RateScope_Cluster, allOps) - assert.Equal(t, 12, a.Len()) + assert.Equal(t, 11, a.Len()) }) } diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index b340ffad33..f63eaaf75e 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -126,10 +126,6 @@ func getCollectionRateLimitConfigDefaultValue(configKey string) float64 { return Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat() case common.CollectionInsertRateMinKey: return Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat() - case common.CollectionUpsertRateMaxKey: - return Params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat() - case common.CollectionUpsertRateMinKey: - return Params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat() case common.CollectionDeleteRateMaxKey: return Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat() case common.CollectionDeleteRateMinKey: @@ -167,10 +163,6 @@ func getRateLimitConfig(properties map[string]string, configKey string, configVa return megaBytes2Bytes(rate) case common.CollectionInsertRateMinKey: return megaBytes2Bytes(rate) - case common.CollectionUpsertRateMaxKey: - return megaBytes2Bytes(rate) - case common.CollectionUpsertRateMinKey: - return megaBytes2Bytes(rate) case common.CollectionDeleteRateMaxKey: return megaBytes2Bytes(rate) case common.CollectionDeleteRateMinKey: diff --git a/internal/util/quota/quota_constant.go b/internal/util/quota/quota_constant.go index 64394a8e81..14707e648d 100644 --- a/internal/util/quota/quota_constant.go +++ b/internal/util/quota/quota_constant.go @@ -47,7 +47,6 @@ func initLimitConfigMaps() { internalpb.RateType_DDLFlush: "aConfig.MaxFlushRate, internalpb.RateType_DDLCompaction: "aConfig.MaxCompactionRate, internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRate, - internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRate, internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRate, internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRate, internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRate, @@ -61,7 +60,6 @@ func initLimitConfigMaps() { internalpb.RateType_DDLFlush: "aConfig.MaxFlushRatePerDB, internalpb.RateType_DDLCompaction: "aConfig.MaxCompactionRatePerDB, internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerDB, - internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerDB, internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerDB, internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerDB, internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerDB, @@ -69,7 +67,6 @@ func initLimitConfigMaps() { }, internalpb.RateScope_Collection: { internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerCollection, - internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerCollection, internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerCollection, internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerCollection, internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerCollection, @@ -78,7 +75,6 @@ func initLimitConfigMaps() { }, internalpb.RateScope_Partition: { internalpb.RateType_DMLInsert: "aConfig.DMLMaxInsertRatePerPartition, - internalpb.RateType_DMLUpsert: "aConfig.DMLMaxUpsertRatePerPartition, internalpb.RateType_DMLDelete: "aConfig.DMLMaxDeleteRatePerPartition, internalpb.RateType_DMLBulkLoad: "aConfig.DMLMaxBulkLoadRatePerPartition, internalpb.RateType_DQLSearch: "aConfig.DQLMaxSearchRatePerPartition, diff --git a/internal/util/quota/quota_constant_test.go b/internal/util/quota/quota_constant_test.go index aa2cd1a6c3..27dd4fd421 100644 --- a/internal/util/quota/quota_constant_test.go +++ b/internal/util/quota/quota_constant_test.go @@ -32,19 +32,19 @@ func TestGetQuotaConfigMap(t *testing.T) { paramtable.Init() { m := GetQuotaConfigMap(internalpb.RateScope_Cluster) - assert.Equal(t, 12, len(m)) - } - { - m := GetQuotaConfigMap(internalpb.RateScope_Database) assert.Equal(t, 11, len(m)) } + { + m := GetQuotaConfigMap(internalpb.RateScope_Database) + assert.Equal(t, 10, len(m)) + } { m := GetQuotaConfigMap(internalpb.RateScope_Collection) - assert.Equal(t, 7, len(m)) + assert.Equal(t, 6, len(m)) } { m := GetQuotaConfigMap(internalpb.RateScope_Partition) - assert.Equal(t, 6, len(m)) + assert.Equal(t, 5, len(m)) } { m := GetQuotaConfigMap(internalpb.RateScope(1000)) diff --git a/internal/util/ratelimitutil/rate_limiter_tree.go b/internal/util/ratelimitutil/rate_limiter_tree.go index a34e18c0b6..1066da98e3 100644 --- a/internal/util/ratelimitutil/rate_limiter_tree.go +++ b/internal/util/ratelimitutil/rate_limiter_tree.go @@ -89,7 +89,7 @@ func (rln *RateLimiterNode) Check(rt internalpb.RateType, n int) error { func (rln *RateLimiterNode) GetQuotaExceededError(rt internalpb.RateType) error { switch rt { - case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: + case internalpb.RateType_DMLInsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad: if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToWrite); ok { return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode)) } diff --git a/pkg/common/common.go b/pkg/common/common.go index 443439775a..fc566a2216 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -201,8 +201,6 @@ const ( // rate limit CollectionInsertRateMaxKey = "collection.insertRate.max.mb" CollectionInsertRateMinKey = "collection.insertRate.min.mb" - CollectionUpsertRateMaxKey = "collection.upsertRate.max.mb" - CollectionUpsertRateMinKey = "collection.upsertRate.min.mb" CollectionDeleteRateMaxKey = "collection.deleteRate.max.mb" CollectionDeleteRateMinKey = "collection.deleteRate.min.mb" CollectionBulkLoadRateMaxKey = "collection.bulkLoadRate.max.mb" diff --git a/pkg/proto/internal.proto b/pkg/proto/internal.proto index 168cfecd8b..4a1e598b36 100644 --- a/pkg/proto/internal.proto +++ b/pkg/proto/internal.proto @@ -310,7 +310,7 @@ enum RateType { DMLBulkLoad = 7; DQLSearch = 8; DQLQuery = 9; - DMLUpsert = 10; + DMLUpsert = 10 [deprecated = true]; // UpsertRequest uses DMLInsert for rate limiting DDLDB = 11; } diff --git a/pkg/proto/internalpb/internal.pb.go b/pkg/proto/internalpb/internal.pb.go index 3f406b38f4..cb91c8dcfc 100644 --- a/pkg/proto/internalpb/internal.pb.go +++ b/pkg/proto/internalpb/internal.pb.go @@ -88,8 +88,9 @@ const ( RateType_DMLBulkLoad RateType = 7 RateType_DQLSearch RateType = 8 RateType_DQLQuery RateType = 9 - RateType_DMLUpsert RateType = 10 - RateType_DDLDB RateType = 11 + // Deprecated: Marked as deprecated in internal.proto. + RateType_DMLUpsert RateType = 10 // UpsertRequest uses DMLInsert for rate limiting + RateType_DDLDB RateType = 11 ) // Enum value maps for RateType. @@ -5003,7 +5004,7 @@ var file_internal_proto_rawDesc = []byte{ 0x63, 0x6f, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x02, 0x12, - 0x0d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x03, 0x2a, 0xc4, + 0x0d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x03, 0x2a, 0xc8, 0x01, 0x0a, 0x08, 0x52, 0x61, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x44, 0x44, 0x4c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x44, 0x44, 0x4c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x01, @@ -5014,21 +5015,21 @@ var file_internal_proto_rawDesc = []byte{ 0x0a, 0x09, 0x44, 0x4d, 0x4c, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x10, 0x06, 0x12, 0x0f, 0x0a, 0x0b, 0x44, 0x4d, 0x4c, 0x42, 0x75, 0x6c, 0x6b, 0x4c, 0x6f, 0x61, 0x64, 0x10, 0x07, 0x12, 0x0d, 0x0a, 0x09, 0x44, 0x51, 0x4c, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x10, 0x08, 0x12, 0x0c, 0x0a, - 0x08, 0x44, 0x51, 0x4c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x10, 0x09, 0x12, 0x0d, 0x0a, 0x09, 0x44, - 0x4d, 0x4c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x10, 0x0a, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x44, - 0x4c, 0x44, 0x42, 0x10, 0x0b, 0x2a, 0x83, 0x01, 0x0a, 0x0e, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, - 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x6f, 0x6e, 0x65, - 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x10, 0x01, 0x12, - 0x10, 0x0a, 0x0c, 0x50, 0x72, 0x65, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, - 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x03, - 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09, - 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x10, 0x05, 0x12, 0x11, 0x0a, 0x0d, 0x49, - 0x6e, 0x64, 0x65, 0x78, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, 0x10, 0x06, 0x12, 0x0b, - 0x0a, 0x07, 0x53, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x07, 0x42, 0x35, 0x5a, 0x33, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, - 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x76, - 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, - 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x08, 0x44, 0x51, 0x4c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x10, 0x09, 0x12, 0x11, 0x0a, 0x09, 0x44, + 0x4d, 0x4c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x10, 0x0a, 0x1a, 0x02, 0x08, 0x01, 0x12, 0x09, + 0x0a, 0x05, 0x44, 0x44, 0x4c, 0x44, 0x42, 0x10, 0x0b, 0x2a, 0x83, 0x01, 0x0a, 0x0e, 0x49, 0x6d, + 0x70, 0x6f, 0x72, 0x74, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x08, 0x0a, 0x04, + 0x4e, 0x6f, 0x6e, 0x65, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, + 0x67, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x72, 0x65, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, + 0x69, 0x6e, 0x67, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x69, + 0x6e, 0x67, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x10, 0x04, + 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x10, 0x05, 0x12, + 0x11, 0x0a, 0x0d, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, + 0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x6f, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x10, 0x07, 0x42, + 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x69, + 0x6c, 0x76, 0x75, 0x73, 0x2d, 0x69, 0x6f, 0x2f, 0x6d, 0x69, 0x6c, 0x76, 0x75, 0x73, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 38bee539ae..57589899e4 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -44,6 +44,7 @@ const ( type quotaConfig struct { QuotaAndLimitsEnabled ParamItem `refreshable:"false"` QuotaCenterCollectInterval ParamItem `refreshable:"false"` + FactorChangeThreshold ParamItem `refreshable:"true"` ForceDenyAllDDL ParamItem `refreshable:"true"` AllocRetryTimes ParamItem `refreshable:"false"` AllocWaitInterval ParamItem `refreshable:"false"` @@ -77,32 +78,24 @@ type quotaConfig struct { DMLLimitEnabled ParamItem `refreshable:"true"` DMLMaxInsertRate ParamItem `refreshable:"true"` DMLMinInsertRate ParamItem `refreshable:"true"` - DMLMaxUpsertRate ParamItem `refreshable:"true"` - DMLMinUpsertRate ParamItem `refreshable:"true"` DMLMaxDeleteRate ParamItem `refreshable:"true"` DMLMinDeleteRate ParamItem `refreshable:"true"` DMLMaxBulkLoadRate ParamItem `refreshable:"true"` DMLMinBulkLoadRate ParamItem `refreshable:"true"` DMLMaxInsertRatePerDB ParamItem `refreshable:"true"` DMLMinInsertRatePerDB ParamItem `refreshable:"true"` - DMLMaxUpsertRatePerDB ParamItem `refreshable:"true"` - DMLMinUpsertRatePerDB ParamItem `refreshable:"true"` DMLMaxDeleteRatePerDB ParamItem `refreshable:"true"` DMLMinDeleteRatePerDB ParamItem `refreshable:"true"` DMLMaxBulkLoadRatePerDB ParamItem `refreshable:"true"` DMLMinBulkLoadRatePerDB ParamItem `refreshable:"true"` DMLMaxInsertRatePerCollection ParamItem `refreshable:"true"` DMLMinInsertRatePerCollection ParamItem `refreshable:"true"` - DMLMaxUpsertRatePerCollection ParamItem `refreshable:"true"` - DMLMinUpsertRatePerCollection ParamItem `refreshable:"true"` DMLMaxDeleteRatePerCollection ParamItem `refreshable:"true"` DMLMinDeleteRatePerCollection ParamItem `refreshable:"true"` DMLMaxBulkLoadRatePerCollection ParamItem `refreshable:"true"` DMLMinBulkLoadRatePerCollection ParamItem `refreshable:"true"` DMLMaxInsertRatePerPartition ParamItem `refreshable:"true"` DMLMinInsertRatePerPartition ParamItem `refreshable:"true"` - DMLMaxUpsertRatePerPartition ParamItem `refreshable:"true"` - DMLMinUpsertRatePerPartition ParamItem `refreshable:"true"` DMLMaxDeleteRatePerPartition ParamItem `refreshable:"true"` DMLMinDeleteRatePerPartition ParamItem `refreshable:"true"` DMLMaxBulkLoadRatePerPartition ParamItem `refreshable:"true"` @@ -200,6 +193,26 @@ seconds, (0 ~ 65536)`, } p.QuotaCenterCollectInterval.Init(base.mgr) + const defaultFactorChangeThreshold = "0.05" + p.FactorChangeThreshold = ParamItem{ + Key: "quotaAndLimits.factorChangeThreshold", + Version: "2.6.7", + DefaultValue: defaultFactorChangeThreshold, + Formatter: func(v string) string { + threshold := getAsFloat(v) + // (0, 1] + if threshold <= 0 || threshold > 1 { + return defaultFactorChangeThreshold + } + return v + }, + Doc: `FactorChangeThreshold defines the minimum relative change in factor to trigger an update. +If the factor change is less than this threshold (e.g., 5%), the update is skipped +to reduce unnecessary proxy updates. Range: (0, 1]`, + Export: true, + } + p.FactorChangeThreshold.Init(base.mgr) + p.ForceDenyAllDDL = ParamItem{ Key: "quotaAndLimits.forceDenyAllDDL", Version: "2.5.8", @@ -716,182 +729,6 @@ To use this setting, set quotaAndLimits.dml.enabled to true at the same time.`, } p.DMLMinInsertRatePerPartition.Init(base.mgr) - p.DMLMaxUpsertRate = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.max", - Version: "2.3.0", - DefaultValue: max, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - rate = megaBytes2Bytes(rate) - } - // [0, inf) - if rate < 0 { - return max - } - return fmt.Sprintf("%f", rate) - }, - Doc: "MB/s, default no limit", - Export: true, - } - p.DMLMaxUpsertRate.Init(base.mgr) - - p.DMLMinUpsertRate = ParamItem{ - Key: "quotaAndLimits.dml.UpsertRate.min", - Version: "2.3.0", - DefaultValue: min, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return min - } - rate := megaBytes2Bytes(getAsFloat(v)) - // [0, inf) - if rate < 0 { - return min - } - if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRate.GetAsFloat()) { - return min - } - return fmt.Sprintf("%f", rate) - }, - } - p.DMLMinUpsertRate.Init(base.mgr) - - p.DMLMaxUpsertRatePerDB = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.db.max", - Version: "2.4.1", - DefaultValue: max, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - rate = megaBytes2Bytes(rate) - } - // [0, inf) - if rate < 0 { - return p.DMLMaxUpsertRate.GetValue() - } - return fmt.Sprintf("%f", rate) - }, - Doc: "MB/s, default no limit", - Export: true, - } - p.DMLMaxUpsertRatePerDB.Init(base.mgr) - - p.DMLMinUpsertRatePerDB = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.db.min", - Version: "2.4.1", - DefaultValue: min, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return min - } - rate := megaBytes2Bytes(getAsFloat(v)) - // [0, inf) - if rate < 0 { - return min - } - if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerDB.GetAsFloat()) { - return min - } - return fmt.Sprintf("%f", rate) - }, - } - p.DMLMinUpsertRatePerDB.Init(base.mgr) - - p.DMLMaxUpsertRatePerCollection = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.collection.max", - Version: "2.3.0", - DefaultValue: max, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - rate = megaBytes2Bytes(rate) - } - // [0, inf) - if rate < 0 { - return p.DMLMaxUpsertRate.GetValue() - } - return fmt.Sprintf("%f", rate) - }, - Doc: "MB/s, default no limit", - Export: true, - } - p.DMLMaxUpsertRatePerCollection.Init(base.mgr) - - p.DMLMinUpsertRatePerCollection = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.collection.min", - Version: "2.3.0", - DefaultValue: min, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return min - } - rate := megaBytes2Bytes(getAsFloat(v)) - // [0, inf) - if rate < 0 { - return min - } - if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerCollection.GetAsFloat()) { - return min - } - return fmt.Sprintf("%f", rate) - }, - } - p.DMLMinUpsertRatePerCollection.Init(base.mgr) - - p.DMLMaxUpsertRatePerPartition = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.partition.max", - Version: "2.4.1", - DefaultValue: max, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return max - } - rate := getAsFloat(v) - if math.Abs(rate-defaultMax) > 0.001 { // maxRate != defaultMax - rate = megaBytes2Bytes(rate) - } - // [0, inf) - if rate < 0 { - return p.DMLMaxUpsertRate.GetValue() - } - return fmt.Sprintf("%f", rate) - }, - Doc: "MB/s, default no limit", - Export: true, - } - p.DMLMaxUpsertRatePerPartition.Init(base.mgr) - - p.DMLMinUpsertRatePerPartition = ParamItem{ - Key: "quotaAndLimits.dml.upsertRate.partition.min", - Version: "2.4.1", - DefaultValue: min, - Formatter: func(v string) string { - if !p.DMLLimitEnabled.GetAsBool() { - return min - } - rate := megaBytes2Bytes(getAsFloat(v)) - // [0, inf) - if rate < 0 { - return min - } - if !p.checkMinMaxLegal(rate, p.DMLMaxUpsertRatePerPartition.GetAsFloat()) { - return min - } - return fmt.Sprintf("%f", rate) - }, - } - p.DMLMinUpsertRatePerPartition.Init(base.mgr) - p.DMLMaxDeleteRate = ParamItem{ Key: "quotaAndLimits.dml.deleteRate.max", Version: "2.2.0", diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index e2d09677ae..408dc72db4 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -70,8 +70,6 @@ func TestQuotaParam(t *testing.T) { params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DMLMaxInsertRate.Key, "10") params.Save(params.QuotaConfig.DMLMinInsertRate.Key, "1") - params.Save(params.QuotaConfig.DMLMaxUpsertRate.Key, "10") - params.Save(params.QuotaConfig.DMLMinUpsertRate.Key, "1") params.Save(params.QuotaConfig.DMLMaxDeleteRate.Key, "10") params.Save(params.QuotaConfig.DMLMinDeleteRate.Key, "1") params.Save(params.QuotaConfig.DMLMaxBulkLoadRate.Key, "10") @@ -79,8 +77,6 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, true, params.QuotaConfig.DMLLimitEnabled.GetAsBool()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxInsertRate.GetAsFloat()) assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinInsertRate.GetAsFloat()) - assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxUpsertRate.GetAsFloat()) - assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinUpsertRate.GetAsFloat()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat()) assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinDeleteRate.GetAsFloat()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat()) @@ -92,8 +88,6 @@ func TestQuotaParam(t *testing.T) { params.Save(params.QuotaConfig.DMLLimitEnabled.Key, "true") params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "10") params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "1") - params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "10") - params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "1") params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "10") params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "1") params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "10") @@ -101,8 +95,6 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, true, params.QuotaConfig.DMLLimitEnabled.GetAsBool()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat()) - assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat()) - assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, float64(1)*1024*1024, params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, float64(10)*1024*1024, params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()) @@ -111,16 +103,12 @@ func TestQuotaParam(t *testing.T) { // test only set global rate limit params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "-1") params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "-1") - params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "-1") - params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "-1") params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "-1") params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "-1") params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "-1") params.Save(params.QuotaConfig.DMLMinBulkLoadRatePerCollection.Key, "-1") assert.Equal(t, params.QuotaConfig.DMLMaxInsertRate.GetAsFloat(), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) assert.Equal(t, float64(0), params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat()) - assert.Equal(t, params.QuotaConfig.DMLMaxUpsertRate.GetAsFloat(), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) - assert.Equal(t, float64(0), params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat()) assert.Equal(t, params.QuotaConfig.DMLMaxDeleteRate.GetAsFloat(), params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, float64(0), params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, params.QuotaConfig.DMLMaxBulkLoadRate.GetAsFloat(), params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()) @@ -129,16 +117,12 @@ func TestQuotaParam(t *testing.T) { // test invalid config value params.Save(params.QuotaConfig.DMLMaxInsertRatePerCollection.Key, "1") params.Save(params.QuotaConfig.DMLMinInsertRatePerCollection.Key, "5") - params.Save(params.QuotaConfig.DMLMaxUpsertRatePerCollection.Key, "1") - params.Save(params.QuotaConfig.DMLMinUpsertRatePerCollection.Key, "5") params.Save(params.QuotaConfig.DMLMaxDeleteRatePerCollection.Key, "1") params.Save(params.QuotaConfig.DMLMinDeleteRatePerCollection.Key, "5") params.Save(params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.Key, "1") params.Save(params.QuotaConfig.DMLMinBulkLoadRatePerCollection.Key, "5") assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) assert.Equal(t, float64(0), params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat()) - assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxUpsertRatePerCollection.GetAsFloat()) - assert.Equal(t, float64(0), params.QuotaConfig.DMLMinUpsertRatePerCollection.GetAsFloat()) assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, float64(0), params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat()) assert.Equal(t, float64(1*1024*1024), params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat())