mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Add collection number quota per DB (#23808)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
67dd70f1f9
commit
546d317941
@ -457,7 +457,9 @@ common:
|
||||
# If necessary, you can also manually force to deny RW requests.
|
||||
quotaAndLimits:
|
||||
enabled: true # `true` to enable quota and limits, `false` to disable.
|
||||
|
||||
limits:
|
||||
maxCollectionNum: 65536
|
||||
maxCollectionNumPerDB: 64
|
||||
# quotaCenterCollectInterval is the time interval that quotaCenter
|
||||
# collects metrics from Proxies, Query cluster and Data cluster.
|
||||
quotaCenterCollectInterval: 3 # seconds, (0 ~ 65536)
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
type Collection struct {
|
||||
TenantID string
|
||||
DBName string // TODO: @jiquan.long please help to assign, persistent and check
|
||||
CollectionID int64
|
||||
Partitions []*Partition
|
||||
Name string
|
||||
|
||||
@ -21,28 +21,22 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
ms "github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
ms "github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
type collectionChannels struct {
|
||||
@ -310,6 +304,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// check collection number quota for the entire the instance
|
||||
existedCollInfos, err := t.core.meta.ListCollections(ctx, typeutil.MaxTimestamp)
|
||||
if err != nil {
|
||||
log.Warn("fail to list collections for checking the collection count", zap.Error(err))
|
||||
@ -317,9 +312,18 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
||||
}
|
||||
maxCollectionNum := Params.QuotaConfig.MaxCollectionNum
|
||||
if len(existedCollInfos) >= maxCollectionNum {
|
||||
log.Error("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum))
|
||||
log.Warn("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum))
|
||||
return fmt.Errorf("failed to create collection, limit={%d}, exceeded the limit number of collections", maxCollectionNum)
|
||||
}
|
||||
// check collection number quota for DB
|
||||
existedColsInDB := lo.Filter(existedCollInfos, func(collection *model.Collection, _ int) bool {
|
||||
return t.Req.GetDbName() != "" && collection.DBName == t.Req.GetDbName()
|
||||
})
|
||||
maxColNumPerDB := Params.QuotaConfig.MaxCollectionNumPerDB
|
||||
if len(existedColsInDB) >= maxColNumPerDB {
|
||||
log.Warn("unable to create collection because the number of collection has reached the limit in DB", zap.Int("maxCollectionNumPerDB", maxColNumPerDB))
|
||||
return fmt.Errorf("failed to create collection, maxCollectionNumPerDB={%d}, exceeded the limit number of collections per DB", maxColNumPerDB)
|
||||
}
|
||||
|
||||
undoTask := newBaseUndoTask(t.core.stepExecutor)
|
||||
undoTask.AddStep(&expireCacheStep{
|
||||
|
||||
@ -22,11 +22,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
@ -34,8 +33,8 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_createCollectionTask_validate(t *testing.T) {
|
||||
@ -469,6 +468,7 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
||||
baseTask: newBaseTask(context.TODO(), core),
|
||||
Req: &milvuspb.CreateCollectionRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
|
||||
DbName: "mock-db",
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
ShardsNum: int32(shardNum),
|
||||
@ -493,6 +493,17 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
Params.QuotaConfig.MaxCollectionNum = originValue
|
||||
|
||||
meta.ListCollectionsFunc = func(ctx context.Context, ts Timestamp) ([]*model.Collection, error) {
|
||||
maxNum := Params.QuotaConfig.MaxCollectionNumPerDB
|
||||
collections := make([]*model.Collection, 0, maxNum)
|
||||
for i := 0; i < maxNum; i++ {
|
||||
collections = append(collections, &model.Collection{DBName: task.Req.GetDbName()})
|
||||
}
|
||||
return collections, nil
|
||||
}
|
||||
err = task.Execute(context.Background())
|
||||
assert.Error(t, err)
|
||||
|
||||
meta.ListCollectionsFunc = func(ctx context.Context, ts Timestamp) ([]*model.Collection, error) {
|
||||
return []*model.Collection{}, nil
|
||||
}
|
||||
|
||||
@ -79,7 +79,8 @@ type quotaConfig struct {
|
||||
DQLMinQueryRate float64
|
||||
|
||||
// limits
|
||||
MaxCollectionNum int
|
||||
MaxCollectionNum int
|
||||
MaxCollectionNumPerDB int
|
||||
|
||||
// limit writing
|
||||
ForceDenyWriting bool
|
||||
@ -139,6 +140,7 @@ func (p *quotaConfig) init(base *BaseTable) {
|
||||
|
||||
// limits
|
||||
p.initMaxCollectionNum()
|
||||
p.initMaxCollectionNumPerDB()
|
||||
|
||||
// limit writing
|
||||
p.initForceDenyWriting()
|
||||
@ -425,7 +427,11 @@ func (p *quotaConfig) initDQLMinQueryRate() {
|
||||
}
|
||||
|
||||
func (p *quotaConfig) initMaxCollectionNum() {
|
||||
p.MaxCollectionNum = p.Base.ParseIntWithDefault("quotaAndLimits.limits.collection.maxNum", 65535)
|
||||
p.MaxCollectionNum = p.Base.ParseIntWithDefault("quotaAndLimits.limits.maxCollectionNum", 65536)
|
||||
}
|
||||
|
||||
func (p *quotaConfig) initMaxCollectionNumPerDB() {
|
||||
p.MaxCollectionNumPerDB = p.Base.ParseIntWithDefault("quotaAndLimits.limits.maxCollectionNumPerDB", 64)
|
||||
}
|
||||
|
||||
func (p *quotaConfig) initForceDenyWriting() {
|
||||
|
||||
@ -66,7 +66,8 @@ func TestQuotaParam(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("test limits", func(t *testing.T) {
|
||||
assert.Equal(t, 65535, qc.MaxCollectionNum)
|
||||
assert.Equal(t, 65536, qc.MaxCollectionNum)
|
||||
assert.Equal(t, 64, qc.MaxCollectionNumPerDB)
|
||||
})
|
||||
|
||||
t.Run("test limit writing", func(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user