enhance: Enable node assign policy on resource group (#36968) (#37588)

issue: #36977
pr: #36968
With a node_label_filter configured on a resource group, users can attach
labels to a querynode via environment variables prefixed with
`MILVUS_SERVER_LABEL_`, and the resource group will prefer to accept nodes
that match its node_label_filter.

Querynodes can then be grouped by label, and querynodes with the same
label are placed into the same resource group.
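
For illustration only, here is a minimal sketch of the two pieces this description refers to: a querynode started with a `MILVUS_SERVER_LABEL_*` environment variable (parsed by `GetServerLabelsFromEnv`, added later in this diff), and a resource group config whose `NodeFilter` requires that label. The `rgpb` and `commonpb` types are the same ones used in the tests below; how the config is submitted to QueryCoord is out of scope here.

```go
// Sketch, not part of this commit: build a resource group config whose node
// filter requires the label dc_name=dc1. A querynode advertises that label by
// being started with the environment variable MILVUS_SERVER_LABEL_dc_name=dc1,
// which GetServerLabelsFromEnv turns into {"dc_name": "dc1"} on its session.
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
)

func main() {
	cfg := &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: 2},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: 2},
		// AcceptNode only admits nodes whose labels contain every pair listed here.
		NodeFilter: &rgpb.ResourceGroupNodeFilter{
			NodeLabels: []*commonpb.KeyValuePair{
				{Key: "dc_name", Value: "dc1"},
			},
		},
	}
	fmt.Println(cfg)
}
```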

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-11-13 11:10:29 +08:00 committed by GitHub
parent 7d1c899155
commit 6dc879b1e2
16 changed files with 968 additions and 139 deletions


@ -6,7 +6,7 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.12
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0


@ -402,6 +402,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.12 h1:DJhU9U5bNsCn9zT+0pgHp2IKtSwro6igiThhtCcjhYs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.12/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b h1:IeH5mMgI9GyMES3/pLcsOVIAQmbJMAj48+xVVybvC9M=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3 h1:ZBpRWhBa7FTFxW4YYVv9AUESoW1Xyb3KNXTzTqfkZmw=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3/go.mod h1:jQ2BUZny1COsgv1Qbcv8dmbppW+V9J/c4YQZNb3EOm8=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=


@ -852,6 +852,61 @@ func (_c *MilvusServiceServer_CreatePartition_Call) RunAndReturn(run func(contex
return _c
}
// CreatePrivilegeGroup provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) CreatePrivilegeGroup(_a0 context.Context, _a1 *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CreatePrivilegeGroupRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CreatePrivilegeGroupRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MilvusServiceServer_CreatePrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreatePrivilegeGroup'
type MilvusServiceServer_CreatePrivilegeGroup_Call struct {
*mock.Call
}
// CreatePrivilegeGroup is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CreatePrivilegeGroupRequest
func (_e *MilvusServiceServer_Expecter) CreatePrivilegeGroup(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_CreatePrivilegeGroup_Call {
return &MilvusServiceServer_CreatePrivilegeGroup_Call{Call: _e.mock.On("CreatePrivilegeGroup", _a0, _a1)}
}
func (_c *MilvusServiceServer_CreatePrivilegeGroup_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CreatePrivilegeGroupRequest)) *MilvusServiceServer_CreatePrivilegeGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CreatePrivilegeGroupRequest))
})
return _c
}
func (_c *MilvusServiceServer_CreatePrivilegeGroup_Call) Return(_a0 *commonpb.Status, _a1 error) *MilvusServiceServer_CreatePrivilegeGroup_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MilvusServiceServer_CreatePrivilegeGroup_Call) RunAndReturn(run func(context.Context, *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error)) *MilvusServiceServer_CreatePrivilegeGroup_Call {
_c.Call.Return(run)
return _c
}
// CreateResourceGroup provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) CreateResourceGroup(_a0 context.Context, _a1 *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
@ -1677,6 +1732,61 @@ func (_c *MilvusServiceServer_DropPartition_Call) RunAndReturn(run func(context.
return _c
}
// DropPrivilegeGroup provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) DropPrivilegeGroup(_a0 context.Context, _a1 *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.DropPrivilegeGroupRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.DropPrivilegeGroupRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MilvusServiceServer_DropPrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropPrivilegeGroup'
type MilvusServiceServer_DropPrivilegeGroup_Call struct {
*mock.Call
}
// DropPrivilegeGroup is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.DropPrivilegeGroupRequest
func (_e *MilvusServiceServer_Expecter) DropPrivilegeGroup(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_DropPrivilegeGroup_Call {
return &MilvusServiceServer_DropPrivilegeGroup_Call{Call: _e.mock.On("DropPrivilegeGroup", _a0, _a1)}
}
func (_c *MilvusServiceServer_DropPrivilegeGroup_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.DropPrivilegeGroupRequest)) *MilvusServiceServer_DropPrivilegeGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.DropPrivilegeGroupRequest))
})
return _c
}
func (_c *MilvusServiceServer_DropPrivilegeGroup_Call) Return(_a0 *commonpb.Status, _a1 error) *MilvusServiceServer_DropPrivilegeGroup_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MilvusServiceServer_DropPrivilegeGroup_Call) RunAndReturn(run func(context.Context, *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error)) *MilvusServiceServer_DropPrivilegeGroup_Call {
_c.Call.Return(run)
return _c
}
// DropResourceGroup provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) DropResourceGroup(_a0 context.Context, _a1 *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
@ -3492,6 +3602,61 @@ func (_c *MilvusServiceServer_ListIndexedSegment_Call) RunAndReturn(run func(con
return _c
}
// ListPrivilegeGroups provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) ListPrivilegeGroups(_a0 context.Context, _a1 *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.ListPrivilegeGroupsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ListPrivilegeGroupsRequest) *milvuspb.ListPrivilegeGroupsResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.ListPrivilegeGroupsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.ListPrivilegeGroupsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MilvusServiceServer_ListPrivilegeGroups_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListPrivilegeGroups'
type MilvusServiceServer_ListPrivilegeGroups_Call struct {
*mock.Call
}
// ListPrivilegeGroups is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.ListPrivilegeGroupsRequest
func (_e *MilvusServiceServer_Expecter) ListPrivilegeGroups(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_ListPrivilegeGroups_Call {
return &MilvusServiceServer_ListPrivilegeGroups_Call{Call: _e.mock.On("ListPrivilegeGroups", _a0, _a1)}
}
func (_c *MilvusServiceServer_ListPrivilegeGroups_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.ListPrivilegeGroupsRequest)) *MilvusServiceServer_ListPrivilegeGroups_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.ListPrivilegeGroupsRequest))
})
return _c
}
func (_c *MilvusServiceServer_ListPrivilegeGroups_Call) Return(_a0 *milvuspb.ListPrivilegeGroupsResponse, _a1 error) *MilvusServiceServer_ListPrivilegeGroups_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MilvusServiceServer_ListPrivilegeGroups_Call) RunAndReturn(run func(context.Context, *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error)) *MilvusServiceServer_ListPrivilegeGroups_Call {
_c.Call.Return(run)
return _c
}
// ListResourceGroups provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) ListResourceGroups(_a0 context.Context, _a1 *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
ret := _m.Called(_a0, _a1)
@ -3822,6 +3987,61 @@ func (_c *MilvusServiceServer_OperatePrivilege_Call) RunAndReturn(run func(conte
return _c
}
// OperatePrivilegeGroup provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) OperatePrivilegeGroup(_a0 context.Context, _a1 *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.OperatePrivilegeGroupRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.OperatePrivilegeGroupRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MilvusServiceServer_OperatePrivilegeGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OperatePrivilegeGroup'
type MilvusServiceServer_OperatePrivilegeGroup_Call struct {
*mock.Call
}
// OperatePrivilegeGroup is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.OperatePrivilegeGroupRequest
func (_e *MilvusServiceServer_Expecter) OperatePrivilegeGroup(_a0 interface{}, _a1 interface{}) *MilvusServiceServer_OperatePrivilegeGroup_Call {
return &MilvusServiceServer_OperatePrivilegeGroup_Call{Call: _e.mock.On("OperatePrivilegeGroup", _a0, _a1)}
}
func (_c *MilvusServiceServer_OperatePrivilegeGroup_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.OperatePrivilegeGroupRequest)) *MilvusServiceServer_OperatePrivilegeGroup_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.OperatePrivilegeGroupRequest))
})
return _c
}
func (_c *MilvusServiceServer_OperatePrivilegeGroup_Call) Return(_a0 *commonpb.Status, _a1 error) *MilvusServiceServer_OperatePrivilegeGroup_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MilvusServiceServer_OperatePrivilegeGroup_Call) RunAndReturn(run func(context.Context, *milvuspb.OperatePrivilegeGroupRequest) (*commonpb.Status, error)) *MilvusServiceServer_OperatePrivilegeGroup_Call {
_c.Call.Return(run)
return _c
}
// OperateUserRole provides a mock function with given fields: _a0, _a1
func (_m *MilvusServiceServer) OperateUserRole(_a0 context.Context, _a1 *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

go.mod

@ -26,7 +26,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

go.sum

@ -610,6 +610,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7 h1:gq5xxDS2EIYVk3ujO+sQgDWrhTTpsmV+r6Gm7dfFrt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b h1:IeH5mMgI9GyMES3/pLcsOVIAQmbJMAj48+xVVybvC9M=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=


@ -6,6 +6,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -33,20 +34,22 @@ type ResourceGroup struct {
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
nodeMgr *session.NodeManager
}
// NewResourceGroup create resource group.
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup {
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig, nodeMgr *session.NodeManager) *ResourceGroup {
rg := &ResourceGroup{
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
nodeMgr: nodeMgr,
}
return rg
}
// NewResourceGroupFromMeta create resource group from meta.
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup, nodeMgr *session.NodeManager) *ResourceGroup {
// Backward compatibility, recover the config from capacity.
if meta.Config == nil {
// If meta.Config is nil, which means the meta is from old version.
@ -57,7 +60,7 @@ func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
meta.Config = newResourceGroupConfig(meta.Capacity, meta.Capacity)
}
}
rg := NewResourceGroup(meta.Name, meta.Config)
rg := NewResourceGroup(meta.Name, meta.Config, nodeMgr)
for _, node := range meta.GetNodes() {
rg.nodes.Insert(node)
}
@ -91,14 +94,27 @@ func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig {
return proto.Clone(rg.cfg).(*rgpb.ResourceGroupConfig)
}
// GetNodes return nodes of resource group.
// GetNodes return nodes of resource group which match required node labels
func (rg *ResourceGroup) GetNodes() []int64 {
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return rg.nodes.Collect()
}
ret := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if rg.AcceptNode(nodeID) {
ret = append(ret, nodeID)
}
return true
})
return ret
}
// NodeNum return node count of resource group.
// NodeNum return node count of resource group which match required node labels
func (rg *ResourceGroup) NodeNum() int {
return rg.nodes.Len()
return len(rg.GetNodes())
}
// ContainNode return whether resource group contain node.
@ -106,40 +122,104 @@ func (rg *ResourceGroup) ContainNode(id int64) bool {
return rg.nodes.Contain(id)
}
// OversizedNumOfNodes return oversized nodes count. `len(node) - requests`
// OversizedNumOfNodes return oversized nodes count. `NodeNum - requests`
func (rg *ResourceGroup) OversizedNumOfNodes() int {
oversized := rg.nodes.Len() - int(rg.cfg.Requests.NodeNum)
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized < 0 {
return 0
oversized = 0
}
return oversized
return oversized + len(rg.getDirtyNode())
}
// MissingNumOfNodes return lack nodes count. `requests - len(node)`
// MissingNumOfNodes return lack nodes count. `requests - NodeNum`
func (rg *ResourceGroup) MissingNumOfNodes() int {
missing := int(rg.cfg.Requests.NodeNum) - len(rg.nodes)
missing := int(rg.cfg.Requests.NodeNum) - rg.NodeNum()
if missing < 0 {
return 0
}
return missing
}
// ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)`
// ReachLimitNumOfNodes return reach limit nodes count. `limits - NodeNum`
func (rg *ResourceGroup) ReachLimitNumOfNodes() int {
reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes)
reachLimit := int(rg.cfg.Limits.NodeNum) - rg.NodeNum()
if reachLimit < 0 {
return 0
}
return reachLimit
}
// RedundantOfNodes return redundant nodes count. `len(node) - limits`
// RedundantOfNodes return redundant nodes count. `len(node) - limits` or len(dirty_nodes)
func (rg *ResourceGroup) RedundantNumOfNodes() int {
redundant := len(rg.nodes) - int(rg.cfg.Limits.NodeNum)
redundant := rg.NodeNum() - int(rg.cfg.Limits.NodeNum)
if redundant < 0 {
return 0
redundant = 0
}
return redundant
return redundant + len(rg.getDirtyNode())
}
func (rg *ResourceGroup) getDirtyNode() []int64 {
dirtyNodes := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if !rg.AcceptNode(nodeID) {
dirtyNodes = append(dirtyNodes, nodeID)
}
return true
})
return dirtyNodes
}
func (rg *ResourceGroup) SelectNodeForRG(targetRG *ResourceGroup) int64 {
// try to move out dirty node
for _, node := range rg.getDirtyNode() {
if targetRG.AcceptNode(node) {
return node
}
}
// try to move out oversized node
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized > 0 {
for _, node := range rg.GetNodes() {
if targetRG.AcceptNode(node) {
return node
}
}
}
return -1
}
// AcceptNode returns whether the node's labels match the resource group's node label filter.
func (rg *ResourceGroup) AcceptNode(nodeID int64) bool {
if rg.GetName() == DefaultResourceGroupName {
return true
}
nodeInfo := rg.nodeMgr.Get(nodeID)
if nodeInfo == nil {
return false
}
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return true
}
nodeLabels := nodeInfo.Labels()
if len(nodeLabels) == 0 {
return false
}
for _, labelPair := range requiredNodeLabels {
valueInNode, ok := nodeLabels[labelPair.Key]
if !ok || valueInNode != labelPair.Value {
return false
}
}
return true
}
// HasFrom return whether given resource group is in `from` of rg.
@ -179,6 +259,7 @@ func (rg *ResourceGroup) Snapshot() *ResourceGroup {
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
nodeMgr: rg.nodeMgr,
}
}
@ -186,18 +267,18 @@ func (rg *ResourceGroup) Snapshot() *ResourceGroup {
// Return error with reason if not meet requirement.
func (rg *ResourceGroup) MeetRequirement() error {
// if len(node) is less than requests, new node need to be assigned.
if rg.nodes.Len() < int(rg.cfg.Requests.NodeNum) {
if rg.MissingNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, less than request %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}
// if len(node) is greater than limits, node need to be removed.
if rg.nodes.Len() > int(rg.cfg.Limits.NodeNum) {
if rg.RedundantNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, greater than limit %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}


@ -5,8 +5,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestResourceGroup(t *testing.T) {
@ -24,7 +27,10 @@ func TestResourceGroup(t *testing.T) {
ResourceGroup: "rg3",
}},
}
rg := NewResourceGroup("rg1", cfg)
nodeMgr := session.NewNodeManager()
rg := NewResourceGroup("rg1", cfg, nodeMgr)
cfg2 := rg.GetConfig()
assert.Equal(t, cfg.Requests.NodeNum, cfg2.Requests.NodeNum)
@ -84,6 +90,9 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
// Test AddNode
mrg = rg.CopyForWrite()
mrg.AssignNode(1)
@ -108,6 +117,9 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
// Test AddNode until meet requirement.
mrg = rg.CopyForWrite()
mrg.AssignNode(2)
@ -132,6 +144,12 @@ func TestResourceGroup(t *testing.T) {
}
assertion()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
}))
// Test AddNode until exceed requirement.
mrg = rg.CopyForWrite()
mrg.AssignNode(3)
@ -202,12 +220,21 @@ func TestResourceGroup(t *testing.T) {
}
func TestResourceGroupMeta(t *testing.T) {
nodeMgr := session.NewNodeManager()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
rgMeta := &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
Nodes: []int64{1, 2},
}
rg := NewResourceGroupFromMeta(rgMeta)
rg := NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -225,6 +252,9 @@ func TestResourceGroupMeta(t *testing.T) {
assert.False(t, rg.ContainNode(4))
assert.Error(t, rg.MeetRequirement())
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 4,
}))
rgMeta = &querypb.ResourceGroup{
Name: "rg1",
Capacity: 1,
@ -244,7 +274,7 @@ func TestResourceGroupMeta(t *testing.T) {
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, "rg1", rg.GetName())
assert.ElementsMatch(t, []int64{1, 2, 4}, rg.GetNodes())
assert.Equal(t, 3, rg.NodeNum())
@ -271,7 +301,7 @@ func TestResourceGroupMeta(t *testing.T) {
Capacity: defaultResourceGroupCapacity,
Nodes: []int64{1, 2},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -311,7 +341,7 @@ func TestResourceGroupMeta(t *testing.T) {
}},
},
}
rg = NewResourceGroupFromMeta(rgMeta)
rg = NewResourceGroupFromMeta(rgMeta, nodeMgr)
assert.Equal(t, DefaultResourceGroupName, rg.GetName())
assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes())
assert.Equal(t, 2, rg.NodeNum())
@ -332,3 +362,92 @@ func TestResourceGroupMeta(t *testing.T) {
newMeta = rg.GetMeta()
assert.Equal(t, int32(1000000), newMeta.Capacity)
}
func TestRGNodeFilter(t *testing.T) {
nodeMgr := session.NewNodeManager()
rg := NewResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 3,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc1",
},
},
},
}, nodeMgr)
rg.nodes = typeutil.NewSet[int64](1, 2, 3)
nodeInfo1 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Labels: map[string]string{
"dc_name": "dc1",
},
})
nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Labels: map[string]string{
"dc_name": "dc1",
},
})
nodeInfo3 := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
Labels: map[string]string{
"dc_name": "dc2",
},
})
nodeMgr.Add(nodeInfo1)
nodeMgr.Add(nodeInfo2)
nodeMgr.Add(nodeInfo3)
assert.True(t, rg.AcceptNode(1))
assert.True(t, rg.AcceptNode(2))
assert.False(t, rg.AcceptNode(3))
assert.Error(t, rg.MeetRequirement())
assert.Equal(t, rg.NodeNum(), 2)
assert.Len(t, rg.GetNodes(), 2)
rg2 := NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc2",
},
},
},
}, nodeMgr)
assert.Equal(t, rg.SelectNodeForRG(rg2), int64(3))
rg3 := NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc1",
},
},
},
}, nodeMgr)
assert.Equal(t, rg.SelectNodeForRG(rg3), int64(-1))
}


@ -60,7 +60,7 @@ type ResourceManager struct {
func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager {
groups := make(map[string]*ResourceGroup)
// Always create a default resource group to keep compatibility.
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity))
groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity), nodeMgr)
return &ResourceManager{
incomingNode: typeutil.NewUniqueSet(),
groups: groups,
@ -89,7 +89,7 @@ func (rm *ResourceManager) Recover() error {
for _, meta := range rgs {
needUpgrade := meta.Config == nil
rg := NewResourceGroupFromMeta(meta)
rg := NewResourceGroupFromMeta(meta, rm.nodeMgr)
rm.groups[rg.GetName()] = rg
for _, node := range rg.GetNodes() {
if _, ok := rm.nodeIDMap[node]; ok {
@ -145,7 +145,7 @@ func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGro
return err
}
rg := NewResourceGroup(rgName, cfg)
rg := NewResourceGroup(rgName, cfg, rm.nodeMgr)
if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil {
log.Warn("failed to add resource group",
zap.String("rgName", rgName),
@ -548,135 +548,157 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error {
// recoverMissingNodeRG recover resource group by transfer node from other resource group.
func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error {
for rm.groups[rgName].MissingNumOfNodes() > 0 {
rg := rm.groups[rgName]
sourceRG := rm.selectMissingRecoverSourceRG(rg)
targetRG := rm.groups[rgName]
node, sourceRG := rm.selectNodeForMissingRecover(targetRG)
if sourceRG == nil {
log.Warn("fail to select source resource group", zap.String("rgName", rg.GetName()))
log.Warn("fail to select source resource group", zap.String("rgName", targetRG.GetName()))
return ErrNodeNotEnough
}
nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, rg)
err := rm.transferNode(targetRG.GetName(), node)
if err != nil {
log.Warn("failed to recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
zap.Error(err))
return err
}
log.Info("recover missing node by transfer node from other resource group",
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", rg.GetName()),
zap.Int64("nodeID", nodeID),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
)
}
return nil
}
// selectMissingRecoverSourceRG select source resource group for recover missing resource group.
func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node from most redundant resource group first. `len(nodes) > limits`
if redundantRG := rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
return sourceRG.RedundantNumOfNodes()
},
); redundantRG != nil {
return redundantRG
// selectNodeForMissingRecover selects a node for missing recovery.
// It takes a target ResourceGroup and returns the selected node's ID and the source ResourceGroup with highest priority.
func (rm *ResourceManager) selectNodeForMissingRecover(targetRG *ResourceGroup) (int64, *ResourceGroup) {
computeRGPriority := func(rg *ResourceGroup) int {
// If the ResourceGroup has redundant nodes, boost its priority by 1,000,000.
if rg.RedundantNumOfNodes() > 0 {
return rg.RedundantNumOfNodes() * 1000000
}
// If the target ResourceGroup has a 'from' relationship with the current ResourceGroup,
// boost its priority by 100,000.
if targetRG.HasFrom(rg.GetName()) {
return rg.OversizedNumOfNodes() * 100000
}
return rg.OversizedNumOfNodes()
}
// Second, Transfer node from most oversized resource group. `len(nodes) > requests`
// `TransferFrom` configured resource group at high priority.
return rm.findMaxRGWithGivenFilter(
func(sourceRG *ResourceGroup) bool {
return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0
},
func(sourceRG *ResourceGroup) int {
if rg.HasFrom(sourceRG.GetName()) {
// give a boost if sourceRG is configured as `TransferFrom` to set as high priority to select.
return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost
maxPriority := 0
var sourceRG *ResourceGroup
candidateNode := int64(-1)
for _, rg := range rm.groups {
if rg.GetName() == targetRG.GetName() {
continue
}
return sourceRG.OversizedNumOfNodes()
})
if rg.OversizedNumOfNodes() <= 0 {
continue
}
priority := computeRGPriority(rg)
if priority > maxPriority {
// Select a node from the current resource group that is preferred to be removed and assigned to the target resource group.
node := rg.SelectNodeForRG(targetRG)
// If no such node is found, skip the current resource group.
if node == -1 {
continue
}
sourceRG = rg
candidateNode = node
maxPriority = priority
}
}
return candidateNode, sourceRG
}
// recoverRedundantNodeRG recover resource group by transfer node to other resource group.
func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error {
for rm.groups[rgName].RedundantNumOfNodes() > 0 {
rg := rm.groups[rgName]
targetRG := rm.selectRedundantRecoverTargetRG(rg)
if targetRG == nil {
sourceRG := rm.groups[rgName]
node, targetRG := rm.selectNodeForRedundantRecover(sourceRG)
if node == -1 {
log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.",
zap.String("rgName", rg.GetName()))
zap.String("rgName", sourceRG.GetName()))
return errors.New("all resource group reach limits")
}
nodeID, err := rm.transferOneNodeFromRGToRG(rg, targetRG)
if err != nil {
if err := rm.transferNode(targetRG.GetName(), node); err != nil {
log.Warn("failed to recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", node),
zap.Error(err))
return err
}
log.Info("recover redundant node by transfer node to other resource group",
zap.String("sourceRG", rg.GetName()),
zap.String("sourceRG", sourceRG.GetName()),
zap.String("targetRG", targetRG.GetName()),
zap.Int64("nodeID", nodeID),
zap.Int64("nodeID", node),
)
}
return nil
}
// selectRedundantRecoverTargetRG select target resource group for recover redundant resource group.
func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup) *ResourceGroup {
// First, Transfer node to most missing resource group first.
if missingRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
return targetRG.MissingNumOfNodes()
},
); missingRG != nil {
return missingRG
// selectNodeForRedundantRecover selects a node for redundant recovery.
// It takes a source ResourceGroup and returns the selected node's ID and the target ResourceGroup with highest priority.
func (rm *ResourceManager) selectNodeForRedundantRecover(sourceRG *ResourceGroup) (int64, *ResourceGroup) {
// computeRGPriority calculates the priority of a ResourceGroup based on certain conditions.
computeRGPriority := func(rg *ResourceGroup) int {
// If the ResourceGroup is missing nodes, boost its priority by 1,000,000.
if rg.MissingNumOfNodes() > 0 {
return rg.MissingNumOfNodes() * 1000000
}
// If the source ResourceGroup has a 'to' relationship with the current ResourceGroup,
// boost its priority by 100,000.
if sourceRG.HasTo(rg.GetName()) {
return rg.ReachLimitNumOfNodes() * 100000
}
return rg.ReachLimitNumOfNodes()
}
// Second, Transfer node to max reachLimit resource group.
// `TransferTo` configured resource group at high priority.
if selectRG := rm.findMaxRGWithGivenFilter(
func(targetRG *ResourceGroup) bool {
return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0
},
func(targetRG *ResourceGroup) int {
if rg.HasTo(targetRG.GetName()) {
// give a boost if targetRG is configured as `TransferTo` to set as high priority to select.
return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost
}
return targetRG.ReachLimitNumOfNodes()
},
); selectRG != nil {
return selectRG
maxPriority := 0
var targetRG *ResourceGroup
candidateNode := int64(-1)
for _, rg := range rm.groups {
if rg.GetName() == sourceRG.GetName() {
continue
}
// Finally, Always transfer node to default resource group.
if rg.GetName() != DefaultResourceGroupName {
return rm.groups[DefaultResourceGroupName]
if rg.ReachLimitNumOfNodes() <= 0 {
continue
}
return nil
}
// transferOneNodeFromRGToRG transfer one node from source resource group to target resource group.
func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) {
if sourceRG.NodeNum() == 0 {
return -1, ErrNodeNotEnough
// Calculate the priority of the current resource group.
priority := computeRGPriority(rg)
if priority > maxPriority {
// select a node from it that is preferred to be removed and assigned to the target resource group.
node := sourceRG.SelectNodeForRG(rg)
// If no such node is found, skip the current resource group.
if node == -1 {
continue
}
// TODO: select node by some load strategy, such as segment loaded.
node := sourceRG.GetNodes()[0]
if err := rm.transferNode(targetRG.GetName(), node); err != nil {
return -1, err
candidateNode = node
targetRG = rg
maxPriority = priority
}
return node, nil
}
// Finally, always transfer the node to the default resource group if no other target resource group is found.
if targetRG == nil && sourceRG.GetName() != DefaultResourceGroupName {
targetRG = rm.groups[DefaultResourceGroupName]
if sourceRG != nil {
candidateNode = sourceRG.SelectNodeForRG(targetRG)
}
}
return candidateNode, targetRG
}
// assignIncomingNodeWithNodeCheck assign node to resource group with node status check.
@ -713,7 +735,7 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
}
// select a resource group to assign incoming node.
rg = rm.mustSelectAssignIncomingNodeTargetRG()
rg = rm.mustSelectAssignIncomingNodeTargetRG(node)
if err := rm.transferNode(rg.GetName(), node); err != nil {
return "", errors.Wrap(err, "at finally assign to default resource group")
}
@ -721,11 +743,11 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) {
}
// mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node.
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup {
func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(nodeID int64) *ResourceGroup {
// First, Assign it to rg with the most missing nodes at high priority.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.MissingNumOfNodes() > 0
return rg.MissingNumOfNodes() > 0 && rg.AcceptNode(nodeID)
},
func(rg *ResourceGroup) int {
return rg.MissingNumOfNodes()
@ -737,7 +759,7 @@ func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup
// Second, assign it to rg do not reach limit.
if rg := rm.findMaxRGWithGivenFilter(
func(rg *ResourceGroup) bool {
return rg.ReachLimitNumOfNodes() > 0
return rg.ReachLimitNumOfNodes() > 0 && rg.AcceptNode(nodeID)
},
func(rg *ResourceGroup) int {
return rg.ReachLimitNumOfNodes()


@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
@ -28,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -205,6 +207,7 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg2": newResourceGroupConfig(0, 0),
})
log.Info("xxxxx")
// RemoveResourceGroup will remove all nodes from the resource group.
err = suite.manager.RemoveResourceGroup("rg2")
suite.NoError(err)
@ -619,3 +622,337 @@ func (suite *ResourceManagerSuite) TestUnassignFail() {
suite.manager.HandleNodeDown(1)
})
}
func (suite *ResourceManagerSuite) TestNodeLabels_NodeAssign() {
suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
})
suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
})
suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
})
// test that all query nodes have been marked with label1
for i := 1; i <= 30; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test new querynode with label2
for i := 31; i <= 40; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ := suite.manager.GetNodes("rg2")
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
// test new querynode with label3
for i := 41; i <= 50; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label3",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ = suite.manager.GetNodes("rg3")
for _, node := range nodesInRG {
suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
// test swap rg's label
suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{
"rg1": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
},
"rg2": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
},
"rg3": {
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
},
})
log.Info("test swap rg's label")
for i := 0; i < 4; i++ {
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup("rg3")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ = suite.manager.GetNodes("rg1")
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
nodesInRG, _ = suite.manager.GetNodes("rg2")
for _, node := range nodesInRG {
suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
nodesInRG, _ = suite.manager.GetNodes("rg3")
for _, node := range nodesInRG {
suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
}
func (suite *ResourceManagerSuite) TestNodeLabels_NodeDown() {
suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label1",
},
},
},
})
suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label2",
},
},
},
})
suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 10,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "label3",
},
},
},
})
// test that all query nodes have been marked with label1
for i := 1; i <= 10; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
// test new querynode with label2
for i := 31; i <= 40; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
// test new querynode with label3
for i := 41; i <= 50; i++ {
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(i),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label3",
},
}))
suite.manager.HandleNodeUp(int64(i))
}
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
// test node down with label1
suite.manager.HandleNodeDown(int64(1))
suite.manager.nodeMgr.Remove(int64(1))
suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
// test node up with label2
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(101),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label2",
},
}))
suite.manager.HandleNodeUp(int64(101))
suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
// test node up with label1
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: int64(102),
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
"dc_name": "label1",
},
}))
suite.manager.HandleNodeUp(int64(102))
suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum())
suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum())
suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum())
nodesInRG, _ := suite.manager.GetNodes("rg1")
for _, node := range nodesInRG {
suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
suite.manager.AutoRecoverResourceGroup("rg1")
suite.manager.AutoRecoverResourceGroup("rg2")
suite.manager.AutoRecoverResourceGroup("rg3")
suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName)
nodesInRG, _ = suite.manager.GetNodes(DefaultResourceGroupName)
for _, node := range nodesInRG {
suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"])
}
}


@ -130,12 +130,8 @@ func (suite *ResourceObserverSuite) TestObserverRecoverOperation() {
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
// but new node with id 10 is not in
suite.nodeMgr.Remove(10)
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2"))
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3"))
// new node is down, rg3 cannot use that node anymore.
suite.nodeMgr.Remove(10)
suite.meta.ResourceManager.HandleNodeDown(10)
suite.observer.checkAndRecoverResourceGroup()
suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1"))


@ -459,6 +459,7 @@ func (s *Server) startQueryCoord() error {
Address: node.Address,
Hostname: node.HostName,
Version: node.Version,
Labels: node.GetServerLabel(),
}))
s.taskScheduler.AddExecutor(node.ServerID)
@ -698,6 +699,7 @@ func (s *Server) watchNodes(revision int64) {
Address: addr,
Hostname: event.Session.HostName,
Version: event.Session.Version,
Labels: event.Session.GetServerLabel(),
}))
s.nodeUpEventChan <- nodeID
select {


@ -111,6 +111,7 @@ type ImmutableNodeInfo struct {
Address string
Hostname string
Version semver.Version
Labels map[string]string
}
const (
@ -147,6 +148,10 @@ func (n *NodeInfo) Hostname() string {
return n.immutableInfo.Hostname
}
func (n *NodeInfo) Labels() map[string]string {
return n.immutableInfo.Labels
}
func (n *NodeInfo) SegmentCnt() int {
n.mu.RLock()
defer n.mu.RUnlock()


@ -49,6 +49,7 @@ const (
DefaultServiceRoot = "session/"
// DefaultIDKey default id key for Session
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
)
// SessionEventType session event type
@ -102,6 +103,7 @@ type SessionRaw struct {
HostName string `json:"HostName,omitempty"`
EnableDisk bool `json:"EnableDisk,omitempty"`
ServerLabels map[string]string `json:"ServerLabels,omitempty"`
}
func (s *SessionRaw) GetAddress() string {
@ -112,6 +114,10 @@ func (s *SessionRaw) GetServerID() int64 {
return s.ServerID
}
func (s *SessionRaw) GetServerLabel() map[string]string {
return s.ServerLabels
}
func (s *SessionRaw) IsTriggerKill() bool {
return s.TriggerKill
}
@ -286,7 +292,8 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b
panic(err)
}
s.ServerID = serverID
log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID))
s.ServerLabels = GetServerLabelsFromEnv(serverName)
log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID), zap.Any("server_labels", s.ServerLabels))
}
// String makes Session struct able to be logged by zap
@ -329,6 +336,25 @@ func (s *Session) getServerID() (int64, error) {
return nodeID, nil
}
func GetServerLabelsFromEnv(role string) map[string]string {
ret := make(map[string]string)
switch role {
case "querynode":
for _, value := range os.Environ() {
rs := []rune(value)
in := strings.Index(value, "=")
key := string(rs[0:in])
value := string(rs[in+1:])
if strings.HasPrefix(key, SupportedLabelPrefix) {
label := strings.TrimPrefix(key, SupportedLabelPrefix)
ret[label] = value
}
}
}
return ret
}
func (s *Session) checkIDExist() {
s.etcdCli.Txn(s.ctx).If(
clientv3.Compare(


@ -1064,6 +1064,21 @@ func (s *SessionSuite) TestSafeCloseLiveCh() {
})
}
func (s *SessionSuite) TestGetSessions() {
os.Setenv("MILVUS_SERVER_LABEL_key1", "value1")
os.Setenv("MILVUS_SERVER_LABEL_key2", "value2")
os.Setenv("key3", "value3")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key1")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key2")
defer os.Unsetenv("key3")
ret := GetServerLabelsFromEnv("querynode")
assert.Equal(s.T(), 2, len(ret))
assert.Equal(s.T(), "value1", ret["key1"])
assert.Equal(s.T(), "value2", ret["key2"])
}
func TestSessionSuite(t *testing.T) {
suite.Run(t, new(SessionSuite))
}


@ -12,7 +12,7 @@ require (
github.com/expr-lang/expr v1.15.7
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2


@ -505,6 +505,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7 h1:gq5xxDS2EIYVk3ujO+sQgDWrhTTpsmV+r6Gm7dfFrt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241110064419-549e4694a7e7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b h1:IeH5mMgI9GyMES3/pLcsOVIAQmbJMAj48+xVVybvC9M=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.16-0.20241111072219-bd85c098660b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=