mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Accelerate listing objects during binlog import (#40047)
issue: https://github.com/milvus-io/milvus/issues/40030 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
8fd39779f7
commit
8f077089ba
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -42,7 +43,9 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
@ -1695,7 +1698,8 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
||||
log := log.Ctx(ctx).With(zap.Int64("collection", in.GetCollectionID()),
|
||||
zap.Int64s("partitions", in.GetPartitionIDs()),
|
||||
zap.Strings("channels", in.GetChannelNames()))
|
||||
log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
|
||||
log.Info("receive import request", zap.Int("fileNum", len(in.GetFiles())),
|
||||
zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
|
||||
|
||||
timeoutTs, err := importutilv2.GetTimeoutTs(in.GetOptions())
|
||||
if err != nil {
|
||||
@ -1707,14 +1711,28 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
||||
isBackup := importutilv2.IsBackup(in.GetOptions())
|
||||
if isBackup {
|
||||
files = make([]*internalpb.ImportFile, 0)
|
||||
pool := conc.NewPool[struct{}](hardware.GetCPUNum() * 2)
|
||||
futures := make([]*conc.Future[struct{}], 0, len(in.GetFiles()))
|
||||
mu := &sync.Mutex{}
|
||||
for _, importFile := range in.GetFiles() {
|
||||
segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile)
|
||||
if err != nil {
|
||||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err)))
|
||||
return resp, nil
|
||||
}
|
||||
files = append(files, segmentPrefixes...)
|
||||
importFile := importFile
|
||||
futures = append(futures, pool.Submit(func() (struct{}, error) {
|
||||
segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile)
|
||||
if err != nil {
|
||||
return struct{}{}, err
|
||||
}
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
files = append(files, segmentPrefixes...)
|
||||
return struct{}{}, nil
|
||||
}))
|
||||
}
|
||||
err = conc.AwaitAll(futures...)
|
||||
if err != nil {
|
||||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err)))
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
files = lo.Filter(files, func(file *internalpb.ImportFile, _ int) bool {
|
||||
return len(file.GetPaths()) > 0
|
||||
})
|
||||
@ -1727,7 +1745,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
||||
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
|
||||
return resp, nil
|
||||
}
|
||||
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
|
||||
log.Info("list binlogs prefixes for import", zap.Int("num", len(files)), zap.Any("binlog_prefixes", files))
|
||||
}
|
||||
|
||||
// The import task does not need to be controlled for the time being, and additional development is required later.
|
||||
@ -1798,6 +1816,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
||||
resp.JobID = fmt.Sprint(job.GetJobID())
|
||||
log.Info("add import job done",
|
||||
zap.Int64("jobID", job.GetJobID()),
|
||||
zap.Int("fileNum", len(files)),
|
||||
zap.Any("files", files),
|
||||
zap.Strings("readyChannels", in.GetChannelNames()),
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user