From 0201e00a2fd165cc4a3fd0f7422a24aaa2f72bf1 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 7 Aug 2024 12:38:21 +0800 Subject: [PATCH] enhance: enable to set load config in cluster level (#35293) issue: #35170 pr: #35169 This PR enable to set load configs in cluster level, such as replicas and resource groups. then when load collections will use the load config. Signed-off-by: Wei Liu --- .../querycoordv2/meta/coordinator_broker.go | 16 ++++ pkg/util/paramtable/component_param.go | 20 +++++ pkg/util/paramtable/component_param_test.go | 5 +- pkg/util/paramtable/param_item.go | 3 + tests/integration/replicas/load/load_test.go | 81 +++++++++++++++++++ 5 files changed, 124 insertions(+), 1 deletion(-) diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index b5b606b816..dc237bd7ac 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -151,6 +151,22 @@ func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, coll } } + if replicaNum <= 0 || len(rgs) == 0 { + if replicaNum <= 0 { + replicaNum = paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.GetAsInt64() + if replicaNum > 0 { + log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum)) + } + } + + if len(rgs) == 0 { + rgs = paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.GetAsStrings() + if len(rgs) > 0 { + log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs)) + } + } + } + return rgs, replicaNum, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 21d34cd5ee..1354f8705c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1660,6 +1660,8 @@ type queryCoordConfig struct { CheckExecutedFlagInterval ParamItem `refreshable:"false"` UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"` CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"` + ClusterLevelLoadReplicaNumber ParamItem `refreshable:"true"` + ClusterLevelLoadResourceGroups ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -2193,6 +2195,24 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: false, } p.CollectionBalanceSegmentBatchSize.Init(base.mgr) + + p.ClusterLevelLoadReplicaNumber = ParamItem{ + Key: "queryCoord.clusterLevelLoadReplicaNumber", + Version: "2.4.7", + DefaultValue: "0", + Doc: "the cluster level default value for load replica number", + Export: false, + } + p.ClusterLevelLoadReplicaNumber.Init(base.mgr) + + p.ClusterLevelLoadResourceGroups = ParamItem{ + Key: "queryCoord.clusterLevelLoadResourceGroups", + Version: "2.4.7", + DefaultValue: "", + Doc: "resource group names for load collection should be at least equal to queryCoord.clusterLevelLoadReplicaNumber, separate with commas", + Export: false, + } + p.ClusterLevelLoadResourceGroups.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 346585d661..355a9a0d35 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -103,7 +103,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, "defaultMilvus", Params.DefaultRootPassword.GetValue()) params.Save("common.security.superUsers", "") - assert.Equal(t, []string{""}, Params.SuperUsers.GetAsStrings()) + assert.Equal(t, []string{}, Params.SuperUsers.GetAsStrings()) assert.Equal(t, false, Params.PreCreatedTopicEnabled.GetAsBool()) @@ -340,6 +340,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 0.1, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt()) + + assert.Equal(t, 0, Params.ClusterLevelLoadReplicaNumber.GetAsInt()) + assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0) }) t.Run("test queryNodeConfig", func(t *testing.T) { diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index c3f00727c8..6f201626c6 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -313,6 +313,9 @@ func ParseAsStings(v string) []string { } func getAsStrings(v string) []string { + if len(v) == 0 { + return []string{} + } return getAndConvert(v, func(value string) ([]string, error) { return strings.Split(value, ","), nil }, []string{}) diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go index 89c8c9fbf4..cac163103d 100644 --- a/tests/integration/replicas/load/load_test.go +++ b/tests/integration/replicas/load/load_test.go @@ -355,6 +355,87 @@ func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() { s.releaseCollection(newDbName, collectionName) } +func (s *LoadTestSuite) TestLoadWithPredefineClusterLevelConfig() { + ctx := context.Background() + + // prepare resource groups + rgNum := 3 + rgs := make([]string, 0) + for i := 0; i < rgNum; i++ { + rgs = append(rgs, fmt.Sprintf("rg_%d", i)) + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: rgs[i], + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + + TransferFrom: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + TransferTo: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + }, + }) + } + + resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.Len(resp.GetResourceGroups(), rgNum+1) + + for i := 1; i < rgNum; i++ { + s.Cluster.AddQueryNode() + } + + s.Eventually(func() bool { + matchCounter := 0 + for _, rg := range rgs { + resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: rg, + }) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + if len(resp1.ResourceGroup.Nodes) == 1 { + matchCounter += 1 + } + } + return matchCounter == rgNum + }, 30*time.Second, time.Second) + + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key, "3") + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.Key) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key, strings.Join(rgs, ",")) + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.Key) + + // load collection without specified replica and rgs + s.loadCollection(collectionName, dbName, 0, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 3) + s.releaseCollection(dbName, collectionName) +} + func TestReplicas(t *testing.T) { suite.Run(t, new(LoadTestSuite)) }