From 33c855dcd20f80532cfa5283232bad215223d5b2 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 18 May 2022 11:55:56 +0800 Subject: [PATCH] Fix LoadBalance doesn't remove the source nodes from segment (#17051) Previously, the source nodes were removed from the segment info only when the triggerCondition was NodeDown; for any other trigger condition the removal never happened. Signed-off-by: yah01 --- internal/querycoord/task.go | 96 ++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 80e6176e5a..cf9837cd0e 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -2252,36 +2252,40 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error { func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error { if len(lbt.getChildTask()) > 0 { + replicas := make(map[UniqueID]*milvuspb.ReplicaInfo) + segments := make(map[UniqueID]*querypb.SegmentInfo) + + for _, id := range lbt.SourceNodeIDs { + for _, segment := range lbt.meta.getSegmentInfosByNode(id) { + segments[segment.SegmentID] = segment + } + + nodeReplicas, err := lbt.meta.getReplicasByNodeID(id) + if err != nil { + log.Warn("failed to get replicas for removing offline querynode from it", + zap.Int64("querynodeID", id), + zap.Error(err)) + + continue + } + for _, replica := range nodeReplicas { + replicas[replica.ReplicaID] = replica + } + } + + log.Debug("removing offline nodes from replicas and segments...", + zap.Int("len(replicas)", len(replicas)), + zap.Int("len(segments)", len(segments)), + zap.Int64("trigger task ID", lbt.getTaskID()), + ) + wg := sync.WaitGroup{} + // Remove offline nodes from replica if lbt.triggerCondition == querypb.TriggerCondition_NodeDown { offlineNodes := make(typeutil.UniqueSet, len(lbt.SourceNodeIDs)) for _, nodeID := range lbt.SourceNodeIDs { offlineNodes.Insert(nodeID) } - replicas := make(map[UniqueID]*milvuspb.ReplicaInfo) - segments := make(map[UniqueID]*querypb.SegmentInfo) - for _, id := range lbt.SourceNodeIDs { - for _, 
segment := range lbt.meta.getSegmentInfosByNode(id) { - segments[segment.SegmentID] = segment - } - - nodeReplicas, err := lbt.meta.getReplicasByNodeID(id) - if err != nil { - log.Warn("failed to get replicas for removing offline querynode from it", - zap.Int64("querynodeID", id), - zap.Error(err)) - } - for _, replica := range nodeReplicas { - replicas[replica.ReplicaID] = replica - } - } - - log.Debug("removing offline nodes from replicas and segments...", - zap.Int("len(replicas)", len(replicas)), - zap.Int("len(segments)", len(segments)), - zap.Int64("trigger task ID", lbt.getTaskID()), - ) - wg := sync.WaitGroup{} for _, replica := range replicas { wg.Add(1) go func(replica *milvuspb.ReplicaInfo) { @@ -2303,29 +2307,31 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error { } }(replica) } - - for _, segment := range segments { - wg.Add(1) - go func(segment *querypb.SegmentInfo) { - defer wg.Done() - - segment.NodeID = -1 - segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...) - if len(segment.NodeIds) > 0 { - segment.NodeID = segment.NodeIds[0] - } - - err := lbt.meta.saveSegmentInfo(segment) - if err != nil { - log.Warn("failed to remove offline nodes from segment info", - zap.Int64("segmentID", segment.SegmentID), - zap.Error(err)) - } - }(segment) - } - wg.Wait() } + // Update the nodes list of segment, only remove the source nodes, + // adding destination nodes will be executed by updateSegmentInfoFromTask() + for _, segment := range segments { + wg.Add(1) + go func(segment *querypb.SegmentInfo) { + defer wg.Done() + + segment.NodeID = -1 + segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...) 
+ if len(segment.NodeIds) > 0 { + segment.NodeID = segment.NodeIds[0] + } + + err := lbt.meta.saveSegmentInfo(segment) + if err != nil { + log.Warn("failed to remove offline nodes from segment info", + zap.Int64("segmentID", segment.SegmentID), + zap.Error(err)) + } + }(segment) + } + wg.Wait() + err := syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask()) if err != nil { return err