mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix LoadBalance doesn't remove the source nodes from segment (#17051)
If the triggerCondition isn't NodeDown, the removing won't happen. Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
127dd34b37
commit
33c855dcd2
@ -2252,36 +2252,40 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
|
||||
|
||||
func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
|
||||
if len(lbt.getChildTask()) > 0 {
|
||||
replicas := make(map[UniqueID]*milvuspb.ReplicaInfo)
|
||||
segments := make(map[UniqueID]*querypb.SegmentInfo)
|
||||
|
||||
for _, id := range lbt.SourceNodeIDs {
|
||||
for _, segment := range lbt.meta.getSegmentInfosByNode(id) {
|
||||
segments[segment.SegmentID] = segment
|
||||
}
|
||||
|
||||
nodeReplicas, err := lbt.meta.getReplicasByNodeID(id)
|
||||
if err != nil {
|
||||
log.Warn("failed to get replicas for removing offline querynode from it",
|
||||
zap.Int64("querynodeID", id),
|
||||
zap.Error(err))
|
||||
|
||||
continue
|
||||
}
|
||||
for _, replica := range nodeReplicas {
|
||||
replicas[replica.ReplicaID] = replica
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("removing offline nodes from replicas and segments...",
|
||||
zap.Int("len(replicas)", len(replicas)),
|
||||
zap.Int("len(segments)", len(segments)),
|
||||
zap.Int64("trigger task ID", lbt.getTaskID()),
|
||||
)
|
||||
wg := sync.WaitGroup{}
|
||||
// Remove offline nodes from replica
|
||||
if lbt.triggerCondition == querypb.TriggerCondition_NodeDown {
|
||||
offlineNodes := make(typeutil.UniqueSet, len(lbt.SourceNodeIDs))
|
||||
for _, nodeID := range lbt.SourceNodeIDs {
|
||||
offlineNodes.Insert(nodeID)
|
||||
}
|
||||
replicas := make(map[UniqueID]*milvuspb.ReplicaInfo)
|
||||
segments := make(map[UniqueID]*querypb.SegmentInfo)
|
||||
|
||||
for _, id := range lbt.SourceNodeIDs {
|
||||
for _, segment := range lbt.meta.getSegmentInfosByNode(id) {
|
||||
segments[segment.SegmentID] = segment
|
||||
}
|
||||
|
||||
nodeReplicas, err := lbt.meta.getReplicasByNodeID(id)
|
||||
if err != nil {
|
||||
log.Warn("failed to get replicas for removing offline querynode from it",
|
||||
zap.Int64("querynodeID", id),
|
||||
zap.Error(err))
|
||||
}
|
||||
for _, replica := range nodeReplicas {
|
||||
replicas[replica.ReplicaID] = replica
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("removing offline nodes from replicas and segments...",
|
||||
zap.Int("len(replicas)", len(replicas)),
|
||||
zap.Int("len(segments)", len(segments)),
|
||||
zap.Int64("trigger task ID", lbt.getTaskID()),
|
||||
)
|
||||
wg := sync.WaitGroup{}
|
||||
for _, replica := range replicas {
|
||||
wg.Add(1)
|
||||
go func(replica *milvuspb.ReplicaInfo) {
|
||||
@ -2303,29 +2307,31 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
|
||||
}
|
||||
}(replica)
|
||||
}
|
||||
|
||||
for _, segment := range segments {
|
||||
wg.Add(1)
|
||||
go func(segment *querypb.SegmentInfo) {
|
||||
defer wg.Done()
|
||||
|
||||
segment.NodeID = -1
|
||||
segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
|
||||
if len(segment.NodeIds) > 0 {
|
||||
segment.NodeID = segment.NodeIds[0]
|
||||
}
|
||||
|
||||
err := lbt.meta.saveSegmentInfo(segment)
|
||||
if err != nil {
|
||||
log.Warn("failed to remove offline nodes from segment info",
|
||||
zap.Int64("segmentID", segment.SegmentID),
|
||||
zap.Error(err))
|
||||
}
|
||||
}(segment)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Update the nodes list of segment, only remove the source nodes,
|
||||
// adding destination nodes will be executed by updateSegmentInfoFromTask()
|
||||
for _, segment := range segments {
|
||||
wg.Add(1)
|
||||
go func(segment *querypb.SegmentInfo) {
|
||||
defer wg.Done()
|
||||
|
||||
segment.NodeID = -1
|
||||
segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
|
||||
if len(segment.NodeIds) > 0 {
|
||||
segment.NodeID = segment.NodeIds[0]
|
||||
}
|
||||
|
||||
err := lbt.meta.saveSegmentInfo(segment)
|
||||
if err != nil {
|
||||
log.Warn("failed to remove offline nodes from segment info",
|
||||
zap.Int64("segmentID", segment.SegmentID),
|
||||
zap.Error(err))
|
||||
}
|
||||
}(segment)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
err := syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask())
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user