mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Update quota params and disable limit for showCollection (#19641)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
a85e44d169
commit
f98dbcf5be
@ -377,43 +377,43 @@ quotaAndLimits:
|
|||||||
enabled: false # `true` to enable quota and limits, `false` to disable.
|
enabled: false # `true` to enable quota and limits, `false` to disable.
|
||||||
|
|
||||||
# quotaCenterCollectInterval is the time interval that quotaCenter
|
# quotaCenterCollectInterval is the time interval that quotaCenter
|
||||||
# collects metrics from Query cluster and Data cluster.
|
# collects metrics from Proxies, Query cluster and Data cluster.
|
||||||
quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536)
|
quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536)
|
||||||
|
|
||||||
ddl: # ddl limit rates, default no limit.
|
ddl: # ddl limit rates, default no limit.
|
||||||
enabled: false
|
enabled: false
|
||||||
collectionRate: # requests per minute, default no limit, rate for CreateCollection, DropCollection, HasCollection, DescribeCollection, LoadCollection, ReleaseCollection
|
collectionRate: # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
|
||||||
partitionRate: # requests per minute, default no limit, rate for CreatePartition, DropPartition, HasPartition, LoadPartition, ReleasePartition
|
partitionRate: # qps, default no limit, rate for CreatePartition, DropPartition, LoadPartition, ReleasePartition
|
||||||
indexRate: # requests per minute, default no limit, rate for CreateIndex, DropIndex, DescribeIndex
|
|
||||||
flushRate: # requests per minute, default no limit, rate for flush
|
indexRate:
|
||||||
compactionRate: # requests per minute, default no limit, rate for manualCompaction
|
enabled: false
|
||||||
|
max: # qps, default no limit, rate for CreateIndex, DropIndex
|
||||||
|
flushRate:
|
||||||
|
enabled: false
|
||||||
|
max: # qps, default no limit, rate for flush
|
||||||
|
compactionRate:
|
||||||
|
enabled: false
|
||||||
|
max: # qps, default no limit, rate for manualCompaction
|
||||||
|
|
||||||
# dml limit rates, default no limit.
|
# dml limit rates, default no limit.
|
||||||
# The maximum rate will not be greater than `max`,
|
# The maximum rate will not be greater than `max`.
|
||||||
# and the rate after handling back pressure will not be less than `min`.
|
|
||||||
dml:
|
dml:
|
||||||
enabled: false
|
enabled: false
|
||||||
insertRate:
|
insertRate:
|
||||||
max: # MB/s, default no limit
|
max: # MB/s, default no limit
|
||||||
min: # MB/s, default 0
|
|
||||||
deleteRate:
|
deleteRate:
|
||||||
max: # MB/s, default no limit
|
max: # MB/s, default no limit
|
||||||
min: # MB/s, default 0
|
|
||||||
bulkLoadRate: # not support yet. TODO: limit bulkLoad rate
|
bulkLoadRate: # not support yet. TODO: limit bulkLoad rate
|
||||||
max: # MB/s, default no limit
|
max: # MB/s, default no limit
|
||||||
min: # MB/s, default 0
|
|
||||||
|
|
||||||
# dql limit rates, default no limit.
|
# dql limit rates, default no limit.
|
||||||
# The maximum rate will not be greater than `max`,
|
# The maximum rate will not be greater than `max`.
|
||||||
# and the rate after handling back pressure will not be less than `min`.
|
|
||||||
dql:
|
dql:
|
||||||
enabled: false
|
enabled: false
|
||||||
searchRate:
|
searchRate:
|
||||||
max: # vps (vectors per second), default no limit
|
max: # vps (vectors per second), default no limit
|
||||||
min: # vps (vectors per second), default 0
|
|
||||||
queryRate:
|
queryRate:
|
||||||
max: # qps, default no limit
|
max: # qps, default no limit
|
||||||
min: # qps, default 0
|
|
||||||
|
|
||||||
# limitWriting decides whether dml requests are allowed.
|
# limitWriting decides whether dml requests are allowed.
|
||||||
limitWriting:
|
limitWriting:
|
||||||
@ -432,10 +432,10 @@ quotaAndLimits:
|
|||||||
# When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate;
|
# When memoryLowWaterLevel < memory usage < memoryHighWaterLevel, reduce the dml rate;
|
||||||
# When memory usage < memoryLowWaterLevel, no action.
|
# When memory usage < memoryLowWaterLevel, no action.
|
||||||
# memoryLowWaterLevel should be less than memoryHighWaterLevel.
|
# memoryLowWaterLevel should be less than memoryHighWaterLevel.
|
||||||
dataNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in DataNodes
|
dataNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in DataNodes
|
||||||
dataNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in DataNodes
|
dataNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in DataNodes
|
||||||
queryNodeMemoryLowWaterLevel: 0.8 # (0, 1], memoryLowWaterLevel in QueryNodes
|
queryNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in QueryNodes
|
||||||
queryNodeMemoryHighWaterLevel: 0.9 # (0, 1], memoryHighWaterLevel in QueryNodes
|
queryNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in QueryNodes
|
||||||
|
|
||||||
# limitReading decides whether dql requests are allowed.
|
# limitReading decides whether dql requests are allowed.
|
||||||
limitReading:
|
limitReading:
|
||||||
|
|||||||
@ -106,11 +106,11 @@ func (rl *rateLimiter) registerLimiters() {
|
|||||||
case internalpb.RateType_DDLPartition:
|
case internalpb.RateType_DDLPartition:
|
||||||
r = Params.QuotaConfig.DDLPartitionRate
|
r = Params.QuotaConfig.DDLPartitionRate
|
||||||
case internalpb.RateType_DDLIndex:
|
case internalpb.RateType_DDLIndex:
|
||||||
r = Params.QuotaConfig.DDLIndexRate
|
r = Params.QuotaConfig.MaxIndexRate
|
||||||
case internalpb.RateType_DDLFlush:
|
case internalpb.RateType_DDLFlush:
|
||||||
r = Params.QuotaConfig.DDLFlushRate
|
r = Params.QuotaConfig.MaxFlushRate
|
||||||
case internalpb.RateType_DDLCompaction:
|
case internalpb.RateType_DDLCompaction:
|
||||||
r = Params.QuotaConfig.DDLCompactionRate
|
r = Params.QuotaConfig.MaxCompactionRate
|
||||||
case internalpb.RateType_DMLInsert:
|
case internalpb.RateType_DMLInsert:
|
||||||
r = Params.QuotaConfig.DMLMaxInsertRate
|
r = Params.QuotaConfig.DMLMaxInsertRate
|
||||||
case internalpb.RateType_DMLDelete:
|
case internalpb.RateType_DMLDelete:
|
||||||
|
|||||||
@ -65,15 +65,15 @@ func getRequestInfo(req interface{}) (internalpb.RateType, int, error) {
|
|||||||
return internalpb.RateType_DQLSearch, int(r.GetNq()), nil
|
return internalpb.RateType_DQLSearch, int(r.GetNq()), nil
|
||||||
case *milvuspb.QueryRequest:
|
case *milvuspb.QueryRequest:
|
||||||
return internalpb.RateType_DQLQuery, 1, nil // think of the query request's nq as 1
|
return internalpb.RateType_DQLQuery, 1, nil // think of the query request's nq as 1
|
||||||
case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest, *milvuspb.HasCollectionRequest:
|
case *milvuspb.CreateCollectionRequest, *milvuspb.DropCollectionRequest:
|
||||||
return internalpb.RateType_DDLCollection, 1, nil
|
return internalpb.RateType_DDLCollection, 1, nil
|
||||||
case *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest, *milvuspb.ShowCollectionsRequest:
|
case *milvuspb.LoadCollectionRequest, *milvuspb.ReleaseCollectionRequest:
|
||||||
return internalpb.RateType_DDLCollection, 1, nil
|
return internalpb.RateType_DDLCollection, 1, nil
|
||||||
case *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest, *milvuspb.HasPartitionRequest:
|
case *milvuspb.CreatePartitionRequest, *milvuspb.DropPartitionRequest:
|
||||||
return internalpb.RateType_DDLPartition, 1, nil
|
return internalpb.RateType_DDLPartition, 1, nil
|
||||||
case *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest, *milvuspb.ShowPartitionsRequest:
|
case *milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest:
|
||||||
return internalpb.RateType_DDLPartition, 1, nil
|
return internalpb.RateType_DDLPartition, 1, nil
|
||||||
case *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest, *milvuspb.DescribeIndexRequest:
|
case *milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest:
|
||||||
return internalpb.RateType_DDLIndex, 1, nil
|
return internalpb.RateType_DDLIndex, 1, nil
|
||||||
case *milvuspb.FlushRequest:
|
case *milvuspb.FlushRequest:
|
||||||
return internalpb.RateType_DDLFlush, 1, nil
|
return internalpb.RateType_DDLFlush, 1, nil
|
||||||
@ -130,20 +130,6 @@ func getFailedResponse(req interface{}, code commonpb.ErrorCode, reason string)
|
|||||||
*milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest,
|
*milvuspb.LoadPartitionsRequest, *milvuspb.ReleasePartitionsRequest,
|
||||||
*milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest:
|
*milvuspb.CreateIndexRequest, *milvuspb.DropIndexRequest:
|
||||||
return failedStatus(code, reason), nil
|
return failedStatus(code, reason), nil
|
||||||
case *milvuspb.HasCollectionRequest, *milvuspb.HasPartitionRequest:
|
|
||||||
return failedBoolResponse(code, reason), nil
|
|
||||||
case *milvuspb.ShowCollectionsRequest:
|
|
||||||
return &milvuspb.ShowCollectionsResponse{
|
|
||||||
Status: failedStatus(code, reason),
|
|
||||||
}, nil
|
|
||||||
case *milvuspb.ShowPartitionsRequest:
|
|
||||||
return &milvuspb.ShowPartitionsResponse{
|
|
||||||
Status: failedStatus(code, reason),
|
|
||||||
}, nil
|
|
||||||
case *milvuspb.DescribeIndexRequest:
|
|
||||||
return &milvuspb.DescribeIndexResponse{
|
|
||||||
Status: failedStatus(code, reason),
|
|
||||||
}, nil
|
|
||||||
case *milvuspb.FlushRequest:
|
case *milvuspb.FlushRequest:
|
||||||
return &milvuspb.FlushResponse{
|
return &milvuspb.FlushResponse{
|
||||||
Status: failedStatus(code, reason),
|
Status: failedStatus(code, reason),
|
||||||
|
|||||||
@ -96,10 +96,6 @@ func TestRateLimitInterceptor(t *testing.T) {
|
|||||||
testGetFailedResponse(&milvuspb.SearchRequest{})
|
testGetFailedResponse(&milvuspb.SearchRequest{})
|
||||||
testGetFailedResponse(&milvuspb.QueryRequest{})
|
testGetFailedResponse(&milvuspb.QueryRequest{})
|
||||||
testGetFailedResponse(&milvuspb.CreateCollectionRequest{})
|
testGetFailedResponse(&milvuspb.CreateCollectionRequest{})
|
||||||
testGetFailedResponse(&milvuspb.HasCollectionRequest{})
|
|
||||||
testGetFailedResponse(&milvuspb.ShowCollectionsRequest{})
|
|
||||||
testGetFailedResponse(&milvuspb.ShowPartitionsRequest{})
|
|
||||||
testGetFailedResponse(&milvuspb.DescribeIndexRequest{})
|
|
||||||
testGetFailedResponse(&milvuspb.FlushRequest{})
|
testGetFailedResponse(&milvuspb.FlushRequest{})
|
||||||
testGetFailedResponse(&milvuspb.ManualCompactionRequest{})
|
testGetFailedResponse(&milvuspb.ManualCompactionRequest{})
|
||||||
|
|
||||||
|
|||||||
@ -32,12 +32,9 @@ const (
|
|||||||
// defaultMax is the default minimal rate.
|
// defaultMax is the default minimal rate.
|
||||||
defaultMin = float64(0)
|
defaultMin = float64(0)
|
||||||
// defaultLowWaterLevel is the default memory low water level.
|
// defaultLowWaterLevel is the default memory low water level.
|
||||||
defaultLowWaterLevel = float64(0.8)
|
defaultLowWaterLevel = float64(0.85)
|
||||||
// defaultHighWaterLevel is the default memory low water level.
|
// defaultHighWaterLevel is the default memory low water level.
|
||||||
defaultHighWaterLevel = float64(0.9)
|
defaultHighWaterLevel = float64(0.95)
|
||||||
|
|
||||||
// secondsPerMinute is used to convert minutes to seconds.
|
|
||||||
secondsPerMinute = 60.0
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// quotaConfig is configuration for quota and limitations.
|
// quotaConfig is configuration for quota and limitations.
|
||||||
@ -52,9 +49,13 @@ type quotaConfig struct {
|
|||||||
DDLLimitEnabled bool
|
DDLLimitEnabled bool
|
||||||
DDLCollectionRate float64
|
DDLCollectionRate float64
|
||||||
DDLPartitionRate float64
|
DDLPartitionRate float64
|
||||||
DDLIndexRate float64
|
|
||||||
DDLFlushRate float64
|
IndexLimitEnabled bool
|
||||||
DDLCompactionRate float64
|
MaxIndexRate float64
|
||||||
|
FlushLimitEnabled bool
|
||||||
|
MaxFlushRate float64
|
||||||
|
CompactionLimitEnabled bool
|
||||||
|
MaxCompactionRate float64
|
||||||
|
|
||||||
// dml
|
// dml
|
||||||
DMLLimitEnabled bool
|
DMLLimitEnabled bool
|
||||||
@ -103,9 +104,13 @@ func (p *quotaConfig) init(base *BaseTable) {
|
|||||||
p.initDDLLimitEnabled()
|
p.initDDLLimitEnabled()
|
||||||
p.initDDLCollectionRate()
|
p.initDDLCollectionRate()
|
||||||
p.initDDLPartitionRate()
|
p.initDDLPartitionRate()
|
||||||
p.initDDLIndexRate()
|
|
||||||
p.initDDLFlushRate()
|
p.initIndexLimitEnabled()
|
||||||
p.initDDLCompactionRate()
|
p.initMaxIndexRate()
|
||||||
|
p.initFlushLimitEnabled()
|
||||||
|
p.initMaxFlushRate()
|
||||||
|
p.initCompactionLimitEnabled()
|
||||||
|
p.initMaxCompactionRate()
|
||||||
|
|
||||||
// dml
|
// dml
|
||||||
p.initDMLLimitEnabled()
|
p.initDMLLimitEnabled()
|
||||||
@ -167,7 +172,6 @@ func (p *quotaConfig) initDDLCollectionRate() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.DDLCollectionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.collectionRate", defaultMax)
|
p.DDLCollectionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.collectionRate", defaultMax)
|
||||||
p.DDLCollectionRate /= secondsPerMinute
|
|
||||||
// [0 ~ Inf)
|
// [0 ~ Inf)
|
||||||
if p.DDLCollectionRate < 0 {
|
if p.DDLCollectionRate < 0 {
|
||||||
p.DDLCollectionRate = defaultMax
|
p.DDLCollectionRate = defaultMax
|
||||||
@ -180,49 +184,57 @@ func (p *quotaConfig) initDDLPartitionRate() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.DDLPartitionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.partitionRate", defaultMax)
|
p.DDLPartitionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.partitionRate", defaultMax)
|
||||||
p.DDLPartitionRate /= secondsPerMinute
|
|
||||||
// [0 ~ Inf)
|
// [0 ~ Inf)
|
||||||
if p.DDLPartitionRate < 0 {
|
if p.DDLPartitionRate < 0 {
|
||||||
p.DDLPartitionRate = defaultMax
|
p.DDLPartitionRate = defaultMax
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) initDDLIndexRate() {
|
func (p *quotaConfig) initIndexLimitEnabled() {
|
||||||
if !p.DDLLimitEnabled {
|
p.IndexLimitEnabled = p.Base.ParseBool("quotaAndLimits.indexRate.enabled", false)
|
||||||
p.DDLIndexRate = defaultMax
|
}
|
||||||
|
|
||||||
|
func (p *quotaConfig) initMaxIndexRate() {
|
||||||
|
if !p.IndexLimitEnabled {
|
||||||
|
p.MaxIndexRate = defaultMax
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.DDLIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.indexRate", defaultMax)
|
p.MaxIndexRate = p.Base.ParseFloatWithDefault("quotaAndLimits.indexRate.max", defaultMax)
|
||||||
p.DDLIndexRate /= secondsPerMinute
|
|
||||||
// [0 ~ Inf)
|
// [0 ~ Inf)
|
||||||
if p.DDLIndexRate < 0 {
|
if p.MaxIndexRate < 0 {
|
||||||
p.DDLIndexRate = defaultMax
|
p.MaxIndexRate = defaultMax
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) initDDLFlushRate() {
|
func (p *quotaConfig) initFlushLimitEnabled() {
|
||||||
if !p.DDLLimitEnabled {
|
p.FlushLimitEnabled = p.Base.ParseBool("quotaAndLimits.flushRate.enabled", false)
|
||||||
p.DDLFlushRate = defaultMax
|
}
|
||||||
|
|
||||||
|
func (p *quotaConfig) initMaxFlushRate() {
|
||||||
|
if !p.FlushLimitEnabled {
|
||||||
|
p.MaxFlushRate = defaultMax
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.DDLFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.flushRate", defaultMax)
|
p.MaxFlushRate = p.Base.ParseFloatWithDefault("quotaAndLimits.flushRate.max", defaultMax)
|
||||||
p.DDLFlushRate /= secondsPerMinute
|
|
||||||
// [0 ~ Inf)
|
// [0 ~ Inf)
|
||||||
if p.DDLFlushRate < 0 {
|
if p.MaxFlushRate < 0 {
|
||||||
p.DDLFlushRate = defaultMax
|
p.MaxFlushRate = defaultMax
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *quotaConfig) initDDLCompactionRate() {
|
func (p *quotaConfig) initCompactionLimitEnabled() {
|
||||||
if !p.DDLLimitEnabled {
|
p.CompactionLimitEnabled = p.Base.ParseBool("quotaAndLimits.compactionRate.enabled", false)
|
||||||
p.DDLCompactionRate = defaultMax
|
}
|
||||||
|
|
||||||
|
func (p *quotaConfig) initMaxCompactionRate() {
|
||||||
|
if !p.CompactionLimitEnabled {
|
||||||
|
p.MaxCompactionRate = defaultMax
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.DDLCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.ddl.compactionRate", defaultMax)
|
p.MaxCompactionRate = p.Base.ParseFloatWithDefault("quotaAndLimits.compactionRate.max", defaultMax)
|
||||||
p.DDLCompactionRate /= secondsPerMinute
|
|
||||||
// [0 ~ Inf)
|
// [0 ~ Inf)
|
||||||
if p.DDLCompactionRate < 0 {
|
if p.MaxCompactionRate < 0 {
|
||||||
p.DDLCompactionRate = defaultMax
|
p.MaxCompactionRate = defaultMax
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -36,9 +36,15 @@ func TestQuotaParam(t *testing.T) {
|
|||||||
assert.Equal(t, false, qc.DDLLimitEnabled)
|
assert.Equal(t, false, qc.DDLLimitEnabled)
|
||||||
assert.Equal(t, defaultMax, qc.DDLCollectionRate)
|
assert.Equal(t, defaultMax, qc.DDLCollectionRate)
|
||||||
assert.Equal(t, defaultMax, qc.DDLPartitionRate)
|
assert.Equal(t, defaultMax, qc.DDLPartitionRate)
|
||||||
assert.Equal(t, defaultMax, qc.DDLIndexRate)
|
})
|
||||||
assert.Equal(t, defaultMax, qc.DDLFlushRate)
|
|
||||||
assert.Equal(t, defaultMax, qc.DDLCompactionRate)
|
t.Run("test functional params", func(t *testing.T) {
|
||||||
|
assert.Equal(t, false, qc.IndexLimitEnabled)
|
||||||
|
assert.Equal(t, defaultMax, qc.MaxIndexRate)
|
||||||
|
assert.Equal(t, false, qc.FlushLimitEnabled)
|
||||||
|
assert.Equal(t, defaultMax, qc.MaxFlushRate)
|
||||||
|
assert.Equal(t, false, qc.CompactionLimitEnabled)
|
||||||
|
assert.Equal(t, defaultMax, qc.MaxCompactionRate)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test dml", func(t *testing.T) {
|
t.Run("test dml", func(t *testing.T) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user