From 2a0eb1d2e6b8700536ef7e75c6afac9d207442d1 Mon Sep 17 00:00:00 2001 From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Tue, 16 Jan 2024 16:32:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20support=20general=20capacity=20restrict?= =?UTF-8?q?=20for=20cloud-side=20resoure=20contro=E2=80=A6=20(#29845)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit related: #29844 Signed-off-by: MrPresent-Han --- configs/milvus.yaml | 1 + internal/rootcoord/constrant.go | 46 +++++++++++++++ internal/rootcoord/create_collection_task.go | 12 +++- .../rootcoord/create_collection_task_test.go | 57 ++++++++++++++++++- internal/rootcoord/create_partition_task.go | 2 +- .../rootcoord/create_partition_task_test.go | 9 +++ pkg/util/merr/errors.go | 3 + pkg/util/merr/utils.go | 9 +++ pkg/util/paramtable/component_param.go | 16 ++++++ 9 files changed, 150 insertions(+), 5 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e97104bd30..4fcbd97615 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -180,6 +180,7 @@ rootCoord: serverMaxRecvSize: 268435456 clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 + maxGeneralCapacity: 65536 # Related configuration of proxy, used to validate client requests and reduce the returned results. proxy: diff --git a/internal/rootcoord/constrant.go b/internal/rootcoord/constrant.go index a15e57922b..0aed96c8c5 100644 --- a/internal/rootcoord/constrant.go +++ b/internal/rootcoord/constrant.go @@ -16,6 +16,13 @@ package rootcoord +import ( + "context" + + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + const ( // TODO: better to make them configurable, use default value if no config was set since we never explode these before. globalIDAllocatorKey = "idTimestamp" @@ -23,3 +30,42 @@ const ( globalTSOAllocatorKey = "timestamp" globalTSOAllocatorSubPath = "tso" ) + +func checkGeneralCapacity(ctx context.Context, newColNum int, + newParNum int64, + newShardNum int32, + core *Core, + ts typeutil.Timestamp, +) error { + var addedNum int64 = 0 + if newColNum > 0 && newParNum > 0 && newShardNum > 0 { + // create collections scenarios + addedNum += int64(newColNum) * newParNum * int64(newShardNum) + } else if newColNum == 0 && newShardNum == 0 && newParNum > 0 { + // add partitions to existing collections + addedNum += newParNum + } + + var generalNum int64 = 0 + collectionsMap := core.meta.ListAllAvailCollections(ctx) + for dbId, collectionIds := range collectionsMap { + db, err := core.meta.GetDatabaseByID(ctx, dbId, ts) + if err == nil { + for _, collectionId := range collectionIds { + collection, err := core.meta.GetCollectionByID(ctx, db.Name, collectionId, ts, true) + if err == nil { + partNum := int64(collection.GetPartitionNum(false)) + shardNum := int64(collection.ShardsNum) + generalNum += partNum * shardNum + } + } + } + } + + generalNum += addedNum + if generalNum > Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64() { + return merr.WrapGeneralCapacityExceed(generalNum, Params.RootCoordCfg.MaxGeneralCapacity.GetAsInt64(), + "failed checking constraint: sum_collections(parition*shard) exceeding the max general capacity:") + } + return nil +} diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 7356df5c15..aca073c8e7 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -66,8 +66,8 @@ func (t *createCollectionTask) validate() error { return err } + // 1. check shard number shardsNum := t.Req.GetShardsNum() - cfgMaxShardNum := Params.RootCoordCfg.DmlChannelNum.GetAsInt32() if shardsNum > cfgMaxShardNum { return fmt.Errorf("shard num (%d) exceeds max configuration (%d)", shardsNum, cfgMaxShardNum) @@ -78,6 +78,7 @@ func (t *createCollectionTask) validate() error { return fmt.Errorf("shard num (%d) exceeds system limit (%d)", shardsNum, cfgShardLimit) } + // 2. check db-collection capacity db2CollIDs := t.core.meta.ListAllAvailCollections(t.ctx) collIDs, ok := db2CollIDs[t.dbID] @@ -92,6 +93,7 @@ func (t *createCollectionTask) validate() error { return merr.WrapErrCollectionNumLimitExceeded(maxColNumPerDB, "max number of collection has reached the limit in DB") } + // 3. check total collection number totalCollections := 0 for _, collIDs := range db2CollIDs { totalCollections += len(collIDs) @@ -102,7 +104,13 @@ func (t *createCollectionTask) validate() error { log.Warn("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum)) return merr.WrapErrCollectionNumLimitExceeded(maxCollectionNum, "max number of collection has reached the limit") } - return nil + + // 4. check collection * shard * partition + var newPartNum int64 = 1 + if t.Req.GetNumPartitions() > 0 { + newPartNum = t.Req.GetNumPartitions() + } + return checkGeneralCapacity(t.ctx, 1, newPartNum, t.Req.GetShardsNum(), t.core, t.ts) } func checkDefaultValue(schema *schemapb.CollectionSchema) error { diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index 00e926c0ba..710e5ab22f 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -183,6 +183,48 @@ func Test_createCollectionTask_validate(t *testing.T) { assert.Error(t, err) }) + t.Run("collection general number exceeds limit", func(t *testing.T) { + paramtable.Get().Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, strconv.Itoa(1)) + defer paramtable.Get().Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key) + + meta := mockrootcoord.NewIMetaTable(t) + meta.On("ListAllAvailCollections", + mock.Anything, + ).Return(map[int64][]int64{ + 1: {1, 2}, + }, nil) + meta.On("GetDatabaseByID", + mock.Anything, mock.Anything, mock.Anything, + ).Return(&model.Database{ + Name: "default", + }, nil) + meta.On("GetCollectionByID", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, + ).Return(&model.Collection{ + Name: "default", + ShardsNum: 2, + Partitions: []*model.Partition{ + { + PartitionID: 1, + }, + }, + }, nil) + + core := newTestCore(withMeta(meta)) + + task := createCollectionTask{ + baseTask: newBaseTask(context.TODO(), core), + Req: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, + NumPartitions: 256, + ShardsNum: 2, + }, + dbID: util.DefaultDBID, + } + err := task.validate() + assert.ErrorIs(t, err, merr.ErrGeneralCapacityExceeded) + }) + t.Run("normal case", func(t *testing.T) { meta := mockrootcoord.NewIMetaTable(t) meta.On("ListAllAvailCollections", @@ -190,12 +232,16 @@ func Test_createCollectionTask_validate(t *testing.T) { ).Return(map[int64][]int64{ 1: {1, 2}, }, nil) + meta.On("GetDatabaseByID", mock.Anything, + mock.Anything, mock.Anything).Return(nil, errors.New("mock")) core := newTestCore(withMeta(meta)) task := createCollectionTask{ baseTask: newBaseTask(context.TODO(), core), Req: &milvuspb.CreateCollectionRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection}, + NumPartitions: 2, + ShardsNum: 2, }, dbID: 1, } @@ -526,6 +572,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) { }) t.Run("invalid schema", func(t *testing.T) { + meta.On("GetDatabaseByID", mock.Anything, + mock.Anything, mock.Anything).Return(nil, errors.New("mock")) core := newTestCore(withMeta(meta)) collectionName := funcutil.GenRandomStr() task := &createCollectionTask{ @@ -554,7 +602,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) { } marshaledSchema, err := proto.Marshal(schema) assert.NoError(t, err) - + meta.On("GetDatabaseByID", mock.Anything, + mock.Anything, mock.Anything).Return(nil, errors.New("mock")) core := newTestCore(withInvalidIDAllocator(), withMeta(meta)) task := createCollectionTask{ @@ -577,6 +626,8 @@ func Test_createCollectionTask_Prepare(t *testing.T) { field1 := funcutil.GenRandomStr() ticker := newRocksMqTtSynchronizer() + meta.On("GetDatabaseByID", mock.Anything, + mock.Anything, mock.Anything).Return(nil, errors.New("mock")) core := newTestCore(withValidIDAllocator(), withTtSynchronizer(ticker), withMeta(meta)) @@ -912,6 +963,8 @@ func Test_createCollectionTask_PartitionKey(t *testing.T) { ).Return(map[int64][]int64{ util.DefaultDBID: {1, 2}, }, nil) + meta.On("GetDatabaseByID", mock.Anything, + mock.Anything, mock.Anything).Return(nil, errors.New("mock")) paramtable.Get().Save(Params.QuotaConfig.MaxCollectionNum.Key, strconv.Itoa(math.MaxInt64)) defer paramtable.Get().Reset(Params.QuotaConfig.MaxCollectionNum.Key) diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index bd09ee869e..b2587f9d87 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -44,7 +44,7 @@ func (t *createPartitionTask) Prepare(ctx context.Context) error { return err } t.collMeta = collMeta - return nil + return checkGeneralCapacity(ctx, 0, 1, 0, t.core, t.ts) } func (t *createPartitionTask) Execute(ctx context.Context) error { diff --git a/internal/rootcoord/create_partition_task_test.go b/internal/rootcoord/create_partition_task_test.go index 8d4d315d86..880291da9b 100644 --- a/internal/rootcoord/create_partition_task_test.go +++ b/internal/rootcoord/create_partition_task_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -61,6 +62,14 @@ func Test_createPartitionTask_Prepare(t *testing.T) { mock.Anything, mock.Anything, ).Return(coll.Clone(), nil) + meta.On("ListAllAvailCollections", + mock.Anything, + ).Return(map[int64][]int64{ + 1: {1, 2}, + }, nil) + meta.On("GetDatabaseByID", + mock.Anything, mock.Anything, mock.Anything, + ).Return(nil, errors.New("mock")) core := newTestCore(withMeta(meta)) task := &createPartitionTask{ diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index fcd588f544..664d06441d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -57,6 +57,9 @@ var ( ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false) ErrPartitionNotFullyLoaded = newMilvusError("partition not fully loaded", 202, true) + // General capacity related + ErrGeneralCapacityExceeded = newMilvusError("general capacity exceeded", 250, false) + // ResourceGroup related ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index dacd238ebb..767a01beaa 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -540,6 +540,15 @@ func WrapErrPartitionNotFullyLoaded(partition any, msg ...string) error { return err } +func WrapGeneralCapacityExceed(newGeneralSize any, generalCapacity any, msg ...string) error { + err := wrapFields(ErrGeneralCapacityExceeded, value("newGeneralSize", newGeneralSize), + value("generalCapacity", generalCapacity)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + // ResourceGroup related func WrapErrResourceGroupNotFound(rg any, msg ...string) error { err := wrapFields(ErrResourceGroupNotFound, value("rg", rg)) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 33c5570a04..3a991b9f31 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -869,6 +869,7 @@ type rootCoordConfig struct { ImportTaskSubPath ParamItem `refreshable:"true"` EnableActiveStandby ParamItem `refreshable:"false"` MaxDatabaseNum ParamItem `refreshable:"false"` + MaxGeneralCapacity ParamItem `refreshable:"true"` } func (p *rootCoordConfig) init(base *BaseTable) { @@ -948,6 +949,21 @@ func (p *rootCoordConfig) init(base *BaseTable) { Export: true, } p.MaxDatabaseNum.Init(base.mgr) + + p.MaxGeneralCapacity = ParamItem{ + Key: "rootCoord.maxGeneralCapacity", + Version: "2.3.5", + DefaultValue: "65536", + Doc: "upper limit for the sum of of product of partitionNumber and shardNumber", + Export: true, + Formatter: func(v string) string { + if getAsInt(v) < 512 { + return "512" + } + return v + }, + } + p.MaxGeneralCapacity.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////