diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index eac5e35772..f4b2efd596 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sort" "sync" "time" @@ -1933,49 +1934,48 @@ func assignInternalTask(ctx context.Context, } log.Debug("assignInternalTask: assign dmChannel to node success") - mergedLoadSegmentReqs := make(map[UniqueID]map[int64][]*querypb.LoadSegmentsRequest) - sizeCounts := make(map[UniqueID]map[int64]int) - for _, req := range loadSegmentRequests { - nodeID := req.DstNodeID - collectionID := req.CollectionID - if _, ok := mergedLoadSegmentReqs[collectionID]; !ok { - mergedLoadSegmentReqs[collectionID] = make(map[int64][]*querypb.LoadSegmentsRequest) - sizeCounts[collectionID] = make(map[int64]int) - } - mergedLoadSegmentReqsPerCol := mergedLoadSegmentReqs[collectionID] - sizeCountsPerCol := sizeCounts[collectionID] - sizeOfReq := getSizeOfLoadSegmentReq(req) - if _, ok := mergedLoadSegmentReqsPerCol[nodeID]; !ok { - mergedLoadSegmentReqsPerCol[nodeID] = []*querypb.LoadSegmentsRequest{} - mergedLoadSegmentReqsPerCol[nodeID] = append(mergedLoadSegmentReqsPerCol[nodeID], req) - sizeCountsPerCol[nodeID] = sizeOfReq - } else { - if sizeCountsPerCol[nodeID]+sizeOfReq > MaxSendSizeToEtcd { - mergedLoadSegmentReqsPerCol[nodeID] = append(mergedLoadSegmentReqsPerCol[nodeID], req) - sizeCountsPerCol[nodeID] = sizeOfReq - } else { - lastReq := mergedLoadSegmentReqsPerCol[nodeID][len(mergedLoadSegmentReqsPerCol[nodeID])-1] - lastReq.Infos = append(lastReq.Infos, req.Infos...) - sizeCountsPerCol[nodeID] += sizeOfReq - } - } - } + if len(loadSegmentRequests) > 0 { + sort.Slice(loadSegmentRequests, func(i, j int) bool { + return loadSegmentRequests[i].CollectionID < loadSegmentRequests[j].CollectionID || + loadSegmentRequests[i].CollectionID == loadSegmentRequests[j].CollectionID && loadSegmentRequests[i].DstNodeID < loadSegmentRequests[j].DstNodeID + }) - for _, loadSegmentsReqsPerCol := range mergedLoadSegmentReqs { - for _, loadSegmentReqs := range loadSegmentsReqsPerCol { - for _, req := range loadSegmentReqs { + batchReq := loadSegmentRequests[0] + batchSize := proto.Size(batchReq) + for _, req := range loadSegmentRequests[1:] { + // Pack current batch, switch to new batch + if req.CollectionID != batchReq.CollectionID || req.DstNodeID != batchReq.DstNodeID || + batchSize+proto.Size(req) > MaxSendSizeToEtcd { baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) loadSegmentTask := &loadSegmentTask{ baseTask: baseTask, - LoadSegmentsRequest: req, + LoadSegmentsRequest: batchReq, meta: meta, cluster: cluster, excludeNodeIDs: excludeNodeIDs, } internalTasks = append(internalTasks, loadSegmentTask) + + batchReq = req + batchSize = proto.Size(batchReq) + } else { + batchReq.Infos = append(batchReq.Infos, req.Infos...) + batchSize += proto.Size(req) } } + + // Pack the last batch + baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) + baseTask.setParentTask(parentTask) + loadSegmentTask := &loadSegmentTask{ + baseTask: baseTask, + LoadSegmentsRequest: batchReq, + meta: meta, + cluster: cluster, + excludeNodeIDs: excludeNodeIDs, + } + internalTasks = append(internalTasks, loadSegmentTask) } for _, req := range watchDmChannelRequests { @@ -1994,10 +1994,6 @@ func assignInternalTask(ctx context.Context, return internalTasks, nil } -func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int { - return proto.Size(req) -} - func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) { deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err != nil {