mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Refactor assigning loading segments task in QueryCoord (#15895)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
a5c3f4eef4
commit
7124e91d53
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -1933,49 +1934,48 @@ func assignInternalTask(ctx context.Context,
|
||||
}
|
||||
log.Debug("assignInternalTask: assign dmChannel to node success")
|
||||
|
||||
mergedLoadSegmentReqs := make(map[UniqueID]map[int64][]*querypb.LoadSegmentsRequest)
|
||||
sizeCounts := make(map[UniqueID]map[int64]int)
|
||||
for _, req := range loadSegmentRequests {
|
||||
nodeID := req.DstNodeID
|
||||
collectionID := req.CollectionID
|
||||
if _, ok := mergedLoadSegmentReqs[collectionID]; !ok {
|
||||
mergedLoadSegmentReqs[collectionID] = make(map[int64][]*querypb.LoadSegmentsRequest)
|
||||
sizeCounts[collectionID] = make(map[int64]int)
|
||||
}
|
||||
mergedLoadSegmentReqsPerCol := mergedLoadSegmentReqs[collectionID]
|
||||
sizeCountsPerCol := sizeCounts[collectionID]
|
||||
sizeOfReq := getSizeOfLoadSegmentReq(req)
|
||||
if _, ok := mergedLoadSegmentReqsPerCol[nodeID]; !ok {
|
||||
mergedLoadSegmentReqsPerCol[nodeID] = []*querypb.LoadSegmentsRequest{}
|
||||
mergedLoadSegmentReqsPerCol[nodeID] = append(mergedLoadSegmentReqsPerCol[nodeID], req)
|
||||
sizeCountsPerCol[nodeID] = sizeOfReq
|
||||
} else {
|
||||
if sizeCountsPerCol[nodeID]+sizeOfReq > MaxSendSizeToEtcd {
|
||||
mergedLoadSegmentReqsPerCol[nodeID] = append(mergedLoadSegmentReqsPerCol[nodeID], req)
|
||||
sizeCountsPerCol[nodeID] = sizeOfReq
|
||||
} else {
|
||||
lastReq := mergedLoadSegmentReqsPerCol[nodeID][len(mergedLoadSegmentReqsPerCol[nodeID])-1]
|
||||
lastReq.Infos = append(lastReq.Infos, req.Infos...)
|
||||
sizeCountsPerCol[nodeID] += sizeOfReq
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(loadSegmentRequests) > 0 {
|
||||
sort.Slice(loadSegmentRequests, func(i, j int) bool {
|
||||
return loadSegmentRequests[i].CollectionID < loadSegmentRequests[j].CollectionID ||
|
||||
loadSegmentRequests[i].CollectionID == loadSegmentRequests[j].CollectionID && loadSegmentRequests[i].DstNodeID < loadSegmentRequests[j].DstNodeID
|
||||
})
|
||||
|
||||
for _, loadSegmentsReqsPerCol := range mergedLoadSegmentReqs {
|
||||
for _, loadSegmentReqs := range loadSegmentsReqsPerCol {
|
||||
for _, req := range loadSegmentReqs {
|
||||
batchReq := loadSegmentRequests[0]
|
||||
batchSize := proto.Size(batchReq)
|
||||
for _, req := range loadSegmentRequests[1:] {
|
||||
// Pack current batch, switch to new batch
|
||||
if req.CollectionID != batchReq.CollectionID || req.DstNodeID != batchReq.DstNodeID ||
|
||||
batchSize+proto.Size(req) > MaxSendSizeToEtcd {
|
||||
baseTask := newBaseTask(ctx, parentTask.getTriggerCondition())
|
||||
baseTask.setParentTask(parentTask)
|
||||
loadSegmentTask := &loadSegmentTask{
|
||||
baseTask: baseTask,
|
||||
LoadSegmentsRequest: req,
|
||||
LoadSegmentsRequest: batchReq,
|
||||
meta: meta,
|
||||
cluster: cluster,
|
||||
excludeNodeIDs: excludeNodeIDs,
|
||||
}
|
||||
internalTasks = append(internalTasks, loadSegmentTask)
|
||||
|
||||
batchReq = req
|
||||
batchSize = proto.Size(batchReq)
|
||||
} else {
|
||||
batchReq.Infos = append(batchReq.Infos, req.Infos...)
|
||||
batchSize += proto.Size(req)
|
||||
}
|
||||
}
|
||||
|
||||
// Pack the last batch
|
||||
baseTask := newBaseTask(ctx, parentTask.getTriggerCondition())
|
||||
baseTask.setParentTask(parentTask)
|
||||
loadSegmentTask := &loadSegmentTask{
|
||||
baseTask: baseTask,
|
||||
LoadSegmentsRequest: batchReq,
|
||||
meta: meta,
|
||||
cluster: cluster,
|
||||
excludeNodeIDs: excludeNodeIDs,
|
||||
}
|
||||
internalTasks = append(internalTasks, loadSegmentTask)
|
||||
}
|
||||
|
||||
for _, req := range watchDmChannelRequests {
|
||||
@ -1994,10 +1994,6 @@ func assignInternalTask(ctx context.Context,
|
||||
return internalTasks, nil
|
||||
}
|
||||
|
||||
func getSizeOfLoadSegmentReq(req *querypb.LoadSegmentsRequest) int {
|
||||
return proto.Size(req)
|
||||
}
|
||||
|
||||
func generateWatchDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
|
||||
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user