From e66537f9c9c93e2cb8527dbcc0010a25ed1502bf Mon Sep 17 00:00:00 2001 From: xige-16 Date: Fri, 19 Nov 2021 18:09:13 +0800 Subject: [PATCH] Parallel estimate segment size (#12136) Signed-off-by: xige-16 --- internal/querycoord/segment_allocator.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/querycoord/segment_allocator.go b/internal/querycoord/segment_allocator.go index a089347e65..6580ac948f 100644 --- a/internal/querycoord/segment_allocator.go +++ b/internal/querycoord/segment_allocator.go @@ -20,6 +20,7 @@ import ( "context" "errors" "sort" + "sync" "time" "go.uber.org/zap" @@ -99,13 +100,23 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme return nil } log.Debug("shuffleSegmentsToQueryNodeV2: start estimate the size of loadReqs") - dataSizePerReq := make([]int64, 0) - for _, req := range reqs { - sizeOfReq, err := cluster.estimateSegmentsSize(req) + dataSizePerReq := make([]int64, len(reqs)) + estimateError := make([]error, len(reqs)) + var estimateWg sync.WaitGroup + estimateReqFn := func(offset int, req *querypb.LoadSegmentsRequest) { + defer estimateWg.Done() + dataSizePerReq[offset], estimateError[offset] = cluster.estimateSegmentsSize(req) + } + for offset, req := range reqs { + estimateWg.Add(1) + go estimateReqFn(offset, req) + } + estimateWg.Wait() + for _, err := range estimateError { if err != nil { + log.Debug("shuffleSegmentsToQueryNodeV2: estimate segment size error", zap.Error(err)) return err } - dataSizePerReq = append(dataSizePerReq, sizeOfReq) } log.Debug("shuffleSegmentsToQueryNodeV2: estimate the size of loadReqs end") for {