Limit the concurrency level for single load request (#18400)

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2022-07-26 15:18:30 +08:00 committed by GitHub
parent ef20375af2
commit 9621b6615d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -47,6 +47,10 @@ import (
"github.com/milvus-io/milvus/internal/util/timerecord"
)
const (
requestConcurrencyLevelLimit = 8
)
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
metaReplica ReplicaInterface
@ -89,10 +93,19 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
zap.Any("segmentType", segmentType.String()))
// check memory limit
concurrencyLevel := loader.cpuPool.Cap()
if concurrencyLevel > segmentNum {
concurrencyLevel = segmentNum
min := func(first int, values ...int) int {
minValue := first
for _, v := range values {
if v < minValue {
minValue = v
}
}
return minValue
}
concurrencyLevel := min(loader.cpuPool.Cap(),
len(req.Infos),
requestConcurrencyLevelLimit)
for ; concurrencyLevel > 1; concurrencyLevel /= 2 {
err := loader.checkSegmentSize(req.CollectionID, req.Infos, concurrencyLevel)
if err == nil {
@ -848,6 +861,10 @@ func newSegmentLoader(
panic(err)
}
log.Info("SegmentLoader created",
zap.Int("cpu-pool-size", cpuNum),
zap.Int("io-pool-size", ioPoolSize))
loader := &segmentLoader{
metaReplica: metaReplica,