diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index b972b896b0..8e1a9d20dd 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -88,6 +89,7 @@ type QuotaCenter struct { proxies *proxyClientManager queryCoord types.QueryCoord dataCoord types.DataCoord + meta IMetaTable // metrics queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics @@ -111,7 +113,7 @@ type QuotaCenter struct { } // NewQuotaCenter returns a new QuotaCenter. -func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, dataCoord types.DataCoord, tsoAllocator tso.Allocator) *QuotaCenter { +func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, dataCoord types.DataCoord, tsoAllocator tso.Allocator, meta IMetaTable) *QuotaCenter { return &QuotaCenter{ proxies: proxies, queryCoord: queryCoord, @@ -119,6 +121,7 @@ func NewQuotaCenter(proxies *proxyClientManager, queryCoord types.QueryCoord, da currentRates: make(map[int64]map[internalpb.RateType]Limit), quotaStates: make(map[int64]map[milvuspb.QuotaState]commonpb.ErrorCode), tsoAllocator: tsoAllocator, + meta: meta, readableCollections: make([]int64, 0), writableCollections: make([]int64, 0), @@ -403,10 +406,14 @@ func (q *QuotaCenter) calculateReadRates() { zap.Int64("collectionID", collection), zap.Any("queryRate", q.currentRates[collection][internalpb.RateType_DQLQuery])) } + + collectionProps := q.getCollectionLimitConfig(collection) + q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMinKey), internalpb.RateType_DQLSearch, collection) + q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMinKey), internalpb.RateType_DQLQuery, collection) } - q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat(), internalpb.RateType_DQLSearch, collections...) - q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat(), internalpb.RateType_DQLQuery, collections...) + log.RatedInfo(10, "QueryNodeMetrics when cool-off", + zap.Any("metrics", q.queryNodeMetrics)) } // TODO: unify search and query? @@ -417,6 +424,7 @@ func (q *QuotaCenter) calculateReadRates() { // calculateWriteRates calculates and sets dml rates. func (q *QuotaCenter) calculateWriteRates() error { + log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) if Params.QuotaConfig.ForceDenyWriting.GetAsBool() { q.forceDenyWriting(commonpb.ErrorCode_ForceDeny) return nil @@ -463,8 +471,13 @@ func (q *QuotaCenter) calculateWriteRates() error { if q.currentRates[collection][internalpb.RateType_DMLDelete] != Inf { q.currentRates[collection][internalpb.RateType_DMLDelete] *= Limit(factor) } - q.guaranteeMinRate(Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat(), internalpb.RateType_DMLInsert) - q.guaranteeMinRate(Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat(), internalpb.RateType_DMLDelete) + + collectionProps := q.getCollectionLimitConfig(collection) + q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMinKey), internalpb.RateType_DMLInsert, collection) + q.guaranteeMinRate(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMinKey), internalpb.RateType_DMLDelete, collection) + log.RatedDebug(10, "QuotaCenter cool write rates off done", + zap.Int64("collectionID", collection), + zap.Float64("factor", factor)) } return nil @@ -700,23 +713,43 @@ func (q *QuotaCenter) resetCurrentRate(rt internalpb.RateType, collection int64) if q.quotaStates[collection] == nil { q.quotaStates[collection] = make(map[milvuspb.QuotaState]commonpb.ErrorCode) } + + collectionProps := q.getCollectionLimitConfig(collection) switch rt { case internalpb.RateType_DMLInsert: - q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) + q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionInsertRateMaxKey)) case internalpb.RateType_DMLDelete: - q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) + q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionDeleteRateMaxKey)) case internalpb.RateType_DMLBulkLoad: - q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()) + q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionBulkLoadRateMaxKey)) case internalpb.RateType_DQLSearch: - q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat()) + q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionSearchRateMaxKey)) case internalpb.RateType_DQLQuery: - q.currentRates[collection][rt] = Limit(Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat()) + q.currentRates[collection][rt] = Limit(getCollectionRateLimitConfig(collectionProps, common.CollectionQueryRateMaxKey)) } if q.currentRates[collection][rt] < 0 { q.currentRates[collection][rt] = Inf // no limit } } +func (q *QuotaCenter) getCollectionLimitConfig(collection int64) map[string]string { + log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) + collectionInfo, err := q.meta.GetCollectionByID(context.TODO(), collection, typeutil.MaxTimestamp, false) + if err != nil { + log.RatedWarn(10, "failed to get collection rate limit config", + zap.Int64("collectionID", collection), + zap.Error(err)) + return make(map[string]string) + } + + properties := make(map[string]string) + for _, pair := range collectionInfo.Properties { + properties[pair.GetKey()] = pair.GetValue() + } + + return properties +} + // checkDiskQuota checks if disk quota exceeded. func (q *QuotaCenter) checkDiskQuota() { q.diskMu.Lock() @@ -729,8 +762,9 @@ func (q *QuotaCenter) checkDiskQuota() { } collections := typeutil.NewUniqueSet() totalDiskQuota := Params.QuotaConfig.DiskQuota.GetAsFloat() - colDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat() for collection, binlogSize := range q.dataCoordMetrics.CollectionBinlogSize { + collectionProps := q.getCollectionLimitConfig(collection) + colDiskQuota := getCollectionRateLimitConfig(collectionProps, common.CollectionDiskQuotaKey) if float64(binlogSize) >= colDiskQuota { log.RatedWarn(10, "collection disk quota exceeded", zap.Int64("collection", collection), diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 86d39ac43c..83b3ad9af9 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -28,8 +28,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/internalpb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" @@ -69,7 +73,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test QuotaCenter", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) go quotaCenter.run() time.Sleep(10 * time.Millisecond) quotaCenter.stop() @@ -77,35 +83,39 @@ func TestQuotaCenter(t *testing.T) { t.Run("test syncMetrics", func(t *testing.T) { qc := types.NewMockQueryCoord(t) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{Status: succStatus()}, nil) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) err = quotaCenter.syncMetrics() assert.Error(t, err) // for empty response - quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) err = quotaCenter.syncMetrics() assert.Error(t, err) - quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retFailStatus: true}, core.tsoAllocator) + quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retFailStatus: true}, core.tsoAllocator, meta) err = quotaCenter.syncMetrics() assert.Error(t, err) qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock err")) - quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retErr: true}, core.tsoAllocator) + quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{retErr: true}, core.tsoAllocator, meta) err = quotaCenter.syncMetrics() assert.Error(t, err) qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ Status: failStatus(commonpb.ErrorCode_UnexpectedError, "mock failure status"), }, nil) - quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + quotaCenter = NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) err = quotaCenter.syncMetrics() assert.Error(t, err) }) t.Run("test forceDeny", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.readableCollections = []int64{1, 2, 3} quotaCenter.resetAllCurrentRates() quotaCenter.forceDenyReading(commonpb.ErrorCode_ForceDeny, 1, 2, 3, 4) @@ -131,7 +141,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test calculateRates", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) err = quotaCenter.calculateRates() assert.NoError(t, err) alloc := newMockTsoAllocator() @@ -145,7 +157,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test getTimeTickDelayFactor factors", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) type ttCase struct { maxTtDelay time.Duration curTt time.Time @@ -191,7 +205,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test TimeTickDelayFactor factors", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) type ttCase struct { delay time.Duration expectedFactor float64 @@ -260,7 +276,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test calculateReadRates", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.readableCollections = []int64{1, 2, 3} quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{ 1: {Rms: []metricsinfo.RateMetric{ @@ -317,7 +335,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test calculateWriteRates", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) err = quotaCenter.calculateWriteRates() assert.NoError(t, err) @@ -354,7 +374,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test MemoryFactor factors", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) type memCase struct { lowWater float64 highWater float64 @@ -407,7 +429,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test GrowingSegmentsSize factors", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) tests := []struct { low float64 high float64 @@ -459,7 +483,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test checkDiskQuota", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.checkDiskQuota() // total DiskQuota exceeded @@ -501,7 +527,9 @@ func TestQuotaCenter(t *testing.T) { pcm := &proxyClientManager{proxyClient: map[int64]types.Proxy{ TestProxyID: p1, }} - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.resetAllCurrentRates() collectionID := int64(1) quotaCenter.currentRates[collectionID] = make(map[internalpb.RateType]ratelimitutil.Limit) @@ -515,7 +543,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test recordMetrics", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) collectionID := int64(1) quotaCenter.quotaStates[collectionID] = make(map[milvuspb.QuotaState]commonpb.ErrorCode) quotaCenter.quotaStates[collectionID][milvuspb.QuotaState_DenyToWrite] = commonpb.ErrorCode_MemoryQuotaExhausted @@ -525,7 +555,9 @@ func TestQuotaCenter(t *testing.T) { t.Run("test guaranteeMinRate", func(t *testing.T) { qc := types.NewMockQueryCoord(t) - quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.resetAllCurrentRates() minRate := Limit(100) collectionID := int64(1) @@ -552,7 +584,9 @@ func TestQuotaCenter(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { collection := UniqueID(0) - quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator) + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator, meta) quotaCenter.resetAllCurrentRates() quotaBackup := Params.QuotaConfig.DiskQuota colQuotaBackup := Params.QuotaConfig.DiskQuotaPerCollection @@ -570,4 +604,56 @@ func TestQuotaCenter(t *testing.T) { }) } }) + + t.Run("test reset current rates", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + quotaCenter := NewQuotaCenter(pcm, nil, &dataCoordMockForQuota{}, core.tsoAllocator, meta) + quotaCenter.readableCollections = []int64{1} + quotaCenter.writableCollections = []int64{1} + quotaCenter.resetAllCurrentRates() + + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLInsert]), Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLDelete]), Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat()) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLBulkLoad]), Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat()) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLSearch]), Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat()) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLQuery]), Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat()) + + meta.ExpectedCalls = nil + meta.EXPECT().GetCollectionByID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{ + Properties: []*commonpb.KeyValuePair{ + { + Key: common.CollectionInsertRateMaxKey, + Value: "1", + }, + + { + Key: common.CollectionDeleteRateMaxKey, + Value: "2", + }, + + { + Key: common.CollectionBulkLoadRateMaxKey, + Value: "3", + }, + + { + Key: common.CollectionQueryRateMaxKey, + Value: "4", + }, + + { + Key: common.CollectionSearchRateMaxKey, + Value: "5", + }, + }, + }, nil) + quotaCenter.resetAllCurrentRates() + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLInsert]), float64(1*1024*1024)) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLDelete]), float64(2*1024*1024)) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLBulkLoad]), float64(3*1024*1024)) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLQuery]), float64(4)) + assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DQLSearch]), float64(5)) + + }) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6ac729b55f..268f8b3665 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -445,7 +445,7 @@ func (c *Core) initInternal() error { c.metricsCacheManager = metricsinfo.NewMetricsCacheManager() - c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator) + c.quotaCenter = NewQuotaCenter(c.proxyClientManager, c.queryCoord, c.dataCoord, c.tsoAllocator, c.meta) log.Debug("RootCoord init QuotaCenter done") if err := c.initImportManager(); err != nil { diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index cbc19c3612..08314a1bb7 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -19,6 +19,7 @@ package rootcoord import ( "encoding/json" "fmt" + "strconv" "go.uber.org/zap" @@ -135,3 +136,87 @@ func getTravelTs(req TimeTravelRequest) Timestamp { func isMaxTs(ts Timestamp) bool { return ts == typeutil.MaxTimestamp } + +func getCollectionRateLimitConfigDefaultValue(configKey string) float64 { + switch configKey { + case common.CollectionInsertRateMaxKey: + return Params.QuotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat() + case common.CollectionInsertRateMinKey: + return Params.QuotaConfig.DMLMinInsertRatePerCollection.GetAsFloat() + case common.CollectionDeleteRateMaxKey: + return Params.QuotaConfig.DMLMaxDeleteRatePerCollection.GetAsFloat() + case common.CollectionDeleteRateMinKey: + return Params.QuotaConfig.DMLMinDeleteRatePerCollection.GetAsFloat() + case common.CollectionBulkLoadRateMaxKey: + return Params.QuotaConfig.DMLMaxBulkLoadRatePerCollection.GetAsFloat() + case common.CollectionBulkLoadRateMinKey: + return Params.QuotaConfig.DMLMinBulkLoadRatePerCollection.GetAsFloat() + case common.CollectionQueryRateMaxKey: + return Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat() + case common.CollectionQueryRateMinKey: + return Params.QuotaConfig.DQLMinQueryRatePerCollection.GetAsFloat() + case common.CollectionSearchRateMaxKey: + return Params.QuotaConfig.DQLMaxSearchRatePerCollection.GetAsFloat() + case common.CollectionSearchRateMinKey: + return Params.QuotaConfig.DQLMinSearchRatePerCollection.GetAsFloat() + case common.CollectionDiskQuotaKey: + return Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat() + + default: + return float64(0) + } +} + +func getCollectionRateLimitConfig(properties map[string]string, configKey string) float64 { + megaBytes2Bytes := func(v float64) float64 { + return v * 1024.0 * 1024.0 + } + toBytesIfNecessary := func(rate float64) float64 { + switch configKey { + case common.CollectionInsertRateMaxKey: + return megaBytes2Bytes(rate) + case common.CollectionInsertRateMinKey: + return megaBytes2Bytes(rate) + case common.CollectionDeleteRateMaxKey: + return megaBytes2Bytes(rate) + case common.CollectionDeleteRateMinKey: + return megaBytes2Bytes(rate) + case common.CollectionBulkLoadRateMaxKey: + return megaBytes2Bytes(rate) + case common.CollectionBulkLoadRateMinKey: + return megaBytes2Bytes(rate) + case common.CollectionQueryRateMaxKey: + return rate + case common.CollectionQueryRateMinKey: + return rate + case common.CollectionSearchRateMaxKey: + return rate + case common.CollectionSearchRateMinKey: + return rate + case common.CollectionDiskQuotaKey: + return megaBytes2Bytes(rate) + + default: + return float64(0) + } + } + + v, ok := properties[configKey] + if ok { + rate, err := strconv.ParseFloat(v, 64) + if err != nil { + log.Warn("invalid configuration for collection dml rate", + zap.String("config item", configKey), + zap.String("config value", v)) + return getCollectionRateLimitConfigDefaultValue(configKey) + } + + rateInBytes := toBytesIfNecessary(rate) + if rateInBytes < 0 { + return getCollectionRateLimitConfigDefaultValue(configKey) + } + return rateInBytes + } + + return getCollectionRateLimitConfigDefaultValue(configKey) +} diff --git a/internal/rootcoord/util_test.go b/internal/rootcoord/util_test.go index 10f7434fbb..7c9ea2c472 100644 --- a/internal/rootcoord/util_test.go +++ b/internal/rootcoord/util_test.go @@ -19,13 +19,13 @@ package rootcoord import ( "testing" - "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/stretchr/testify/assert" ) func Test_EqualKeyPairArray(t *testing.T) { @@ -148,3 +148,161 @@ func Test_isMaxTs(t *testing.T) { }) } } + +func Test_getCollectionRateLimitConfig(t *testing.T) { + type args struct { + properties map[string]string + configKey string + } + + configMap := map[string]string{ + common.CollectionInsertRateMaxKey: "5", + common.CollectionInsertRateMinKey: "5", + common.CollectionDeleteRateMaxKey: "5", + common.CollectionDeleteRateMinKey: "5", + common.CollectionBulkLoadRateMaxKey: "5", + common.CollectionBulkLoadRateMinKey: "5", + common.CollectionQueryRateMaxKey: "5", + common.CollectionQueryRateMinKey: "5", + common.CollectionSearchRateMaxKey: "5", + common.CollectionSearchRateMinKey: "5", + common.CollectionDiskQuotaKey: "5", + } + + tests := []struct { + name string + args args + want float64 + }{ + { + name: "test CollectionInsertRateMaxKey", + args: args{ + properties: configMap, + configKey: common.CollectionInsertRateMaxKey, + }, + want: float64(5 * 1024 * 1024), + }, + { + name: "test CollectionInsertRateMinKey", + args: args{ + properties: configMap, + configKey: common.CollectionInsertRateMinKey, + }, + want: float64(5 * 1024 * 1024), + }, + { + name: "test CollectionDeleteRateMaxKey", + args: args{ + properties: configMap, + configKey: common.CollectionDeleteRateMaxKey, + }, + want: float64(5 * 1024 * 1024), + }, + + { + name: "test CollectionDeleteRateMinKey", + args: args{ + properties: configMap, + configKey: common.CollectionDeleteRateMinKey, + }, + want: float64(5 * 1024 * 1024), + }, + { + name: "test CollectionBulkLoadRateMaxKey", + args: args{ + properties: configMap, + configKey: common.CollectionBulkLoadRateMaxKey, + }, + want: float64(5 * 1024 * 1024), + }, + + { + name: "test CollectionBulkLoadRateMinKey", + args: args{ + properties: configMap, + configKey: common.CollectionBulkLoadRateMinKey, + }, + want: float64(5 * 1024 * 1024), + }, + + { + name: "test CollectionQueryRateMaxKey", + args: args{ + properties: configMap, + configKey: common.CollectionQueryRateMaxKey, + }, + want: float64(5), + }, + + { + name: "test CollectionQueryRateMinKey", + args: args{ + properties: configMap, + configKey: common.CollectionQueryRateMinKey, + }, + want: float64(5), + }, + + { + name: "test CollectionSearchRateMaxKey", + args: args{ + properties: configMap, + configKey: common.CollectionSearchRateMaxKey, + }, + want: float64(5), + }, + + { + name: "test CollectionSearchRateMinKey", + args: args{ + properties: configMap, + configKey: common.CollectionSearchRateMinKey, + }, + want: float64(5), + }, + + { + name: "test CollectionDiskQuotaKey", + args: args{ + properties: configMap, + configKey: common.CollectionDiskQuotaKey, + }, + want: float64(5 * 1024 * 1024), + }, + + { + name: "test invalid config value", + args: args{ + properties: map[string]string{common.CollectionDiskQuotaKey: "invalid value"}, + configKey: common.CollectionDiskQuotaKey, + }, + want: Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat(), + }, + { + name: "test empty config item", + args: args{ + properties: map[string]string{}, + configKey: common.CollectionDiskQuotaKey, + }, + want: Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat(), + }, + + { + name: "test unknown config item", + args: args{ + properties: configMap, + configKey: "", + }, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getCollectionRateLimitConfig(tt.args.properties, tt.args.configKey) + + if got != tt.want { + t.Errorf("getCollectionRateLimitConfig() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/common/common.go b/pkg/common/common.go index bf35940506..d34bbf4d81 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -100,6 +100,19 @@ const ( const ( CollectionTTLConfigKey = "collection.ttl.seconds" CollectionAutoCompactionKey = "collection.autocompaction.enabled" + + // rate limit + CollectionInsertRateMaxKey = "collection.insertRate.max.mb" + CollectionInsertRateMinKey = "collection.insertRate.min.mb" + CollectionDeleteRateMaxKey = "collection.deleteRate.max.mb" + CollectionDeleteRateMinKey = "collection.deleteRate.min.mb" + CollectionBulkLoadRateMaxKey = "collection.bulkLoadRate.max.mb" + CollectionBulkLoadRateMinKey = "collection.bulkLoadRate.min.mb" + CollectionQueryRateMaxKey = "collection.queryRate.max.qps" + CollectionQueryRateMinKey = "collection.queryRate.min.qps" + CollectionSearchRateMaxKey = "collection.searchRate.max.vps" + CollectionSearchRateMinKey = "collection.searchRate.min.vps" + CollectionDiskQuotaKey = "collection.diskProtection.diskQuota.mb" ) const (