From 1aa31e2a9bfe583b23469bd126e27cde0985e774 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 17 Dec 2024 23:24:45 +0800 Subject: [PATCH] fix: Fix inaccurate general count (#38524) Checking general count should only count healthy collections and partitions. issue: https://github.com/milvus-io/milvus/issues/37630 --------- Signed-off-by: bigsheeper --- internal/rootcoord/constrant_test.go | 92 ++++++++++++++++++++++++++++ internal/rootcoord/meta_table.go | 30 ++++----- 2 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 internal/rootcoord/constrant_test.go diff --git a/internal/rootcoord/constrant_test.go b/internal/rootcoord/constrant_test.go new file mode 100644 index 0000000000..6e8bb9e734 --- /dev/null +++ b/internal/rootcoord/constrant_test.go @@ -0,0 +1,92 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/metastore/mocks" + "github.com/milvus-io/milvus/internal/metastore/model" + pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + mocktso "github.com/milvus-io/milvus/internal/tso/mocks" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func TestCheckGeneralCapacity(t *testing.T) { + ctx := context.Background() + + catalog := mocks.NewRootCoordCatalog(t) + catalog.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().ListAliases(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().CreateDatabase(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + allocator := mocktso.NewAllocator(t) + allocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil) + + meta, err := NewMetaTable(ctx, catalog, allocator) + assert.NoError(t, err) + core := newTestCore(withMeta(meta)) + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + + Params.Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, "512") + defer Params.Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 2, 4, 256, core) + assert.Error(t, err) + + catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp) + assert.NoError(t, err) + err = meta.AddCollection(ctx, &model.Collection{ + CollectionID: 1, + State: pb.CollectionState_CollectionCreating, + ShardsNum: 256, + Partitions: []*model.Partition{ + {PartitionID: 100, State: pb.PartitionState_PartitionCreated}, + {PartitionID: 200, State: pb.PartitionState_PartitionCreated}, + }, + }) + assert.NoError(t, err) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) + + catalog.EXPECT().AlterCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionCreated, typeutil.MaxTimestamp) + assert.NoError(t, err) + + assert.Equal(t, 512, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 1, 1, core) + assert.Error(t, err) + + err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropping, typeutil.MaxTimestamp) + assert.NoError(t, err) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 804093f019..4dfc6d2a03 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -199,11 +199,12 @@ func (mt *MetaTable) reload() error { collection.DBName = dbName } mt.collID2Meta[collection.CollectionID] = collection - mt.generalCnt += len(collection.Partitions) * int(collection.ShardsNum) if collection.Available() { mt.names.insert(dbName, collection.Name, collection.CollectionID) + pn := collection.GetPartitionNum(true) + mt.generalCnt += pn * int(collection.ShardsNum) collectionNum++ - partitionNum += int64(collection.GetPartitionNum(true)) + partitionNum += int64(pn) } } @@ -243,8 +244,10 @@ func (mt *MetaTable) reloadWithNonDatabase() error { mt.collID2Meta[collection.CollectionID] = collection if collection.Available() { mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID) + pn := collection.GetPartitionNum(true) + mt.generalCnt += pn * int(collection.ShardsNum) collectionNum++ - partitionNum += int64(collection.GetPartitionNum(true)) + partitionNum += int64(pn) } } @@ -428,8 +431,6 @@ func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) mt.collID2Meta[coll.CollectionID] = coll.Clone() mt.names.insert(db.Name, coll.Name, coll.CollectionID) - mt.generalCnt += len(coll.Partitions) * int(coll.ShardsNum) - log.Ctx(ctx).Info("add collection to meta table", zap.Int64("dbID", coll.DBID), zap.String("collection", coll.Name), @@ -460,13 +461,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni return fmt.Errorf("dbID not found for collection:%d", collectionID) } + pn := coll.GetPartitionNum(true) + switch state { case pb.CollectionState_CollectionCreated: + mt.generalCnt += pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc() - metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true))) - default: + metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(pn)) + case pb.CollectionState_CollectionDropping: + mt.generalCnt -= pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec() - metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true))) + metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(pn)) } log.Ctx(ctx).Info("change collection state", zap.Int64("collection", collectionID), @@ -534,8 +539,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID mt.removeAllNamesIfMatchedInternal(collectionID, allNames) mt.removeCollectionByIDInternal(collectionID) - mt.generalCnt -= len(coll.Partitions) * int(coll.ShardsNum) - log.Ctx(ctx).Info("remove collection", zap.Int64("dbID", coll.DBID), zap.String("name", coll.Name), @@ -942,8 +945,6 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio } mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone()) - mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum - log.Ctx(ctx).Info("add partition to meta table", zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName), zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp)) @@ -971,9 +972,11 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq switch state { case pb.PartitionState_PartitionCreated: + mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum // support Dynamic load/release partitions metrics.RootCoordNumOfPartitions.WithLabelValues().Inc() - default: + case pb.PartitionState_PartitionDropping: + mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum metrics.RootCoordNumOfPartitions.WithLabelValues().Dec() } @@ -1008,7 +1011,6 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection } if loc != -1 { coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...) - mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum } log.Ctx(ctx).Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts)) return nil