diff --git a/configs/milvus.yaml b/configs/milvus.yaml index fbe1d0463f..b3b56f4341 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -633,6 +633,8 @@ quotaAndLimits: flushRate: enabled: false max: -1 # qps, default no limit, rate for flush + collection: + max: -1 # qps, default no limit, rate for flush at collection level. compactionRate: enabled: false max: -1 # qps, default no limit, rate for manualCompaction diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index d1cf2adae2..7744c4d38b 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -69,7 +69,7 @@ func NewMultiRateLimiter() *MultiRateLimiter { } // Check checks if request would be limited or denied. -func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n int) error { +func (m *MultiRateLimiter) Check(collectionIDs []int64, rt internalpb.RateType, n int) error { if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { return nil } @@ -96,21 +96,31 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i ret := checkFunc(m.globalDDLLimiter) // second check collection level rate limits - if ret == nil && !IsDDLRequest(rt) { - // only dml and dql have collection level rate limits - ret = checkFunc(m.collectionLimiters[collectionID]) - if ret != nil { - m.globalDDLLimiter.cancel(rt, n) + // only dml, dql and flush have collection level rate limits + if ret == nil && len(collectionIDs) > 0 && !isNotCollectionLevelLimitRequest(rt) { + // store done limiters to cancel them when error occurs. + doneLimiters := make([]*rateLimiter, 0, len(collectionIDs)+1) + doneLimiters = append(doneLimiters, m.globalDDLLimiter) + + for _, collectionID := range collectionIDs { + ret = checkFunc(m.collectionLimiters[collectionID]) + if ret != nil { + for _, limiter := range doneLimiters { + limiter.cancel(rt, n) + } + break + } + doneLimiters = append(doneLimiters, m.collectionLimiters[collectionID]) } } - return ret } -func IsDDLRequest(rt internalpb.RateType) bool { +func isNotCollectionLevelLimitRequest(rt internalpb.RateType) bool { + // Most ddl is global level, only DDLFlush will be applied at collection switch rt { case internalpb.RateType_DDLCollection, internalpb.RateType_DDLPartition, internalpb.RateType_DDLIndex, - internalpb.RateType_DDLFlush, internalpb.RateType_DDLCompaction: + internalpb.RateType_DDLCompaction: return true default: return false @@ -291,7 +301,11 @@ func (rl *rateLimiter) registerLimiters(globalLevel bool) { case internalpb.RateType_DDLIndex: r = "aConfig.MaxIndexRate case internalpb.RateType_DDLFlush: - r = "aConfig.MaxFlushRate + if globalLevel { + r = "aConfig.MaxFlushRate + } else { + r = "aConfig.MaxFlushRatePerCollection + } case internalpb.RateType_DDLCompaction: r = "aConfig.MaxCompactionRate case internalpb.RateType_DMLInsert: diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index 2dd53844dd..b8c4f53f16 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -44,26 +44,26 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter := NewMultiRateLimiter() multiLimiter.collectionLimiters[collectionID] = newRateLimiter(false) for _, rt := range internalpb.RateType_value { - if IsDDLRequest(internalpb.RateType(rt)) { + if isNotCollectionLevelLimitRequest(internalpb.RateType(rt)) { multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(5), 1)) } else { multiLimiter.collectionLimiters[collectionID].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)) } } for _, rt := range internalpb.RateType_value { - if IsDDLRequest(internalpb.RateType(rt)) { - err := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1) + if isNotCollectionLevelLimitRequest(internalpb.RateType(rt)) { + err := multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), 1) assert.NoError(t, err) - err = multiLimiter.Check(collectionID, internalpb.RateType(rt), 5) + err = multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), 5) assert.NoError(t, err) - err = multiLimiter.Check(collectionID, internalpb.RateType(rt), 5) + err = multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), 5) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } else { - err := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1) + err := multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), 1) assert.NoError(t, err) - err = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt) + err = multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), math.MaxInt) assert.NoError(t, err) - err = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt) + err = multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), math.MaxInt) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } } @@ -78,7 +78,7 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter.collectionLimiters[2] = newRateLimiter(false) multiLimiter.collectionLimiters[3] = newRateLimiter(false) for _, rt := range internalpb.RateType_value { - if IsDDLRequest(internalpb.RateType(rt)) { + if isNotCollectionLevelLimitRequest(internalpb.RateType(rt)) { multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(5), 1)) } else { multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(2), 1)) @@ -88,19 +88,26 @@ func TestMultiRateLimiter(t *testing.T) { } } for _, rt := range internalpb.RateType_value { - if IsDDLRequest(internalpb.RateType(rt)) { - err := multiLimiter.Check(1, internalpb.RateType(rt), 1) + if internalpb.RateType(rt) == internalpb.RateType_DDLFlush { + err := multiLimiter.Check([]int64{1, 2, 3}, internalpb.RateType(rt), 1) assert.NoError(t, err) - err = multiLimiter.Check(1, internalpb.RateType(rt), 5) + err = multiLimiter.Check([]int64{1, 2, 3}, internalpb.RateType(rt), 5) assert.NoError(t, err) - err = multiLimiter.Check(1, internalpb.RateType(rt), 5) + err = multiLimiter.Check([]int64{1, 2, 3}, internalpb.RateType(rt), 5) + assert.ErrorIs(t, err, merr.ErrServiceRateLimit) + } else if isNotCollectionLevelLimitRequest(internalpb.RateType(rt)) { + err := multiLimiter.Check([]int64{1}, internalpb.RateType(rt), 1) + assert.NoError(t, err) + err = multiLimiter.Check([]int64{1}, internalpb.RateType(rt), 5) + assert.NoError(t, err) + err = multiLimiter.Check([]int64{1}, internalpb.RateType(rt), 5) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } else { - err := multiLimiter.Check(1, internalpb.RateType(rt), 1) + err := multiLimiter.Check([]int64{1}, internalpb.RateType(rt), 1) assert.NoError(t, err) - err = multiLimiter.Check(2, internalpb.RateType(rt), 1) + err = multiLimiter.Check([]int64{2}, internalpb.RateType(rt), 1) assert.NoError(t, err) - err = multiLimiter.Check(3, internalpb.RateType(rt), 1) + err = multiLimiter.Check([]int64{3}, internalpb.RateType(rt), 1) assert.ErrorIs(t, err, merr.ErrServiceRateLimit) } } @@ -113,7 +120,7 @@ func TestMultiRateLimiter(t *testing.T) { bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false") for _, rt := range internalpb.RateType_value { - err := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1) + err := multiLimiter.Check([]int64{collectionID}, internalpb.RateType(rt), 1) assert.NoError(t, err) } Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak) @@ -126,7 +133,7 @@ func TestMultiRateLimiter(t *testing.T) { multiLimiter := NewMultiRateLimiter() bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") - err := multiLimiter.Check(collectionID, internalpb.RateType_DMLInsert, 1*1024*1024) + err := multiLimiter.Check([]int64{collectionID}, internalpb.RateType_DMLInsert, 1*1024*1024) assert.NoError(t, err) Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak) Params.Save(Params.QuotaConfig.DMLMaxInsertRate.Key, bakInsertRate) diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index ad79ba58f7..15a019286e 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -36,12 +36,12 @@ import ( // RateLimitInterceptor returns a new unary server interceptors that performs request rate limiting. func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - collectionID, rt, n, err := getRequestInfo(req) + collectionIDs, rt, n, err := getRequestInfo(req) if err != nil { return handler(ctx, req) } - err = limiter.Check(collectionID, rt, n) + err = limiter.Check(collectionIDs, rt, n) nodeID := strconv.FormatInt(paramtable.GetNodeID(), 10) metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.TotalLabel).Inc() if err != nil { @@ -57,66 +57,71 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { } // getRequestInfo returns collection name and rateType of request and return tokens needed. -func getRequestInfo(req interface{}) (int64, internalpb.RateType, int, error) { +func getRequestInfo(req interface{}) ([]int64, internalpb.RateType, int, error) { switch r := req.(type) { case *milvuspb.InsertRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DMLInsert, proto.Size(r), nil + return []int64{collectionID}, internalpb.RateType_DMLInsert, proto.Size(r), nil case *milvuspb.UpsertRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DMLUpsert, proto.Size(r), nil + return []int64{collectionID}, internalpb.RateType_DMLUpsert, proto.Size(r), nil case *milvuspb.DeleteRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DMLDelete, proto.Size(r), nil + return []int64{collectionID}, internalpb.RateType_DMLDelete, proto.Size(r), nil case *milvuspb.ImportRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DMLBulkLoad, proto.Size(r), nil + return []int64{collectionID}, internalpb.RateType_DMLBulkLoad, proto.Size(r), nil case *milvuspb.SearchRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DQLSearch, int(r.GetNq()), nil + return []int64{collectionID}, internalpb.RateType_DQLSearch, int(r.GetNq()), nil case *milvuspb.QueryRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DQLQuery, 1, nil // think of the query request's nq as 1 + return []int64{collectionID}, internalpb.RateType_DQLQuery, 1, nil // think of the query request's nq as 1 case *milvuspb.CreateCollectionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLCollection, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLCollection, 1, nil case *milvuspb.DropCollectionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLCollection, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLCollection, 1, nil case *milvuspb.LoadCollectionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLCollection, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLCollection, 1, nil case *milvuspb.ReleaseCollectionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLCollection, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLCollection, 1, nil case *milvuspb.CreatePartitionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLPartition, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLPartition, 1, nil case *milvuspb.DropPartitionRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLPartition, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLPartition, 1, nil case *milvuspb.LoadPartitionsRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLPartition, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLPartition, 1, nil case *milvuspb.ReleasePartitionsRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLPartition, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLPartition, 1, nil case *milvuspb.CreateIndexRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLIndex, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLIndex, 1, nil case *milvuspb.DropIndexRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) - return collectionID, internalpb.RateType_DDLIndex, 1, nil + return []int64{collectionID}, internalpb.RateType_DDLIndex, 1, nil case *milvuspb.FlushRequest: - return 0, internalpb.RateType_DDLFlush, 1, nil + collectionIDs := make([]int64, 0, len(r.GetCollectionNames())) + for _, collectionName := range r.GetCollectionNames() { + collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), collectionName) + collectionIDs = append(collectionIDs, collectionID) + } + return collectionIDs, internalpb.RateType_DDLFlush, 1, nil case *milvuspb.ManualCompactionRequest: - return 0, internalpb.RateType_DDLCompaction, 1, nil + return nil, internalpb.RateType_DDLCompaction, 1, nil // TODO: support more request default: if req == nil { - return 0, 0, 0, fmt.Errorf("null request") + return nil, 0, 0, fmt.Errorf("null request") } - return 0, 0, 0, fmt.Errorf("unsupported request type %s", reflect.TypeOf(req).Name()) + return nil, 0, 0, fmt.Errorf("unsupported request type %s", reflect.TypeOf(req).Name()) } } diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 45cfe64042..1271587620 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -38,7 +38,7 @@ type limiterMock struct { quotaStateReasons []commonpb.ErrorCode } -func (l *limiterMock) Check(collection int64, rt internalpb.RateType, n int) error { +func (l *limiterMock) Check(collection []int64, rt internalpb.RateType, n int) error { if l.rate == 0 { return merr.ErrServiceQuotaExceeded } @@ -61,108 +61,109 @@ func TestRateLimitInterceptor(t *testing.T) { assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.InsertRequest{}), size) assert.Equal(t, internalpb.RateType_DMLInsert, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.UpsertRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.InsertRequest{}), size) assert.Equal(t, internalpb.RateType_DMLUpsert, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.DeleteRequest{}), size) assert.Equal(t, internalpb.RateType_DMLDelete, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.ImportRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.ImportRequest{}), size) assert.Equal(t, internalpb.RateType_DMLBulkLoad, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.SearchRequest{Nq: 5}) assert.NoError(t, err) assert.Equal(t, 5, size) assert.Equal(t, internalpb.RateType_DQLSearch, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.QueryRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DQLQuery, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLIndex, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) collection, rt, size, err = getRequestInfo(&milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLIndex, rt) - assert.Equal(t, collection, int64(0)) + assert.ElementsMatch(t, collection, []int64{int64(0)}) - _, rt, size, err = getRequestInfo(&milvuspb.FlushRequest{}) + collection, rt, size, err = getRequestInfo(&milvuspb.FlushRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLFlush, rt) + assert.Len(t, collection, 0) collection, rt, size, err = getRequestInfo(&milvuspb.ManualCompactionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCompaction, rt) - assert.Equal(t, collection, int64(0)) + assert.Len(t, collection, 0) }) t.Run("test getFailedResponse", func(t *testing.T) { diff --git a/internal/types/types.go b/internal/types/types.go index ad4021bc33..3239ed8b81 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -37,7 +37,7 @@ import ( // If Limit function return true, the request will be rejected. // Otherwise, the request will pass. Limit also returns limit of limiter. type Limiter interface { - Check(collectionID int64, rt internalpb.RateType, n int) error + Check(collectionIDs []int64, rt internalpb.RateType, n int) error } // Component is the interface all services implement diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 172a4a14c7..f38af39bc1 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -54,8 +54,9 @@ type quotaConfig struct { IndexLimitEnabled ParamItem `refreshable:"true"` MaxIndexRate ParamItem `refreshable:"true"` - FlushLimitEnabled ParamItem `refreshable:"true"` - MaxFlushRate ParamItem `refreshable:"true"` + FlushLimitEnabled ParamItem `refreshable:"true"` + MaxFlushRate ParamItem `refreshable:"true"` + MaxFlushRatePerCollection ParamItem `refreshable:"true"` CompactionLimitEnabled ParamItem `refreshable:"true"` MaxCompactionRate ParamItem `refreshable:"true"` @@ -257,6 +258,25 @@ seconds, (0 ~ 65536)`, } p.MaxFlushRate.Init(base.mgr) + p.MaxFlushRatePerCollection = ParamItem{ + Key: "quotaAndLimits.flushRate.collection.max", + Version: "2.3.9", + DefaultValue: "-1", + Formatter: func(v string) string { + if !p.FlushLimitEnabled.GetAsBool() { + return max + } + // [0 ~ Inf) + if getAsInt(v) < 0 { + return max + } + return v + }, + Doc: "qps, default no limit, rate for flush at collection level.", + Export: true, + } + p.MaxFlushRatePerCollection.Init(base.mgr) + p.CompactionLimitEnabled = ParamItem{ Key: "quotaAndLimits.compactionRate.enabled", Version: "2.2.0", diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index 8387f83650..0b06ddf656 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -41,7 +41,8 @@ func TestQuotaParam(t *testing.T) { t.Run("test functional params", func(t *testing.T) { assert.Equal(t, false, qc.IndexLimitEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.MaxIndexRate.GetAsFloat()) - assert.Equal(t, false, qc.FlushLimitEnabled.GetAsBool()) + assert.False(t, qc.FlushLimitEnabled.GetAsBool()) + assert.Equal(t, defaultMax, qc.MaxFlushRatePerCollection.GetAsFloat()) assert.Equal(t, defaultMax, qc.MaxFlushRate.GetAsFloat()) assert.Equal(t, false, qc.CompactionLimitEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.MaxCompactionRate.GetAsFloat())