Optimize workflow of parallel (#15001)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2022-01-07 18:07:22 +08:00 committed by GitHub
parent 708dfddba6
commit 7f47ef0244
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,6 +19,7 @@ package funcutil
import ( import (
"reflect" "reflect"
"runtime" "runtime"
"sync"
"time" "time"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
@ -56,10 +57,20 @@ func ProcessFuncParallel(total, maxParallel int, f func(idx int) error, fname st
return b return b
} }
routineNum := 0 routineNum := 0
var wg sync.WaitGroup
for begin := 0; begin < total; begin = begin + nPerBatch { for begin := 0; begin < total; begin = begin + nPerBatch {
j := begin j := begin
wg.Add(1)
go func(begin int) { go func(begin int) {
defer wg.Done()
select {
case <-quit:
return
default:
}
err := error(nil) err := error(nil)
end := getMin(total, begin+nPerBatch) end := getMin(total, begin+nPerBatch)
@ -98,10 +109,12 @@ func ProcessFuncParallel(total, maxParallel int, f func(idx int) error, fname st
select { select {
case err := <-errc: case err := <-errc:
close(quit) close(quit)
wg.Wait()
return err return err
case <-done: case <-done:
count++ count++
if count == routineNum { if count == routineNum {
wg.Wait()
return nil return nil
} }
} }