enhance: Support retrieving file size from importutilv2.Reader (#31533) (#31594)

To reduce the overhead caused by listing S3 objects, add a Size() method to
the importutilv2.Reader interface so file sizes can be retrieved directly
from the reader.

issue: https://github.com/milvus-io/milvus/issues/31532,
https://github.com/milvus-io/milvus/issues/28521

pr: https://github.com/milvus-io/milvus/pull/31533

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2024-03-26 10:09:08 +08:00 committed by GitHub
parent 3addc68c66
commit 35664fa302
14 changed files with 230 additions and 104 deletions
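In outline, the change replaces a ChunkManager-based size helper with one new method on the reader interface (reproduced here from the diff below, for orientation):

// The importutilv2.Reader interface after this change. Each concrete
// reader (binlog, json, numpy, parquet) stats its own paths through the
// ChunkManager and caches the result, so callers no longer have to list
// object storage just to learn a file's size.
type Reader interface {
	// Size returns the size of the underlying file/files in bytes.
	Size() (int64, error)
	// Read reads data from the underlying file/files.
	Read() (*storage.InsertData, error)
	// Close closes the underlying file reader.
	Close()
}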


@@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) {
}
defer reader.Close()
start := time.Now()
err = e.readFileStat(reader, task, i, file)
err = e.readFileStat(reader, task, i)
if err != nil {
e.handleErr(task, err, "preimport failed")
return err
@@ -180,11 +180,17 @@ func (e *executor) PreImport(task Task) {
WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...)
}
func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error {
fileSize, err := GetFileSize(file, e.cm, task)
func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
fileSize, err := reader.Size()
if err != nil {
return err
}
maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
if fileSize > int64(maxSize) {
return errors.New(fmt.Sprintf(
"The import file size has reached the maximum limit allowed for importing, "+
"fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
}
totalRows := 0
totalSize := 0
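For concreteness, the limit arithmetic in the new check, as a standalone sketch (the value 16 below is illustrative, not a default taken from this diff):

// importSizeLimitBytes mirrors readFileStat's conversion of the
// MaxImportFileSizeInGB setting into a byte limit.
func importSizeLimitBytes(maxImportFileSizeInGB float64) int64 {
	return int64(maxImportFileSizeInGB * 1024 * 1024 * 1024)
}

// importSizeLimitBytes(16) == 17179869184, i.e. files over 16 GiB are
// rejected before any rows are read.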


@@ -464,13 +464,10 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
Paths: []string{"dummy.json"},
}
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
s.executor.cm = cm
var once sync.Once
data := createInsertData(s.T(), s.schema, s.numRows)
s.reader = importutilv2.NewMockReader(s.T())
s.reader.EXPECT().Size().Return(1024, nil)
s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) {
var res *storage.InsertData
once.Do(func() {
@@ -492,7 +489,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
}
preimportTask := NewPreImportTask(preimportReq)
s.manager.Add(preimportTask)
err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile)
err := s.executor.readFileStat(s.reader, preimportTask, 0)
s.NoError(err)
}


@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strconv"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@@ -30,10 +29,8 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -204,43 +201,6 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection
return 0
}
func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task) (int64, error) {
paths := file.GetPaths()
if importutilv2.IsBackup(task.GetOptions()) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
paths = make([]string, 0)
for _, prefix := range file.GetPaths() {
binlogs, _, err := cm.ListWithPrefix(ctx, prefix, true)
if err != nil {
return 0, err
}
paths = append(paths, binlogs...)
}
}
fn := func(path string) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
return cm.Size(ctx, path)
}
var totalSize int64 = 0
for _, path := range paths {
size, err := fn(path)
if err != nil {
return 0, err
}
totalSize += size
}
maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
if totalSize > int64(maxSize) {
return 0, merr.WrapErrImportFailed(fmt.Sprintf(
"The import file size has reached the maximum limit allowed for importing, "+
"fileSize=%d, maxSize=%d", totalSize, int64(maxSize)))
}
return totalSize, nil
}
func LogStats(manager TaskManager) {
logFunc := func(tasks []Task, taskType TaskType) {
byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 {


@@ -18,6 +18,7 @@ package storage
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
@@ -1247,3 +1248,15 @@ func Min(a, b int64) int64 {
}
return b
}
func GetFilesSize(ctx context.Context, paths []string, cm ChunkManager) (int64, error) {
totalSize := int64(0)
for _, filePath := range paths {
size, err := cm.Size(ctx, filePath)
if err != nil {
return 0, err
}
totalSize += size
}
return totalSize, nil
}
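A brief usage sketch of the new helper (the wrapper function and its 20-second timeout, borrowed from the removed GetFileSize, are illustrative):

// totalImportSize sums object sizes with one Size() stat per path and no
// ListWithPrefix round trip — the listing overhead the commit message
// refers to. Hypothetical helper in package storage, for illustration.
func totalImportSize(cm ChunkManager, paths []string) (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	return GetFilesSize(ctx, paths, cm)
}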


@@ -23,6 +23,9 @@ import (
"io"
"math"
"github.com/samber/lo"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -34,6 +37,7 @@ type reader struct {
cm storage.ChunkManager
schema *schemapb.CollectionSchema
fileSize *atomic.Int64
deleteData *storage.DeleteData
insertLogs map[int64][]string // fieldID -> binlogs
@@ -50,9 +54,10 @@ func NewReader(ctx context.Context,
) (*reader, error) {
schema = typeutil.AppendSystemFields(schema)
r := &reader{
ctx: ctx,
cm: cm,
schema: schema,
ctx: ctx,
cm: cm,
schema: schema,
fileSize: atomic.NewInt64(0),
}
err := r.init(paths, tsStart, tsEnd)
if err != nil {
@@ -200,4 +205,16 @@ OUTER:
return result, nil
}
func (r *reader) Size() (int64, error) {
if size := r.fileSize.Load(); size != 0 {
return size, nil
}
size, err := storage.GetFilesSize(r.ctx, lo.Flatten(lo.Values(r.insertLogs)), r.cm)
if err != nil {
return 0, err
}
r.fileSize.Store(size)
return size, nil
}
func (r *reader) Close() {}
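All four readers in this commit (binlog above; json, numpy, and parquet below) implement Size() with the same memoization shape. Distilled into a standalone sketch, not code from the commit:

import "go.uber.org/atomic"

// sizeCache: compute the size once, publish it through an atomic so
// concurrent Size() calls are cheap afterwards. Zero doubles as the
// "unset" sentinel, so a genuinely empty file would be re-stat'ed on
// every call — harmless, because the recomputed answer is identical.
type sizeCache struct {
	cached  *atomic.Int64         // start with atomic.NewInt64(0)
	compute func() (int64, error) // e.g. storage.GetFilesSize over the reader's paths
}

func (c *sizeCache) Size() (int64, error) {
	if size := c.cached.Load(); size != 0 {
		return size, nil
	}
	size, err := c.compute()
	if err != nil {
		return 0, err
	}
	c.cached.Store(size)
	return size, nil
}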


@@ -17,11 +17,14 @@
package json
import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -35,9 +38,14 @@ const (
type Row = map[storage.FieldID]any
type reader struct {
dec *json.Decoder
ctx context.Context
cm storage.ChunkManager
schema *schemapb.CollectionSchema
fileSize *atomic.Int64
filePath string
dec *json.Decoder
bufferSize int
count int64
isOldFormat bool
@@ -45,15 +53,22 @@ type reader struct {
parser RowParser
}
func NewReader(r io.Reader, schema *schemapb.CollectionSchema, bufferSize int) (*reader, error) {
var err error
func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int) (*reader, error) {
r, err := cm.Reader(ctx, path)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error()))
}
count, err := estimateReadCountPerBatch(bufferSize, schema)
if err != nil {
return nil, err
}
reader := &reader{
dec: json.NewDecoder(r),
ctx: ctx,
cm: cm,
schema: schema,
fileSize: atomic.NewInt64(0),
filePath: path,
dec: json.NewDecoder(r),
bufferSize: bufferSize,
count: count,
}
@@ -153,6 +168,18 @@ func (j *reader) Read() (*storage.InsertData, error) {
return insertData, nil
}
func (j *reader) Size() (int64, error) {
if size := j.fileSize.Load(); size != 0 {
return size, nil
}
size, err := j.cm.Size(j.ctx, j.filePath)
if err != nil {
return 0, err
}
j.fileSize.Store(size)
return size, nil
}
func (j *reader) Close() {}
func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) {


@@ -17,9 +17,11 @@
package json
import (
"context"
rand2 "crypto/rand"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"strconv"
@@ -28,11 +30,13 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -246,8 +250,18 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
jsonBytes, err := json.Marshal(rows)
suite.NoError(err)
r := strings.NewReader(string(jsonBytes))
reader, err := NewReader(r, schema, math.MaxInt)
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
cm := mocks.NewChunkManager(suite.T())
cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}
return r, nil
})
reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt)
suite.NoError(err)
checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {


@@ -105,6 +105,57 @@ func (_c *MockReader_Read_Call) RunAndReturn(run func() (*storage.InsertData, er
return _c
}
// Size provides a mock function with given fields:
func (_m *MockReader) Size() (int64, error) {
ret := _m.Called()
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func() (int64, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockReader_Size_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Size'
type MockReader_Size_Call struct {
*mock.Call
}
// Size is a helper method to define mock.On call
func (_e *MockReader_Expecter) Size() *MockReader_Size_Call {
return &MockReader_Size_Call{Call: _e.mock.On("Size")}
}
func (_c *MockReader_Size_Call) Run(run func()) *MockReader_Size_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockReader_Size_Call) Return(_a0 int64, _a1 error) *MockReader_Size_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockReader_Size_Call) RunAndReturn(run func() (int64, error)) *MockReader_Size_Call {
_c.Call.Return(run)
return _c
}
// NewMockReader creates a new instance of MockReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockReader(t interface {


@@ -24,19 +24,26 @@ import (
"strings"
"github.com/samber/lo"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
)
type Reader struct {
type reader struct {
ctx context.Context
cm storage.ChunkManager
schema *schemapb.CollectionSchema
count int64
frs map[int64]*FieldReader // fieldID -> FieldReader
fileSize *atomic.Int64
paths []string
count int64
frs map[int64]*FieldReader // fieldID -> FieldReader
}
func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []string, cm storage.ChunkManager, bufferSize int) (*Reader, error) {
func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []string, cm storage.ChunkManager, bufferSize int) (*reader, error) {
fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
@@ -56,14 +63,18 @@ func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []s
}
crs[fieldID] = cr
}
return &Reader{
schema: schema,
count: count,
frs: crs,
return &reader{
ctx: ctx,
cm: cm,
schema: schema,
fileSize: atomic.NewInt64(0),
paths: paths,
count: count,
frs: crs,
}, nil
}
func (r *Reader) Read() (*storage.InsertData, error) {
func (r *reader) Read() (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
@@ -89,7 +100,19 @@ func (r *Reader) Read() (*storage.InsertData, error) {
return insertData, nil
}
func (r *Reader) Close() {
func (r *reader) Size() (int64, error) {
if size := r.fileSize.Load(); size != 0 {
return size, nil
}
size, err := storage.GetFilesSize(r.ctx, r.paths, r.cm)
if err != nil {
return 0, err
}
r.fileSize.Store(size)
return size, nil
}
func (r *reader) Close() {
for _, cr := range r.frs {
cr.Close()
}


@@ -251,7 +251,7 @@ func fillDynamicData(data *storage.InsertData, schema *schemapb.CollectionSchema
if dynamicField == nil {
return nil
}
rowNum := GetInsertDataRowNum(data, schema)
rowNum := getInsertDataRowNum(data, schema)
dynamicData := data.Data[dynamicField.GetFieldID()]
jsonFD := dynamicData.(*storage.JSONFieldData)
bs := []byte("{}")
@@ -262,7 +262,7 @@ func fillDynamicData(data *storage.InsertData, schema *schemapb.CollectionSchema
return nil
}
func GetInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSchema) int {
func getInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSchema) int {
fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})


@@ -25,6 +25,7 @@ import (
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -33,28 +34,37 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
)
type Reader struct {
reader *file.Reader
type reader struct {
ctx context.Context
cm storage.ChunkManager
schema *schemapb.CollectionSchema
path string
r *file.Reader
fileSize *atomic.Int64
bufferSize int
count int64
schema *schemapb.CollectionSchema
frs map[int64]*FieldReader // fieldID -> FieldReader
frs map[int64]*FieldReader // fieldID -> FieldReader
}
func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, cmReader storage.FileReader, bufferSize int) (*Reader, error) {
reader, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int) (*reader, error) {
cmReader, err := cm.Reader(ctx, path)
if err != nil {
return nil, err
}
r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
BufferSize: int64(bufferSize),
BufferedStreamEnabled: true,
}))
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet reader failed, err=%v", err))
}
log.Info("create parquet reader done", zap.Int("row group num", reader.NumRowGroups()),
zap.Int64("num rows", reader.NumRows()))
log.Info("create parquet reader done", zap.Int("row group num", r.NumRowGroups()),
zap.Int64("num rows", r.NumRows()))
fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet file reader failed, err=%v", err))
}
@@ -67,16 +77,20 @@ func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, cmReader
if err != nil {
return nil, err
}
return &Reader{
reader: reader,
return &reader{
ctx: ctx,
cm: cm,
schema: schema,
fileSize: atomic.NewInt64(0),
path: path,
r: r,
bufferSize: bufferSize,
count: count,
schema: schema,
frs: crs,
}, nil
}
func (r *Reader) Read() (*storage.InsertData, error) {
func (r *reader) Read() (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
@@ -108,11 +122,23 @@ OUTER:
return insertData, nil
}
func (r *Reader) Close() {
func (r *reader) Size() (int64, error) {
if size := r.fileSize.Load(); size != 0 {
return size, nil
}
size, err := r.cm.Size(r.ctx, r.path)
if err != nil {
return 0, err
}
r.fileSize.Store(size)
return size, nil
}
func (r *reader) Close() {
for _, cr := range r.frs {
cr.Close()
}
err := r.reader.Close()
err := r.r.Close()
if err != nil {
log.Warn("close parquet reader failed", zap.Error(err))
}


@@ -413,9 +413,7 @@ func (s *ReaderSuite) run(dt schemapb.DataType) {
f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/"))
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(s.T(), err)
cmReader, err := cm.Reader(ctx, filePath)
assert.NoError(s.T(), err)
reader, err := NewReader(ctx, schema, cmReader, 64*1024*1024)
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024)
s.NoError(err)
checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
@@ -494,9 +492,7 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/"))
cm, err := f.NewPersistentStorageChunkManager(ctx)
assert.NoError(s.T(), err)
cmReader, err := cm.Reader(ctx, filePath)
assert.NoError(s.T(), err)
reader, err := NewReader(ctx, schema, cmReader, 64*1024*1024)
reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024)
s.NoError(err)
_, err = reader.Read()


@@ -31,7 +31,15 @@ import (
//go:generate mockery --name=Reader --structname=MockReader --output=./ --filename=mock_reader.go --with-expecter --inpackage
type Reader interface {
// Size returns the size of the underlying file/files in bytes.
// It returns an error if the size cannot be determined.
Size() (int64, error)
// Read reads data from the underlying file/files.
// It returns the storage.InsertData and an error, if any.
Read() (*storage.InsertData, error)
// Close closes the underlying file reader.
Close()
}
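A hedged sketch of how a consumer uses the extended interface: one Size() call up front (enforcing the size limit without listing object storage), then batched reads. Treating io.EOF as the end-of-data signal mirrors the executor's read loop, which is not shown in this hunk — take it as an assumption:

import (
	"io"

	"github.com/milvus-io/milvus/internal/util/importutilv2"
)

func consume(r importutilv2.Reader) error {
	defer r.Close()
	if _, err := r.Size(); err != nil {
		return err // size unknown: fail before reading any rows
	}
	for {
		data, err := r.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		_ = data // hand the batch off to stats collection / sync
	}
}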
@@ -57,19 +65,11 @@ func NewReader(ctx context.Context,
}
switch fileType {
case JSON:
reader, err := cm.Reader(ctx, importFile.GetPaths()[0])
if err != nil {
return nil, WrapReadFileError(importFile.GetPaths()[0], err)
}
return json.NewReader(reader, schema, bufferSize)
return json.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
case Numpy:
return numpy.NewReader(ctx, schema, importFile.GetPaths(), cm, bufferSize)
case Parquet:
cmReader, err := cm.Reader(ctx, importFile.GetPaths()[0])
if err != nil {
return nil, err
}
return parquet.NewReader(ctx, schema, cmReader, bufferSize)
return parquet.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
}
return nil, merr.WrapErrImportFailed("unexpected import file")
}


@@ -50,10 +50,6 @@ func (f FileType) String() string {
return FileTypeName[int(f)]
}
func WrapReadFileError(file string, err error) error {
return merr.WrapErrImportFailed(fmt.Sprintf("failed to read the file '%s', error: %s", file, err.Error()))
}
func GetFileType(file *internalpb.ImportFile) (FileType, error) {
if len(file.GetPaths()) == 0 {
return Invalid, merr.WrapErrImportFailed("no file to import")