Parallel estimate segment size (#12136)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2021-11-19 18:09:13 +08:00 committed by GitHub
parent cec1b0cb77
commit e66537f9c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"sort" "sort"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@ -99,13 +100,23 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
return nil return nil
} }
log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs") log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs")
dataSizePerReq := make([]int64, 0) dataSizePerReq := make([]int64, len(reqs))
for _, req := range reqs { estimateError := make([]error, len(reqs))
sizeOfReq, err := cluster.estimateSegmentsSize(req) var estimateWg sync.WaitGroup
estimateReqFn := func(offset int, req *querypb.LoadSegmentsRequest) {
defer estimateWg.Done()
dataSizePerReq[offset], estimateError[offset] = cluster.estimateSegmentsSize(req)
}
for offset, req := range reqs {
estimateWg.Add(1)
go estimateReqFn(offset, req)
}
estimateWg.Wait()
for _, err := range estimateError {
if err != nil { if err != nil {
log.Debug("shuffleSegmentsToQueryNodeV2: estimate segment size error", zap.Error(err))
return err return err
} }
dataSizePerReq = append(dataSizePerReq, sizeOfReq)
} }
log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end") log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end")
for { for {