diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index 29878a74eb..1e95f05796 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -19,11 +19,13 @@ package balance
 import (
 	"sort"
 
+	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/samber/lo"
+	"go.uber.org/zap"
 )
 
 type RowCountBasedBalancer struct {
@@ -117,17 +119,23 @@ func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]Segment
 		segments = lo.Filter(segments, func(segment *meta.Segment, _ int) bool {
 			return b.targetMgr.GetHistoricalSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil
 		})
+
+		// Skip nodes unknown to the node manager: their segments are added to
+		// neither map and their rows are left out of the totals below.
+		if isStopping, err := b.nodeManager.IsStoppingNode(nid); err != nil {
+			log.Info("not existed node", zap.Int64("nid", nid), zap.Any("segments", segments), zap.Error(err))
+			continue
+		} else if isStopping {
+			stoppingNodesSegments[nid] = segments
+		} else {
+			nodesSegments[nid] = segments
+		}
+
 		cnt := 0
 		for _, s := range segments {
 			cnt += int(s.GetNumOfRows())
 		}
 		nodesRowCnt[nid] = cnt
-
-		if nodeInfo := b.nodeManager.Get(nid); nodeInfo.IsStoppingState() {
-			stoppingNodesSegments[nid] = segments
-		} else {
-			nodesSegments[nid] = segments
-		}
 		totalCnt += cnt
 	}
 
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
index f525f510d9..c61e9ff881 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go
@@ -134,6 +134,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 	cases := []struct {
 		name               string
 		nodes              []int64
+		notExistedNodes    []int64
 		segmentCnts        []int
 		states             []session.State
 		shouldMock         bool
@@ -212,16 +213,18 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 			},
 		},
 		{
-			name:        "already balanced",
-			nodes:       []int64{1, 2},
-			segmentCnts: []int{1, 2},
-			states:      []session.State{session.NodeStateNormal, session.NodeStateNormal},
+			name:            "already balanced",
+			nodes:           []int64{1, 2},
+			notExistedNodes: []int64{10},
+			segmentCnts:     []int{1, 2},
+			states:          []session.State{session.NodeStateNormal, session.NodeStateNormal},
 			distributions: map[int64][]*meta.Segment{
 				1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 30}, Node: 1}},
 				2: {
 					{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2},
 					{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2},
 				},
+				10: {{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 10}},
 			},
 			expectPlans:        []SegmentAssignPlan{},
 			expectChannelPlans: []ChannelAssignPlan{},
@@ -259,7 +262,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
 			collection.LoadPercentage = 100
 			collection.Status = querypb.LoadStatus_Loaded
 			balancer.meta.CollectionManager.PutCollection(collection)
-			balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes))
+			balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, append(c.nodes, c.notExistedNodes...)))
 			for node, s := range c.distributions {
 				balancer.dist.SegmentDistManager.Update(node, s...)
 			}
diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go
index f748cd0ebb..2552843fb9 100644
--- a/internal/querycoordv2/session/node_manager.go
+++ b/internal/querycoordv2/session/node_manager.go
@@ -17,6 +17,7 @@
 package session
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
@@ -59,6 +60,20 @@ func (m *NodeManager) Stopping(nodeID int64) {
 	}
 }
 
+// IsStoppingNode reports whether the node with the given ID is in the
+// stopping state. It returns an error if no node with that ID is present
+// in the manager. The lookup is done under the manager's read lock.
+func (m *NodeManager) IsStoppingNode(nodeID int64) (bool, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	node := m.nodes[nodeID]
+	if node == nil {
+		return false, fmt.Errorf("nodeID[%d] isn't existed", nodeID)
+	}
+	return node.IsStoppingState(), nil
+}
+
 func (m *NodeManager) Get(nodeID int64) *NodeInfo {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 3d3de9e912..c3e18fd074 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -328,7 +328,6 @@ func (node *QueryNode) Stop() error {
 		if err != nil {
 			log.Warn("session fail to go stopping state", zap.Error(err))
 		} else {
-			node.UpdateStateCode(commonpb.StateCode_Stopping)
 			noSegmentChan := node.metaReplica.getNoSegmentChan()
 			select {
 			case <-noSegmentChan:
diff --git a/scripts/stop_graceful.sh b/scripts/stop_graceful.sh
index 0d8f753978..14ad072c7d 100755
--- a/scripts/stop_graceful.sh
+++ b/scripts/stop_graceful.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 function get_milvus_process() {
-    return $(ps -e | grep milvus | grep -v grep | awk '{print $1}')
+    echo $(ps -e | grep milvus | grep -v grep | awk '{print $1}')
 }
 
 echo "Stopping milvus..."