diff --git a/internal/querycoordv2/meta/resource_group.go b/internal/querycoordv2/meta/resource_group.go index 69780f9b93..f2e86f0f91 100644 --- a/internal/querycoordv2/meta/resource_group.go +++ b/internal/querycoordv2/meta/resource_group.go @@ -13,7 +13,6 @@ import ( var ( DefaultResourceGroupName = "__default_resource_group" defaultResourceGroupCapacity int32 = 1000000 - resourceGroupTransferBoost = 10000 ) func NewResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfig { @@ -197,15 +196,19 @@ func (rg *ResourceGroup) SelectNodeForRG(targetRG *ResourceGroup) int64 { // return node and priority. func (rg *ResourceGroup) AcceptNode(nodeID int64) bool { - if rg.GetName() == DefaultResourceGroupName { - return true - } - nodeInfo := rg.nodeMgr.Get(nodeID) if nodeInfo == nil { return false } + if nodeInfo.ResourceGroupName() != "" && nodeInfo.ResourceGroupName() != rg.GetName() { + return false + } + + if rg.GetName() == DefaultResourceGroupName { + return true + } + requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels() if len(requiredNodeLabels) == 0 { return true diff --git a/internal/querycoordv2/meta/resource_group_test.go b/internal/querycoordv2/meta/resource_group_test.go index 1e7da01b57..5441f92aec 100644 --- a/internal/querycoordv2/meta/resource_group_test.go +++ b/internal/querycoordv2/meta/resource_group_test.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -451,3 +452,33 @@ func TestRGNodeFilter(t *testing.T) { }, nodeMgr) assert.Equal(t, rg.SelectNodeForRG(rg3), int64(-1)) } + +func TestRGAcceptNode(t *testing.T) { + nodeMgr := session.NewNodeManager() + + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + 
NodeID: 1, + Labels: map[string]string{ + sessionutil.LabelResourceGroup: "rg1", + }, + })) + + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Labels: map[string]string{ + sessionutil.LabelResourceGroup: "rg2", + }, + })) + + rg := NewResourceGroup("rg1", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + }, nodeMgr) + + assert.True(t, rg.AcceptNode(1)) + assert.False(t, rg.AcceptNode(2)) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index d5abfd6534..d9322c3a50 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -797,16 +797,18 @@ func (rm *ResourceManager) selectNodeForRedundantRecover(sourceRG *ResourceGroup // assignIncomingNodeWithNodeCheck assign node to resource group with node status check. func (rm *ResourceManager) assignIncomingNodeWithNodeCheck(ctx context.Context, node int64) (string, error) { // node is on stopping or stopped, remove it from incoming node set. - if rm.nodeMgr.Get(node) == nil { + nodeInfo := rm.nodeMgr.Get(node) + if nodeInfo == nil { rm.incomingNode.Remove(node) return "", errors.New("node is not online") } - if ok, _ := rm.nodeMgr.IsStoppingNode(node); ok { + + if nodeInfo.IsStoppingState() { rm.incomingNode.Remove(node) return "", errors.New("node has been stopped") } - rgName, err := rm.assignIncomingNode(ctx, node) + rgName, err := rm.assignIncomingNode(ctx, nodeInfo) if err != nil { return "", err } @@ -816,7 +818,9 @@ func (rm *ResourceManager) assignIncomingNodeWithNodeCheck(ctx context.Context, } // assignIncomingNode assign node to resource group. 
-func (rm *ResourceManager) assignIncomingNode(ctx context.Context, node int64) (string, error) { +func (rm *ResourceManager) assignIncomingNode(ctx context.Context, nodeInfo *session.NodeInfo) (string, error) { + node := nodeInfo.ID() + // If node already assign to rg. rg := rm.getResourceGroupByNodeID(node) if rg != nil { @@ -827,16 +831,53 @@ func (rm *ResourceManager) assignIncomingNode(ctx context.Context, node int64) ( return rg.GetName(), nil } + if err := rm.createResourceGroupIfNotExists(ctx, nodeInfo); err != nil { + return "", err + } + // select a resource group to assign incoming node. - rg = rm.mustSelectAssignIncomingNodeTargetRG(node) + rg = rm.mustSelectAssignIncomingNodeTargetRG(nodeInfo) if err := rm.transferNode(ctx, rg.GetName(), node); err != nil { return "", errors.Wrap(err, "at finally assign to default resource group") } return rg.GetName(), nil } +// createResourceGroupIfNotExists create resource group if not exists. +func (rm *ResourceManager) createResourceGroupIfNotExists(ctx context.Context, nodeInfo *session.NodeInfo) error { + rgName := nodeInfo.ResourceGroupName() + nodeID := nodeInfo.ID() + if rgName == "" { + return nil + } + if _, ok := rm.groups[rgName]; ok { + return nil + } + if err := rm.updateResourceGroups(ctx, map[string]*rgpb.ResourceGroupConfig{ + rgName: { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 0, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: defaultResourceGroupCapacity, + }, + }, + }); err != nil { + log.Warn("failed to create resource group from session of new incoming node", zap.String("rgName", rgName), zap.Int64("nodeID", nodeID), zap.Error(err)) + return err + } + log.Info("create resource group from session of new incoming node", zap.String("rgName", rgName), zap.Int64("nodeID", nodeID)) + return nil +} + // mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node. 
-func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(nodeID int64) *ResourceGroup { +func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(nodeInfo *session.NodeInfo) *ResourceGroup { + if nodeInfo.ResourceGroupName() != "" { + // rg will be created if not exists by createResourceGroupIfNotExists + return rm.groups[nodeInfo.ResourceGroupName()] + } + + nodeID := nodeInfo.ID() // First, Assign it to rg with the most missing nodes at high priority. if rg := rm.findMaxRGWithGivenFilter( func(rg *ResourceGroup) bool { @@ -1020,6 +1061,12 @@ func (rm *ResourceManager) validateResourceGroupIsDeletable(rgName string) error return merr.WrapErrParameterInvalid("not empty resource group", rgName, "resource group's limits node num is not 0") } + for _, nodeInfo := range rm.nodeMgr.GetAll() { + if nodeInfo.ResourceGroupName() == rgName { + return merr.WrapErrParameterInvalid("not empty resource group", fmt.Sprintf("node %d is still in the resource group", nodeInfo.ID())) + } + } + // If rg is used by other rg, it's not deletable. 
for _, rg := range rm.groups { for _, transferCfg := range rg.GetConfig().GetTransferFrom() { diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 3646f09f3b..7ed0c7105b 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -232,6 +232,17 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() { }, })) suite.manager.HandleNodeUp(ctx, 10) + + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 11, + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + sessionutil.LabelResourceGroup: "rg11", + }, + })) + suite.manager.HandleNodeUp(ctx, 11) + suite.Equal(1, suite.manager.GetResourceGroup(ctx, "rg11").NodeNum()) } func (suite *ResourceManagerSuite) TestNodeUpAndDown() { diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index 07e6b45725..9c465da80a 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -171,7 +171,15 @@ func (n *NodeInfo) IsInStandalone() bool { } func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool { - return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" + // Since 2.6.8, we introduce a new label rule for session, + // the LegacyLabelStreamingNodeEmbeddedQueryNode is used before 2.6.8, so we need to check both keys to keep compatibility. + return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" || + n.immutableInfo.Labels[sessionutil.LegacyLabelStreamingNodeEmbeddedQueryNode] == "1" +} + +// ResourceGroupName returns the resource group name of the current node. 
+func (n *NodeInfo) ResourceGroupName() string { + return n.immutableInfo.Labels[sessionutil.LabelResourceGroup] } func (n *NodeInfo) SegmentCnt() int { diff --git a/internal/querycoordv2/session/node_manager_test.go b/internal/querycoordv2/session/node_manager_test.go index 29b83ce212..6f225da719 100644 --- a/internal/querycoordv2/session/node_manager_test.go +++ b/internal/querycoordv2/session/node_manager_test.go @@ -21,6 +21,8 @@ import ( "time" "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/util/sessionutil" ) type NodeManagerSuite struct { @@ -230,6 +232,29 @@ func (s *NodeManagerSuite) TestMemCapacityFunctionality() { s.Equal(2048.75, node.MemCapacity()) } +func (s *NodeManagerSuite) TestNodeInfoLabels() { + info := ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + sessionutil.LegacyLabelStreamingNodeEmbeddedQueryNode: "1", + }, + } + + info2 := NodeInfo{ + immutableInfo: info, + } + s.True(info2.IsEmbeddedQueryNodeInStreamingNode()) + + info2.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] = "1" + delete(info2.immutableInfo.Labels, sessionutil.LegacyLabelStreamingNodeEmbeddedQueryNode) + s.True(info2.IsEmbeddedQueryNodeInStreamingNode()) + + info.Labels[sessionutil.LabelResourceGroup] = "rg1" + s.Equal("rg1", info2.ResourceGroupName()) +} + func TestNodeManagerSuite(t *testing.T) { suite.Run(t, new(NodeManagerSuite)) } diff --git a/internal/util/sessionutil/labels.go b/internal/util/sessionutil/labels.go new file mode 100644 index 0000000000..062a5af472 --- /dev/null +++ b/internal/util/sessionutil/labels.go @@ -0,0 +1,125 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sessionutil + +import ( + "os" + "strings" + + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +// milvus server label rules: +// The server label of milvus will be injected into the session of milvus server, +// which can be found at SessionRaw.ServerLabels. +// The label is an environment variable, the key must be prefixed with "MILVUS_SERVER_LABEL_". +// There are two types of labels: +// +// role-specific labels and non-role-specific labels. +// The key of role-specific labels have the format of "MILVUS_SERVER_LABEL__