diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 8752f9e328..1c5a2fbcd6 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/milvus-io/milvus/internal/util/typeutil" ) const timeoutForRPC = 10 * time.Second @@ -1924,16 +1925,16 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { for _, nodeID := range lbt.SourceNodeIDs { segmentID2Info := make(map[UniqueID]*querypb.SegmentInfo) dmChannel2WatchInfo := make(map[string]*querypb.DmChannelWatchInfo) - recoveredCollectionIDs := make(map[UniqueID]struct{}) + recoveredCollectionIDs := make(typeutil.UniqueSet) segmentInfos := lbt.meta.getSegmentInfosByNode(nodeID) for _, segmentInfo := range segmentInfos { segmentID2Info[segmentInfo.SegmentID] = segmentInfo - recoveredCollectionIDs[segmentInfo.CollectionID] = struct{}{} + recoveredCollectionIDs.Insert(segmentInfo.CollectionID) } dmChannelWatchInfos := lbt.meta.getDmChannelInfosByNodeID(nodeID) for _, watchInfo := range dmChannelWatchInfos { dmChannel2WatchInfo[watchInfo.DmChannel] = watchInfo - recoveredCollectionIDs[watchInfo.CollectionID] = struct{}{} + recoveredCollectionIDs.Insert(watchInfo.CollectionID) } for collectionID := range recoveredCollectionIDs { @@ -1977,27 +1978,27 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error { for _, segmentBingLog := range binlogs { segmentID := segmentBingLog.SegmentID - if info, ok := segmentID2Info[segmentID]; ok { + if _, ok := segmentID2Info[segmentID]; ok { segmentLoadInfo := lbt.broker.generateSegmentLoadInfo(ctx, collectionID, partitionID, segmentBingLog, true, schema) + msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_LoadSegments - for _, replica := range info.ReplicaIds { - loadSegmentReq := &querypb.LoadSegmentsRequest{ - Base: msgBase, - Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, - Schema: schema, + + loadSegmentReq := &querypb.LoadSegmentsRequest{ + Base: msgBase, + Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo}, + Schema: schema, + CollectionID: collectionID, + + LoadMeta: &querypb.LoadMetaInfo{ + LoadType: collectionInfo.LoadType, CollectionID: collectionID, - - LoadMeta: &querypb.LoadMetaInfo{ - LoadType: collectionInfo.LoadType, - CollectionID: collectionID, - PartitionIDs: toRecoverPartitionIDs, - }, - ReplicaID: replica, - } - - loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) + PartitionIDs: toRecoverPartitionIDs, + }, + ReplicaID: replica.ReplicaID, } + + loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq) } } diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 7a88e45546..3a7a2e5fea 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -659,6 +659,8 @@ func (scheduler *TaskScheduler) scheduleLoop() { // if triggerCondition == NodeDown, loadSegment and watchDmchannel request will keep reschedule until the success // the node info has been deleted after assgining child task to triggerTask // so it is necessary to update the meta of segment and dmchannel, or some data may be lost in meta + + // triggerTask may be LoadCollection, LoadPartitions, LoadBalance if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown { err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta) if err != nil { @@ -919,8 +921,8 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) collectionID := loadInfo.CollectionID segmentID := loadInfo.SegmentID - segment, ok := segments[segmentID] - if !ok { + segment, saved := segments[segmentID] + if !saved { segment = &querypb.SegmentInfo{ SegmentID: segmentID, CollectionID: loadInfo.CollectionID, @@ -941,7 +943,10 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) if _, ok := segmentInfosToSave[collectionID]; !ok { segmentInfosToSave[collectionID] = make([]*querypb.SegmentInfo, 0) } - segmentInfosToSave[collectionID] = append(segmentInfosToSave[collectionID], segment) + + if !saved { + segmentInfosToSave[collectionID] = append(segmentInfosToSave[collectionID], segment) + } } } }