diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 9cbb2d6566..d69a3a5674 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -489,7 +489,7 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) { return "", ErrNodeNotAssignToRG } -func (rm *ResourceManager) TransferNode(from, to string) error { +func (rm *ResourceManager) TransferNode(from string, to string, numNode int) error { rm.rwmutex.Lock() defer rm.rwmutex.Unlock() @@ -497,57 +497,66 @@ func (rm *ResourceManager) TransferNode(from, to string) error { return ErrRGNotExist } - if len(rm.groups[from].nodes) == 0 { - return ErrRGIsEmpty - } - rm.checkRGNodeStatus(from) rm.checkRGNodeStatus(to) + if len(rm.groups[from].nodes) < numNode { + return ErrNodeNotEnough + } //todo: a better way to choose a node with least balance cost - node := rm.groups[from].GetNodes()[0] - if err := rm.transferNodeInStore(from, to, node); err != nil { + movedNodes, err := rm.transferNodeInStore(from, to, numNode) + if err != nil { return err } - err := rm.groups[from].unassignNode(node) - if err != nil { - // interrupt transfer, unreachable logic path - return err - } + for _, node := range movedNodes { + err := rm.groups[from].unassignNode(node) + if err != nil { + // interrupt transfer, unreachable logic path + return err + } - err = rm.groups[to].assignNode(node) - if err != nil { - // interrupt transfer, unreachable logic path - return err + err = rm.groups[to].assignNode(node) + if err != nil { + // interrupt transfer, unreachable logic path + return err + } } return nil } -func (rm *ResourceManager) transferNodeInStore(from string, to string, node int64) error { +func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode int) ([]int64, error) { + availableNodes := rm.groups[from].GetNodes() + if len(availableNodes) < numNode { + return nil, ErrNodeNotEnough + } + + movedNodes := make([]int64, 0, numNode) fromNodeList := make([]int64, 0) - for nid := range rm.groups[from].nodes { - if nid != node { - fromNodeList = append(fromNodeList, nid) + toNodeList := rm.groups[to].GetNodes() + for i := 0; i < len(availableNodes); i++ { + if i < numNode { + movedNodes = append(movedNodes, availableNodes[i]) + toNodeList = append(toNodeList, availableNodes[i]) + } else { + fromNodeList = append(fromNodeList, availableNodes[i]) } } - toNodeList := rm.groups[to].GetNodes() - toNodeList = append(toNodeList, node) fromRG := &querypb.ResourceGroup{ Name: from, - Capacity: int32(rm.groups[from].GetCapacity()) - 1, + Capacity: int32(rm.groups[from].GetCapacity() - numNode), Nodes: fromNodeList, } toRG := &querypb.ResourceGroup{ Name: to, - Capacity: int32(rm.groups[to].GetCapacity()) + 1, + Capacity: int32(rm.groups[to].GetCapacity() + numNode), Nodes: toNodeList, } - return rm.store.SaveResourceGroup(fromRG, toRG) + return movedNodes, rm.store.SaveResourceGroup(fromRG, toRG) } // auto recover rg, return recover used node num diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 8dc48df234..910e4f4062 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -115,12 +115,34 @@ func (suite *ResourceManagerSuite) TestManipulateNode() { suite.ErrorIs(err, ErrNodeAlreadyAssign) // transfer node between rgs - err = suite.manager.TransferNode("rg1", "rg2") + err = suite.manager.TransferNode("rg1", "rg2", 1) suite.NoError(err) // transfer meet non exist rg - err = suite.manager.TransferNode("rgggg", "rg2") + err = suite.manager.TransferNode("rgggg", "rg2", 1) suite.ErrorIs(err, ErrRGNotExist) + + err = suite.manager.TransferNode("rg1", "rg2", 5) + suite.ErrorIs(err, ErrNodeNotEnough) + + suite.manager.nodeMgr.Add(session.NewNodeInfo(11, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(12, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(13, "localhost")) + suite.manager.nodeMgr.Add(session.NewNodeInfo(14, "localhost")) + suite.manager.AssignNode("rg1", 11) + suite.manager.AssignNode("rg1", 12) + suite.manager.AssignNode("rg1", 13) + suite.manager.AssignNode("rg1", 14) + + rg1, err := suite.manager.GetResourceGroup("rg1") + suite.NoError(err) + rg2, err := suite.manager.GetResourceGroup("rg2") + suite.NoError(err) + suite.Equal(rg1.GetCapacity(), 4) + suite.Equal(rg2.GetCapacity(), 1) + suite.manager.TransferNode("rg1", "rg2", 3) + suite.Equal(rg1.GetCapacity(), 1) + suite.Equal(rg2.GetCapacity(), 4) } func (suite *ResourceManagerSuite) TestHandleNodeUp() { diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 0d15d8ea66..29c54e7470 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -1009,6 +1009,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq log := log.Ctx(ctx).With( zap.String("source", req.GetSourceResourceGroup()), zap.String("target", req.GetTargetResourceGroup()), + zap.Int32("numNode", req.GetNumNode()), ) log.Info("transfer node between resource group request received") @@ -1027,7 +1028,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil } - err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup()) + err := s.meta.ResourceManager.TransferNode(req.GetSourceResourceGroup(), req.GetTargetResourceGroup(), int(req.GetNumNode())) if err != nil { log.Warn(ErrTransferNodeFailed.Error(), zap.Error(err)) return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), err), nil diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index dc5be0414b..b7a53c1024 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -473,6 +473,7 @@ func (suite *ServiceSuite) TestTransferNode() { resp, err := server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg1", + NumNode: 1, }) suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode) @@ -498,6 +499,40 @@ func (suite *ServiceSuite) TestTransferNode() { suite.Contains(resp.Reason, meta.ErrRGNotExist.Error()) suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) + err = server.meta.ResourceManager.AddResourceGroup("rg3") + suite.NoError(err) + err = server.meta.ResourceManager.AddResourceGroup("rg4") + suite.NoError(err) + suite.nodeMgr.Add(session.NewNodeInfo(11, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(12, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(13, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(14, "localhost")) + suite.meta.ResourceManager.AssignNode("rg3", 11) + suite.meta.ResourceManager.AssignNode("rg3", 12) + suite.meta.ResourceManager.AssignNode("rg3", 13) + suite.meta.ResourceManager.AssignNode("rg3", 14) + + resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ + SourceResourceGroup: "rg3", + TargetResourceGroup: "rg4", + NumNode: 3, + }) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode) + nodes, err = server.meta.ResourceManager.GetNodes("rg3") + suite.NoError(err) + suite.Len(nodes, 1) + nodes, err = server.meta.ResourceManager.GetNodes("rg4") + suite.NoError(err) + suite.Len(nodes, 3) + resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ + SourceResourceGroup: "rg3", + TargetResourceGroup: "rg4", + NumNode: 3, + }) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) + // server unhealthy server.status.Store(commonpb.StateCode_Abnormal) resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{