From b4f21259efb316fac964c8752776b59a4b51dcd5 Mon Sep 17 00:00:00 2001
From: yah01
Date: Wed, 22 Jun 2022 21:08:13 +0800
Subject: [PATCH] Fix SegmentChangeInfo is not set properly (#17701)

Signed-off-by: yah01
---
 internal/querycoord/cluster.go                |  11 +-
 internal/querycoord/group_balance.go          |  26 ++++-
 internal/querycoord/meta.go                   | 100 +++++++++---------
 .../querycoord/mock_querynode_server_test.go  |   2 +-
 internal/querycoord/querynode_test.go         |   2 +-
 internal/querycoord/task.go                   |  44 ++++----
 internal/querycoord/task_scheduler.go         |  28 ++---
 internal/querycoord/task_test.go              |   2 +-
 internal/querycoord/util.go                   |   9 ++
 9 files changed, 128 insertions(+), 96 deletions(-)

diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go
index dbbdd409b5..e0c1560e53 100644
--- a/internal/querycoord/cluster.go
+++ b/internal/querycoord/cluster.go
@@ -405,11 +405,16 @@ func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID Uni
         return nil, err
     }

+    if len(segmentInfo.NodeIds) == 0 {
+        return nil, fmt.Errorf("GetSegmentInfoByID: no node loaded the segment %v", segmentID)
+    }
+
+    node := segmentInfo.NodeIds[0]
     c.RLock()
-    targetNode, ok := c.nodes[segmentInfo.NodeID]
+    targetNode, ok := c.nodes[node]
     c.RUnlock()
     if !ok {
-        return nil, fmt.Errorf("updateSegmentInfo: can't find query node by nodeID, nodeID = %d", segmentInfo.NodeID)
+        return nil, fmt.Errorf("GetSegmentInfoByID: can't find query node by nodeID, nodeID=%v", node)
     }

     res, err := targetNode.getSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
@@ -429,7 +434,7 @@ func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID Uni
         }
     }

-    return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on QueryNode %d", segmentID, segmentInfo.NodeID)
+    return nil, fmt.Errorf("GetSegmentInfoByID: can't find segment %d on QueryNode %d", segmentID, node)
 }

 func (c *queryNodeCluster) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
diff --git a/internal/querycoord/group_balance.go b/internal/querycoord/group_balance.go
index 2685c70adf..cd634eafe4 100644
--- a/internal/querycoord/group_balance.go
+++ b/internal/querycoord/group_balance.go
@@ -20,6 +20,25 @@ type balancePlan struct {
     targetReplica UniqueID
 }

+// NewAddBalancePlan creates a plan for adding nodes into the dest replica
+func NewAddBalancePlan(dest UniqueID, nodes ...UniqueID) *balancePlan {
+    return NewMoveBalancePlan(invalidReplicaID, dest, nodes...)
+}
+
+// NewRemoveBalancePlan creates a plan for removing nodes from the src replica
+func NewRemoveBalancePlan(src UniqueID, nodes ...UniqueID) *balancePlan {
+    return NewMoveBalancePlan(src, invalidReplicaID, nodes...)
+}
+
+// NewMoveBalancePlan creates a plan for moving nodes from the src replica into the dest replica
+func NewMoveBalancePlan(src, dest UniqueID, nodes ...UniqueID) *balancePlan {
+    return &balancePlan{
+        nodes:         nodes,
+        sourceReplica: src,
+        targetReplica: dest,
+    }
+}
+
 type replicaBalancer struct {
     meta    Meta
     cluster Cluster
@@ -72,11 +91,8 @@ func (b *replicaBalancer) AddNode(nodeID int64) ([]*balancePlan, error) {
             return replicaAvailableMemory[replicai] < replicaAvailableMemory[replicaj]
         })

-        ret = append(ret, &balancePlan{
-            nodes:         []UniqueID{nodeID},
-            sourceReplica: invalidReplicaID,
-            targetReplica: replicas[0].GetReplicaID(),
-        })
+        ret = append(ret,
+            NewAddBalancePlan(replicas[0].GetReplicaID(), nodeID))
     }
     return ret, nil
 }
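
Note on the group_balance.go hunk above: the three constructors only differ in which side of the move is left as the invalid replica, so "add" and "remove" are both expressed as moves. A minimal standalone sketch of that pattern follows; UniqueID, invalidReplicaID and balancePlan here are stand-ins with assumed values, not the package's real declarations.

package main

import "fmt"

type UniqueID = int64

// invalidReplicaID is a stand-in sentinel; the real constant lives in the querycoord package.
const invalidReplicaID UniqueID = -1

type balancePlan struct {
    nodes         []UniqueID
    sourceReplica UniqueID
    targetReplica UniqueID
}

// NewMoveBalancePlan moves nodes from the src replica into the dest replica.
func NewMoveBalancePlan(src, dest UniqueID, nodes ...UniqueID) *balancePlan {
    return &balancePlan{nodes: nodes, sourceReplica: src, targetReplica: dest}
}

// "add" is a move whose source is the invalid replica, "remove" a move whose target is.
func NewAddBalancePlan(dest UniqueID, nodes ...UniqueID) *balancePlan {
    return NewMoveBalancePlan(invalidReplicaID, dest, nodes...)
}

func NewRemoveBalancePlan(src UniqueID, nodes ...UniqueID) *balancePlan {
    return NewMoveBalancePlan(src, invalidReplicaID, nodes...)
}

func main() {
    fmt.Printf("%+v\n", *NewAddBalancePlan(100, 1))    // {nodes:[1] sourceReplica:-1 targetReplica:100}
    fmt.Printf("%+v\n", *NewRemoveBalancePlan(100, 2)) // {nodes:[2] sourceReplica:100 targetReplica:-1}
}

In the patch itself, AddNode passes NewAddBalancePlan(replicas[0].GetReplicaID(), nodeID), and loadBalanceTask.globalPostExecute (further below) passes NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...).
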
diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go
index 33ffc46799..72abe64193 100644
--- a/internal/querycoord/meta.go
+++ b/internal/querycoord/meta.go
@@ -599,32 +599,36 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
             },
             Infos: []*querypb.SegmentChangeInfo{},
         }
+
         for _, info := range onlineInfos {
             segmentID := info.SegmentID
-            onlineNodeID := info.NodeID
-            changeInfo := &querypb.SegmentChangeInfo{
-                OnlineNodeID:   onlineNodeID,
-                OnlineSegments: []*querypb.SegmentInfo{info},
-            }
+
             offlineInfo, err := m.getSegmentInfoByID(segmentID)
             if err == nil {
-                offlineNodeID := offlineInfo.NodeID
                 // if the offline segment state is growing, it will not impact the global sealed segments
                 if offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
-                    changeInfo.OfflineNodeID = offlineNodeID
-                    changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
+                    offlineNodes := diffSlice(offlineInfo.NodeIds, info.NodeIds...)
+
+                    for _, node := range offlineNodes {
+                        segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos,
+                            &querypb.SegmentChangeInfo{
+                                OfflineNodeID:   node,
+                                OfflineSegments: []*querypb.SegmentInfo{offlineInfo},
+                            })
+                    }
                 }
             }
-            segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo)

             // generate offline segment change info if the loaded segment is compacted from other sealed segments
             for _, compactionSegmentID := range info.CompactionFrom {
                 compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
                 if err == nil && compactionSegmentInfo.SegmentState == commonpb.SegmentState_Sealed {
-                    segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
-                        OfflineNodeID:   compactionSegmentInfo.NodeID,
-                        OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
-                    })
+                    for _, node := range compactionSegmentInfo.NodeIds {
+                        segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
+                            OfflineNodeID:   node,
+                            OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
+                        })
+                    }
                     segmentsCompactionFrom = append(segmentsCompactionFrom, compactionSegmentInfo)
                 } else {
                     return nil, fmt.Errorf("saveGlobalSealedSegInfos: the compacted segment %d has not been loaded into memory", compactionSegmentID)
@@ -633,29 +637,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
         }
         col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
     }
-    queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
-    for collectionID, infos := range saves {
-        // TODO silverxia change QueryChannelInfo struct to simplifed one
-        // queryChannelInfo contains the GlobalSealedSegment list
-        queryChannelInfo := m.getQueryChannelInfoByID(collectionID)
-
-        // merge save segment info and existing GlobalSealedSegments
-        seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
-        for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
-            segmentID := segmentInfo.SegmentID
-            seg2Info[segmentID] = segmentInfo
-        }
-        for _, segmentInfo := range infos {
-            segmentID := segmentInfo.SegmentID
-            seg2Info[segmentID] = segmentInfo
-        }
-
-        globalSealedSegmentInfos := make([]*querypb.SegmentInfo, len(seg2Info))
-        for _, info := range seg2Info {
-            globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
-        }
-        queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
-    }

     // save segmentInfo to etcd
     for _, infos := range saves {
@@ -693,13 +674,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
         panic(err)
     }

-    // Write back to cache
-    m.channelMu.Lock()
-    for collectionID, channelInfo := range queryChannelInfosMap {
-        m.queryChannelInfos[collectionID] = channelInfo
-    }
-    m.channelMu.Unlock()
-
     return col2SegmentChangeInfos, nil
 }

@@ -716,13 +690,14 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
         Infos: []*querypb.SegmentChangeInfo{},
     }
     for _, info := range removes {
-        offlineNodeID := info.NodeID
-        changeInfo := &querypb.SegmentChangeInfo{
-            OfflineNodeID:   offlineNodeID,
-            OfflineSegments: []*querypb.SegmentInfo{info},
-        }
+        for _, node := range info.NodeIds {
+            segmentChangeInfos.Infos = append(segmentChangeInfos.Infos,
+                &querypb.SegmentChangeInfo{
+                    OfflineNodeID:   node,
+                    OfflineSegments: []*querypb.SegmentInfo{info},
+                })

-        segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
+        }
     }

     // produce sealedSegmentChangeInfos to query channel
@@ -1334,6 +1309,33 @@ func getShardNodes(collectionID UniqueID, meta Meta) map[string]map[UniqueID]str
     return shardNodes
 }

+// addNode2Segment adds the node into the segment,
+// the old one within the same replica will be replaced
+func addNode2Segment(meta Meta, node UniqueID, replicas []*milvuspb.ReplicaInfo, segment *querypb.SegmentInfo) {
+    for _, oldNode := range segment.NodeIds {
+        isInReplica := false
+        for _, replica := range replicas {
+            if nodeIncluded(oldNode, replica.NodeIds) {
+                // new node is in the same replica, replace the old ones
+                if nodeIncluded(node, replica.NodeIds) {
+                    break
+                }
+
+                // The old node is not the offline one
+                isInReplica = true
+                break
+            }
+        }
+
+        if !isInReplica {
+            segment.NodeIds = removeFromSlice(segment.NodeIds, oldNode)
+            break
+        }
+    }
+
+    segment.NodeIds = append(segment.NodeIds, node)
+}
+
 // getDataSegmentInfosByIDs return the SegmentInfo details according to the given ids through RPC to datacoord
 func (m *MetaReplica) getDataSegmentInfosByIDs(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
     var segmentInfos []*datapb.SegmentInfo
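
Note on the meta.go hunks above: a sealed segment can now be served by several nodes (NodeIds) instead of a single NodeID, so saveGlobalSealedSegInfos and removeGlobalSealedSegInfos emit one SegmentChangeInfo per affected node rather than a single change info. A standalone sketch of that fan-out follows; SegmentInfo, SegmentChangeInfo and the offlineChangeInfos helper are simplified stand-ins that mirror the loop in removeGlobalSealedSegInfos, not the real querypb types.

package main

import "fmt"

type UniqueID = int64

type SegmentInfo struct {
    SegmentID UniqueID
    NodeIds   []UniqueID
}

type SegmentChangeInfo struct {
    OfflineNodeID   UniqueID
    OfflineSegments []*SegmentInfo
}

// offlineChangeInfos mirrors the loop in removeGlobalSealedSegInfos:
// every node that holds a removed segment gets its own offline change info.
func offlineChangeInfos(removes []*SegmentInfo) []*SegmentChangeInfo {
    var infos []*SegmentChangeInfo
    for _, info := range removes {
        for _, node := range info.NodeIds {
            infos = append(infos, &SegmentChangeInfo{
                OfflineNodeID:   node,
                OfflineSegments: []*SegmentInfo{info},
            })
        }
    }
    return infos
}

func main() {
    seg := &SegmentInfo{SegmentID: 7, NodeIds: []UniqueID{1, 2, 3}}
    for _, ci := range offlineChangeInfos([]*SegmentInfo{seg}) {
        fmt.Println(ci.OfflineNodeID) // 1, 2, 3: previously only the single NodeID would have been reported
    }
}
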
diff --git a/internal/querycoord/mock_querynode_server_test.go b/internal/querycoord/mock_querynode_server_test.go
index 4dcdae5e13..a5026f6844 100644
--- a/internal/querycoord/mock_querynode_server_test.go
+++ b/internal/querycoord/mock_querynode_server_test.go
@@ -281,7 +281,7 @@ func (qs *queryNodeServerMock) GetMetrics(ctx context.Context, req *milvuspb.Get
     totalMemUsage := uint64(0)
     for _, info := range qs.segmentInfos {
-        if info.NodeID == qs.queryNodeID {
+        if nodeIncluded(qs.queryNodeID, info.NodeIds) {
             totalMemUsage += uint64(info.MemSize)
         }
     }
diff --git a/internal/querycoord/querynode_test.go b/internal/querycoord/querynode_test.go
index c920b2f75d..c0d462485e 100644
--- a/internal/querycoord/querynode_test.go
+++ b/internal/querycoord/querynode_test.go
@@ -275,7 +275,7 @@ func TestSealedSegmentChangeAfterQueryNodeStop(t *testing.T) {
         segmentInfos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
         recoverDone := true
         for _, info := range segmentInfos {
-            if info.NodeID != queryNode2.queryNodeID {
+            if !nodeIncluded(queryNode2.queryNodeID, info.NodeIds) {
                 recoverDone = false
                 break
             }
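
Note on the two test hunks above: equality checks against the removed NodeID field become membership checks against NodeIds via nodeIncluded. That helper is not added by this patch; a plausible stand-in is a plain slice scan, sketched here under that assumption.

package main

import "fmt"

type UniqueID = int64

// nodeIncluded (stand-in): reports whether nodeID appears in nodes.
func nodeIncluded(nodeID UniqueID, nodes []UniqueID) bool {
    for _, n := range nodes {
        if n == nodeID {
            return true
        }
    }
    return false
}

func main() {
    nodeIds := []UniqueID{3, 5}
    // old check: info.NodeID == queryNodeID; new check: queryNodeID is one of info.NodeIds
    fmt.Println(nodeIncluded(5, nodeIds)) // true
    fmt.Println(nodeIncluded(7, nodeIds)) // false
}
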
diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go
index b039ea9e95..1b090de54c 100644
--- a/internal/querycoord/task.go
+++ b/internal/querycoord/task.go
@@ -2309,38 +2309,36 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
         for replicaID := range replicas {
             replicaID := replicaID
             wg.Go(func() error {
-                return lbt.meta.applyReplicaBalancePlan(&balancePlan{
-                    nodes:         lbt.SourceNodeIDs,
-                    sourceReplica: replicaID,
-                })
+                return lbt.meta.applyReplicaBalancePlan(
+                    NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...))
             })
         }
     }

     // Remove offline nodes from segment
-    for _, segment := range segments {
-        segment := segment
-        wg.Go(func() error {
-            segment.NodeID = -1
-            segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
+    // for _, segment := range segments {
+    //     segment := segment
+    //     wg.Go(func() error {
+    //         segment.NodeID = -1
+    //         segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)

-            err := lbt.meta.saveSegmentInfo(segment)
-            if err != nil {
-                log.Error("failed to remove offline nodes from segment info",
-                    zap.Int64("segmentID", segment.SegmentID),
-                    zap.Error(err))
+    //         err := lbt.meta.saveSegmentInfo(segment)
+    //         if err != nil {
+    //             log.Error("failed to remove offline nodes from segment info",
+    //                 zap.Int64("segmentID", segment.SegmentID),
+    //                 zap.Error(err))

-                return err
-            }
+    //             return err
+    //         }

-            log.Info("remove offline nodes from segment",
-                zap.Int64("taskID", lbt.getTaskID()),
-                zap.Int64("segmentID", segment.GetSegmentID()),
-                zap.Int64s("nodeIds", segment.GetNodeIds()))
+    //         log.Info("remove offline nodes from segment",
+    //             zap.Int64("taskID", lbt.getTaskID()),
+    //             zap.Int64("segmentID", segment.GetSegmentID()),
+    //             zap.Int64s("nodeIds", segment.GetNodeIds()))

-            return nil
-        })
-    }
+    //         return nil
+    //     })
+    // }

     // Remove offline nodes from dmChannels
     for _, dmChannel := range dmChannels {
diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go
index 25c2f10c88..b91c00f418 100644
--- a/internal/querycoord/task_scheduler.go
+++ b/internal/querycoord/task_scheduler.go
@@ -659,14 +659,6 @@ func (scheduler *TaskScheduler) scheduleLoop() {
             // if triggerCondition == NodeDown, loadSegment and watchDmchannel request will keep reschedule until the success
             // the node info has been deleted after assgining child task to triggerTask
             // so it is necessary to update the meta of segment and dmchannel, or some data may be lost in meta
-
-            // triggerTask may be LoadCollection, LoadPartitions, LoadBalance
-            if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
-                err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
-                if err != nil {
-                    triggerTask.setResultInfo(err)
-                }
-            }
             resultInfo := triggerTask.getResultInfo()
             if resultInfo.ErrorCode != commonpb.ErrorCode_Success {
                 if !alreadyNotify {
@@ -695,6 +687,14 @@
             }
         }

+        // triggerTask may be LoadCollection, LoadPartitions, LoadBalance, Handoff
+        if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
+            err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
+            if err != nil {
+                triggerTask.setResultInfo(err)
+            }
+        }
+
         err = removeTaskFromKVFn(triggerTask)
         if err != nil {
             log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
@@ -963,7 +963,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
             if err != nil {
                 segment = &querypb.SegmentInfo{
                     SegmentID:    segmentID,
-                    CollectionID: loadInfo.CollectionID,
+                    CollectionID: collectionID,
                     PartitionID:  loadInfo.PartitionID,
                     DmChannel:    loadInfo.InsertChannel,
                     SegmentState: commonpb.SegmentState_Sealed,
@@ -975,10 +975,12 @@
                 }
             }
             segment.ReplicaIds = append(segment.ReplicaIds, req.ReplicaID)
-            segment.ReplicaIds = removeFromSlice(segment.GetReplicaIds())
-
-            segment.NodeIds = append(segment.NodeIds, dstNodeID)
-            segment.NodeID = dstNodeID
+            segment.ReplicaIds = uniqueSlice(segment.GetReplicaIds())
+            replicas, err := meta.getReplicasByCollectionID(collectionID)
+            if err != nil {
+                return err
+            }
+            addNode2Segment(meta, dstNodeID, replicas, segment)

             segments[segmentID] = segment
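
Note on the task_scheduler.go hunks above: updateSegmentInfoFromTask now runs after the trigger task's result (and any rollback) is settled, just before the task is removed from etcd, and instead of overwriting a single NodeID it resolves the collection's replicas and calls addNode2Segment. The standalone sketch below reproduces the replacement rule of addNode2Segment from the meta.go hunk with simplified stand-in types; the real function also takes a Meta argument, omitted here because the shown logic does not use it. At most one existing node is dropped, and only when the incoming node serves the same replica.

package main

import "fmt"

type UniqueID = int64

type ReplicaInfo struct {
    ReplicaID UniqueID
    NodeIds   []UniqueID
}

type SegmentInfo struct {
    NodeIds []UniqueID
}

func nodeIncluded(nodeID UniqueID, nodes []UniqueID) bool {
    for _, n := range nodes {
        if n == nodeID {
            return true
        }
    }
    return false
}

func removeFromSlice(src []UniqueID, del UniqueID) []UniqueID {
    out := make([]UniqueID, 0, len(src))
    for _, s := range src {
        if s != del {
            out = append(out, s)
        }
    }
    return out
}

// addNode2Segment follows the logic added in meta.go: drop the existing node that
// shares a replica with the incoming node (or has no replica at all), then append.
func addNode2Segment(node UniqueID, replicas []*ReplicaInfo, segment *SegmentInfo) {
    for _, oldNode := range segment.NodeIds {
        isInReplica := false
        for _, replica := range replicas {
            if nodeIncluded(oldNode, replica.NodeIds) {
                if nodeIncluded(node, replica.NodeIds) {
                    break // same replica: oldNode is the one being replaced
                }
                isInReplica = true // oldNode belongs to another replica, keep it
                break
            }
        }
        if !isInReplica {
            segment.NodeIds = removeFromSlice(segment.NodeIds, oldNode)
            break
        }
    }
    segment.NodeIds = append(segment.NodeIds, node)
}

func main() {
    replicas := []*ReplicaInfo{
        {ReplicaID: 1, NodeIds: []UniqueID{1, 2}},
        {ReplicaID: 2, NodeIds: []UniqueID{3, 4}},
    }
    // the segment is served by node 1 (replica 1) and node 3 (replica 2)
    segment := &SegmentInfo{NodeIds: []UniqueID{1, 3}}

    addNode2Segment(2, replicas, segment) // node 2 takes over replica 1's copy
    fmt.Println(segment.NodeIds)          // [3 2]: node 1 replaced, replica 2 untouched
}
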
diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go
index f3a9aedde9..7215ec2481 100644
--- a/internal/querycoord/task_test.go
+++ b/internal/querycoord/task_test.go
@@ -1171,7 +1171,7 @@ func TestLoadBalanceAndReschedulSegmentTaskAfterNodeDown(t *testing.T) {
         segmentInfos := queryCoord.meta.getSegmentInfosByNode(node3.queryNodeID)
         for _, segmentInfo := range segmentInfos {
-            if segmentInfo.NodeID == node3.queryNodeID {
+            if nodeIncluded(node3.queryNodeID, segmentInfo.NodeIds) {
                 break
             }
         }
diff --git a/internal/querycoord/util.go b/internal/querycoord/util.go
index b0fd8f3092..6ce01aa8c5 100644
--- a/internal/querycoord/util.go
+++ b/internal/querycoord/util.go
@@ -209,6 +209,15 @@ func uniqueSlice(origin []UniqueID) []UniqueID {
     return set.Collect()
 }

+// diffSlice returns a slice containing items in src but not in diff
+func diffSlice(src []UniqueID, diff ...UniqueID) []UniqueID {
+    set := make(typeutil.UniqueSet, len(src))
+    set.Insert(src...)
+    set.Remove(diff...)
+
+    return set.Collect()
+}
+
 func getReplicaAvailableMemory(cluster Cluster, replica *milvuspb.ReplicaInfo) uint64 {
     availableMemory := uint64(0)
     nodes := getNodeInfos(cluster, replica.NodeIds)
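
Note on the util.go hunk above: diffSlice is built on typeutil.UniqueSet, which is created with make and is presumably map-backed, so the result behaves like a set difference: duplicates collapse and the order of the returned IDs should not be relied on. A standalone sketch with a minimal stand-in for UniqueSet follows; only the Insert, Remove and Collect calls used by the patch are modeled.

package main

import (
    "fmt"
    "sort"
)

type UniqueID = int64

// uniqueSet is a minimal map-backed stand-in for typeutil.UniqueSet.
type uniqueSet map[UniqueID]struct{}

func (s uniqueSet) Insert(ids ...UniqueID) {
    for _, id := range ids {
        s[id] = struct{}{}
    }
}

func (s uniqueSet) Remove(ids ...UniqueID) {
    for _, id := range ids {
        delete(s, id)
    }
}

func (s uniqueSet) Collect() []UniqueID {
    ids := make([]UniqueID, 0, len(s))
    for id := range s {
        ids = append(ids, id)
    }
    return ids
}

// diffSlice mirrors the helper added in util.go: items in src that are not in diff.
func diffSlice(src []UniqueID, diff ...UniqueID) []UniqueID {
    set := make(uniqueSet, len(src))
    set.Insert(src...)
    set.Remove(diff...)
    return set.Collect()
}

func main() {
    // nodes 1 and 3 held the segment but no longer do; node 2 still does
    offlineNodes := diffSlice([]UniqueID{1, 2, 3}, 2, 4)
    sort.Slice(offlineNodes, func(i, j int) bool { return offlineNodes[i] < offlineNodes[j] })
    fmt.Println(offlineNodes) // [1 3]
}

This is also why the saveGlobalSealedSegInfos hunk simply ranges over the collected offline nodes without assuming any particular order.
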