diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 63251533c9..27c1901ec8 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -18,7 +18,6 @@ package querycoordv2 import ( "context" - "fmt" "sync" "time" @@ -35,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -373,7 +371,7 @@ func (s *Server) tryGetNodesMetrics(ctx context.Context, req *milvuspb.GetMetric return ret } -func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*milvuspb.ReplicaInfo, error) { +func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) *milvuspb.ReplicaInfo { info := &milvuspb.ReplicaInfo{ ReplicaID: replica.GetID(), CollectionID: replica.GetCollectionID(), @@ -384,10 +382,11 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m channels := s.targetMgr.GetDmChannelsByCollection(replica.GetCollectionID(), meta.CurrentTarget) if len(channels) == 0 { - msg := "failed to get channels, collection not loaded" - log.Warn(msg) - return nil, merr.WrapErrCollectionNotFound(replica.GetCollectionID(), msg) + log.Warn("failed to get channels, collection may be not loaded or in recovering", zap.Int64("collectionID", replica.GetCollectionID())) + return info } + shardReplicas := make([]*milvuspb.ShardReplica, 0, len(channels)) + var segments []*meta.Segment if withShardNodes { segments = s.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID())) @@ -400,9 +399,11 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m leaderInfo = s.nodeMgr.Get(leader) } if leaderInfo == nil { - msg := fmt.Sprintf("failed to get shard leader for shard %s", channel) - log.Warn(msg) - return nil, merr.WrapErrNodeNotFound(leader, msg) + log.Warn("failed to get shard leader for shard", + zap.Int64("collectionID", replica.GetCollectionID()), + zap.Int64("replica", replica.GetID()), + zap.String("shard", channel.GetChannelName())) + return info } shard := &milvuspb.ShardReplica{ @@ -420,9 +421,10 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m }) shard.NodeIds = typeutil.NewUniqueSet(shardNodes...).Collect() } - info.ShardReplicas = append(info.ShardReplicas, shard) + shardReplicas = append(shardReplicas, shard) } - return info, nil + info.ShardReplicas = shardReplicas + return info } func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView { diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index b074993b14..99da154145 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -258,15 +258,18 @@ func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string if sourceCfg.Requests.NodeNum < 0 { sourceCfg.Requests.NodeNum = 0 } + // Special case for compatibility with old version. + if sourceRGName != DefaultResourceGroupName { + sourceCfg.Limits.NodeNum -= int32(nodeNum) + if sourceCfg.Limits.NodeNum < 0 { + sourceCfg.Limits.NodeNum = 0 + } + } + targetCfg.Requests.NodeNum += int32(nodeNum) if targetCfg.Requests.NodeNum > targetCfg.Limits.NodeNum { targetCfg.Limits.NodeNum = targetCfg.Requests.NodeNum } - // transfer node from source resource group to target resource group at high priority. - targetCfg.TransferFrom = append(targetCfg.TransferFrom, &rgpb.ResourceGroupTransfer{ - ResourceGroup: sourceRGName, - }) - return rm.updateResourceGroups(map[string]*rgpb.ResourceGroupConfig{ sourceRGName: sourceCfg, targetRGName: targetCfg, diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 1d6e07b3b1..969e09cde6 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -537,6 +537,23 @@ func (suite *ResourceManagerSuite) TestAutoRecover() { } func (suite *ResourceManagerSuite) testTransferNode() { + // Test redundant nodes recover to default resource group. + suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{ + DefaultResourceGroupName: newResourceGroupConfig(40, 40), + "rg3": newResourceGroupConfig(0, 0), + "rg2": newResourceGroupConfig(40, 40), + "rg1": newResourceGroupConfig(20, 20), + }) + suite.manager.AutoRecoverResourceGroup("rg1") + suite.manager.AutoRecoverResourceGroup("rg2") + suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) + suite.manager.AutoRecoverResourceGroup("rg3") + + suite.Equal(20, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(40, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(40, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + // Test TransferNode. // param error. err := suite.manager.TransferNode("rg1", "rg1", 1) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 53a4361d5c..fb87d87e51 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -842,33 +842,13 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque replicas := s.meta.ReplicaManager.GetByCollection(req.GetCollectionID()) if len(replicas) == 0 { - err := merr.WrapErrReplicaNotFound(req.GetCollectionID(), "failed to get replicas by collection") - msg := "failed to get replicas, collection not loaded" - log.Warn(msg) - resp.Status = merr.Status(err) - return resp, nil + return &milvuspb.GetReplicasResponse{ + Replicas: make([]*milvuspb.ReplicaInfo, 0), + }, nil } for _, replica := range replicas { - msg := "failed to get replica info" - if len(replica.GetNodes()) == 0 { - err := merr.WrapErrReplicaNotAvailable(replica.GetID(), "no available nodes in replica") - log.Warn(msg, - zap.Int64("replica", replica.GetID()), - zap.Error(err)) - resp.Status = merr.Status(err) - break - } - - info, err := s.fillReplicaInfo(replica, req.GetWithShardNodes()) - if err != nil { - log.Warn(msg, - zap.Int64("replica", replica.GetID()), - zap.Error(err)) - resp.Status = merr.Status(err) - break - } - resp.Replicas = append(resp.Replicas, info) + resp.Replicas = append(resp.Replicas, s.fillReplicaInfo(replica, req.GetWithShardNodes())) } return resp, nil } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 0c74f60c95..5a062f226f 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1569,7 +1569,7 @@ func (suite *ServiceSuite) TestGetReplicas() { suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } -func (suite *ServiceSuite) TestGetReplicasFailed() { +func (suite *ServiceSuite) TestGetReplicasWhenNoAvailableNodes() { suite.loadAll() ctx := context.Background() server := suite.server @@ -1588,7 +1588,7 @@ func (suite *ServiceSuite) TestGetReplicasFailed() { } resp, err := server.GetReplicas(ctx, req) suite.NoError(err) - suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrReplicaNotAvailable) + suite.True(merr.Ok(resp.GetStatus())) } func (suite *ServiceSuite) TestCheckHealth() {