enhance: Add Size interface to FileReader to eliminate the StatObject call during Read (#42908)

issue: #42907

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-06-25 14:36:41 +08:00 committed by GitHub
parent e2566c0e92
commit ebe1c95bb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 149 additions and 43 deletions

View File

@ -60,6 +60,11 @@ type mockReader struct {
io.Closer io.Closer
io.ReaderAt io.ReaderAt
io.Seeker io.Seeker
size int64
}
func (mr *mockReader) Size() (int64, error) {
return mr.size, nil
} }
type SchedulerSuite struct { type SchedulerSuite struct {
@ -197,12 +202,6 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() {
s.NoError(err) s.NoError(err)
cm := mocks.NewChunkManager(s.T()) cm := mocks.NewChunkManager(s.T())
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
ioReader := strings.NewReader(string(bytes)) ioReader := strings.NewReader(string(bytes))
cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT. // Code generated by mockery v2.53.3. DO NOT EDIT.
package mocks package mocks
@ -29,6 +29,10 @@ func (_m *ChunkManager) EXPECT() *ChunkManager_Expecter {
func (_m *ChunkManager) Exist(ctx context.Context, filePath string) (bool, error) { func (_m *ChunkManager) Exist(ctx context.Context, filePath string) (bool, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Exist")
}
var r0 bool var r0 bool
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok {
@ -82,6 +86,10 @@ func (_c *ChunkManager_Exist_Call) RunAndReturn(run func(context.Context, string
func (_m *ChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) { func (_m *ChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Mmap")
}
var r0 *mmap.ReaderAt var r0 *mmap.ReaderAt
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (*mmap.ReaderAt, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) (*mmap.ReaderAt, error)); ok {
@ -137,6 +145,10 @@ func (_c *ChunkManager_Mmap_Call) RunAndReturn(run func(context.Context, string)
func (_m *ChunkManager) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) { func (_m *ChunkManager) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) {
ret := _m.Called(ctx, filePaths) ret := _m.Called(ctx, filePaths)
if len(ret) == 0 {
panic("no return value specified for MultiRead")
}
var r0 [][]byte var r0 [][]byte
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok {
@ -192,6 +204,10 @@ func (_c *ChunkManager_MultiRead_Call) RunAndReturn(run func(context.Context, []
func (_m *ChunkManager) MultiRemove(ctx context.Context, filePaths []string) error { func (_m *ChunkManager) MultiRemove(ctx context.Context, filePaths []string) error {
ret := _m.Called(ctx, filePaths) ret := _m.Called(ctx, filePaths)
if len(ret) == 0 {
panic("no return value specified for MultiRemove")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok {
r0 = rf(ctx, filePaths) r0 = rf(ctx, filePaths)
@ -235,6 +251,10 @@ func (_c *ChunkManager_MultiRemove_Call) RunAndReturn(run func(context.Context,
func (_m *ChunkManager) MultiWrite(ctx context.Context, contents map[string][]byte) error { func (_m *ChunkManager) MultiWrite(ctx context.Context, contents map[string][]byte) error {
ret := _m.Called(ctx, contents) ret := _m.Called(ctx, contents)
if len(ret) == 0 {
panic("no return value specified for MultiWrite")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok { if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok {
r0 = rf(ctx, contents) r0 = rf(ctx, contents)
@ -278,6 +298,10 @@ func (_c *ChunkManager_MultiWrite_Call) RunAndReturn(run func(context.Context, m
func (_m *ChunkManager) Path(ctx context.Context, filePath string) (string, error) { func (_m *ChunkManager) Path(ctx context.Context, filePath string) (string, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Path")
}
var r0 string var r0 string
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok {
@ -331,6 +355,10 @@ func (_c *ChunkManager_Path_Call) RunAndReturn(run func(context.Context, string)
func (_m *ChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { func (_m *ChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Read")
}
var r0 []byte var r0 []byte
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) ([]byte, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) ([]byte, error)); ok {
@ -386,6 +414,10 @@ func (_c *ChunkManager_Read_Call) RunAndReturn(run func(context.Context, string)
func (_m *ChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) { func (_m *ChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) {
ret := _m.Called(ctx, filePath, off, length) ret := _m.Called(ctx, filePath, off, length)
if len(ret) == 0 {
panic("no return value specified for ReadAt")
}
var r0 []byte var r0 []byte
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64) ([]byte, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64) ([]byte, error)); ok {
@ -443,6 +475,10 @@ func (_c *ChunkManager_ReadAt_Call) RunAndReturn(run func(context.Context, strin
func (_m *ChunkManager) Reader(ctx context.Context, filePath string) (storage.FileReader, error) { func (_m *ChunkManager) Reader(ctx context.Context, filePath string) (storage.FileReader, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Reader")
}
var r0 storage.FileReader var r0 storage.FileReader
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (storage.FileReader, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) (storage.FileReader, error)); ok {
@ -498,6 +534,10 @@ func (_c *ChunkManager_Reader_Call) RunAndReturn(run func(context.Context, strin
func (_m *ChunkManager) Remove(ctx context.Context, filePath string) error { func (_m *ChunkManager) Remove(ctx context.Context, filePath string) error {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Remove")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, filePath) r0 = rf(ctx, filePath)
@ -541,6 +581,10 @@ func (_c *ChunkManager_Remove_Call) RunAndReturn(run func(context.Context, strin
func (_m *ChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { func (_m *ChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error {
ret := _m.Called(ctx, prefix) ret := _m.Called(ctx, prefix)
if len(ret) == 0 {
panic("no return value specified for RemoveWithPrefix")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, prefix) r0 = rf(ctx, prefix)
@ -580,10 +624,14 @@ func (_c *ChunkManager_RemoveWithPrefix_Call) RunAndReturn(run func(context.Cont
return _c return _c
} }
// RootPath provides a mock function with given fields: // RootPath provides a mock function with no fields
func (_m *ChunkManager) RootPath() string { func (_m *ChunkManager) RootPath() string {
ret := _m.Called() ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for RootPath")
}
var r0 string var r0 string
if rf, ok := ret.Get(0).(func() string); ok { if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf() r0 = rf()
@ -625,6 +673,10 @@ func (_c *ChunkManager_RootPath_Call) RunAndReturn(run func() string) *ChunkMana
func (_m *ChunkManager) Size(ctx context.Context, filePath string) (int64, error) { func (_m *ChunkManager) Size(ctx context.Context, filePath string) (int64, error) {
ret := _m.Called(ctx, filePath) ret := _m.Called(ctx, filePath)
if len(ret) == 0 {
panic("no return value specified for Size")
}
var r0 int64 var r0 int64
var r1 error var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (int64, error)); ok { if rf, ok := ret.Get(0).(func(context.Context, string) (int64, error)); ok {
@ -678,6 +730,10 @@ func (_c *ChunkManager_Size_Call) RunAndReturn(run func(context.Context, string)
func (_m *ChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc) error { func (_m *ChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc) error {
ret := _m.Called(ctx, prefix, recursive, walkFunc) ret := _m.Called(ctx, prefix, recursive, walkFunc)
if len(ret) == 0 {
panic("no return value specified for WalkWithPrefix")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, bool, storage.ChunkObjectWalkFunc) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, bool, storage.ChunkObjectWalkFunc) error); ok {
r0 = rf(ctx, prefix, recursive, walkFunc) r0 = rf(ctx, prefix, recursive, walkFunc)
@ -723,6 +779,10 @@ func (_c *ChunkManager_WalkWithPrefix_Call) RunAndReturn(run func(context.Contex
func (_m *ChunkManager) Write(ctx context.Context, filePath string, content []byte) error { func (_m *ChunkManager) Write(ctx context.Context, filePath string, content []byte) error {
ret := _m.Called(ctx, filePath, content) ret := _m.Called(ctx, filePath, content)
if len(ret) == 0 {
panic("no return value specified for Write")
}
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []byte) error); ok { if rf, ok := ret.Get(0).(func(context.Context, string, []byte) error); ok {
r0 = rf(ctx, filePath, content) r0 = rf(ctx, filePath, content)

View File

@ -49,6 +49,7 @@ type BlobReader struct {
client *blockblob.Client client *blockblob.Client
position int64 position int64
body io.ReadCloser body io.ReadCloser
contentLength int64
needResetStream bool needResetStream bool
} }
@ -70,6 +71,7 @@ func (b *BlobReader) Read(p []byte) (n int, err error) {
return 0, err return 0, err
} }
b.body = object.Body b.body = object.Body
b.contentLength = *object.ContentLength
} }
n, err = b.body.Read(p) n, err = b.body.Read(p)
@ -126,6 +128,10 @@ func (b *BlobReader) Seek(offset int64, whence int) (int64, error) {
return newOffset, nil return newOffset, nil
} }
func (b *BlobReader) Size() (int64, error) {
return b.contentLength, nil
}
func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) { func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset) return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset)
} }
@ -148,7 +154,7 @@ func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Contex
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
Prefix: &prefix, Prefix: &prefix,
}) })
if pager.More() { for pager.More() {
pageResp, err := pager.NextPage(ctx) pageResp, err := pager.NextPage(ctx)
if err != nil { if err != nil {
return err return err
@ -163,7 +169,7 @@ func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Contex
pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Prefix: &prefix, Prefix: &prefix,
}) })
if pager.More() { for pager.More() {
pageResp, err := pager.NextPage(ctx) pageResp, err := pager.NextPage(ctx)
if err != nil { if err != nil {
return err return err

View File

@ -64,7 +64,12 @@ func (gcs *GcpNativeObjectStorage) GetObject(ctx context.Context, bucketName, ob
if err != nil { if err != nil {
return nil, checkObjectStorageError(objectName, err) return nil, checkObjectStorageError(objectName, err)
} }
return &GcsReader{reader: reader, obj: obj}, nil
return &GcsReader{
reader: reader,
obj: obj,
objectSize: reader.Attrs.Size,
}, nil
} }
func (gcs *GcpNativeObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, func (gcs *GcpNativeObjectStorage) PutObject(ctx context.Context, bucketName, objectName string,
@ -168,6 +173,7 @@ type GcsReader struct {
reader *storage.Reader reader *storage.Reader
obj *storage.ObjectHandle obj *storage.ObjectHandle
position int64 position int64
objectSize int64
} }
func (gcsReader *GcsReader) Read(p []byte) (n int, err error) { func (gcsReader *GcsReader) Read(p []byte) (n int, err error) {
@ -237,3 +243,7 @@ func (gcsReader *GcsReader) Seek(offset int64, whence int) (int64, error) {
gcsReader.position = newOffset gcsReader.position = newOffset
return newOffset, nil return newOffset, nil
} }
func (gcsReader *GcsReader) Size() (int64, error) {
return gcsReader.objectSize, nil
}

View File

@ -71,7 +71,13 @@ func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string
} }
func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) { func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) {
return Open(filePath) file, err := Open(filePath)
if err != nil {
return nil, err
}
return &LocalReader{
File: file,
}, nil
} }
// Write writes the data to local storage. // Write writes the data to local storage.
@ -262,3 +268,15 @@ func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin
} }
return removeErr return removeErr
} }
type LocalReader struct {
*os.File
}
func (lr *LocalReader) Size() (int64, error) {
stat, err := lr.Stat()
if err != nil {
return -1, nil
}
return stat.Size(), nil
}

View File

@ -34,6 +34,18 @@ type MinioObjectStorage struct {
*minio.Client *minio.Client
} }
type ObjectReader struct {
*minio.Object
}
func (or *ObjectReader) Size() (int64, error) {
stat, err := or.Stat()
if err != nil {
return -1, err
}
return stat.Size, nil
}
func newMinioObjectStorageWithConfig(ctx context.Context, c *objectstorage.Config) (*MinioObjectStorage, error) { func newMinioObjectStorageWithConfig(ctx context.Context, c *objectstorage.Config) (*MinioObjectStorage, error) {
minIOClient, err := objectstorage.NewMinioClient(ctx, c) minIOClient, err := objectstorage.NewMinioClient(ctx, c)
if err != nil { if err != nil {
@ -55,7 +67,9 @@ func (minioObjectStorage *MinioObjectStorage) GetObject(ctx context.Context, buc
if err != nil { if err != nil {
return nil, checkObjectStorageError(objectName, err) return nil, checkObjectStorageError(objectName, err)
} }
return object, nil return &ObjectReader{
Object: object,
}, nil
} }
func (minioObjectStorage *MinioObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { func (minioObjectStorage *MinioObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {

View File

@ -199,7 +199,7 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt
log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
return err return err
} }
size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) size, err := object.Size()
if err != nil { if err != nil {
log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))
return err return err

View File

@ -39,6 +39,7 @@ type FileReader interface {
io.Closer io.Closer
io.ReaderAt io.ReaderAt
io.Seeker io.Seeker
Size() (int64, error)
} }
// ChunkObjectInfo is to store object info. // ChunkObjectInfo is to store object info.

View File

@ -37,6 +37,18 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
size int64
}
func (mr *mockReader) Size() (int64, error) {
return mr.size, nil
}
type ReaderSuite struct { type ReaderSuite struct {
suite.Suite suite.Suite
@ -130,12 +142,6 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
jsonBytes, err := json.Marshal(rows) jsonBytes, err := json.Marshal(rows)
suite.NoError(err) suite.NoError(err)
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
cm := mocks.NewChunkManager(suite.T()) cm := mocks.NewChunkManager(suite.T())
cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}
@ -205,12 +211,6 @@ func (suite *ReaderSuite) runWithDefaultValue(dataType schemapb.DataType, elemTy
jsonBytes, err := json.Marshal(rows) jsonBytes, err := json.Marshal(rows)
suite.NoError(err) suite.NoError(err)
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
cm := mocks.NewChunkManager(suite.T()) cm := mocks.NewChunkManager(suite.T())
cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}

View File

@ -42,6 +42,18 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
size int64
}
func (mr *mockReader) Size() (int64, error) {
return mr.size, nil
}
type ReaderSuite struct { type ReaderSuite struct {
suite.Suite suite.Suite
@ -145,13 +157,6 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
} }
cm := mocks.NewChunkManager(suite.T()) cm := mocks.NewChunkManager(suite.T())
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
var data interface{} var data interface{}
for fieldID, fieldData := range insertData.Data { for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType() dataType := fieldIDToField[fieldID].GetDataType()
@ -287,13 +292,6 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
} }
cm := mocks.NewChunkManager(suite.T()) cm := mocks.NewChunkManager(suite.T())
type mockReader struct {
io.Reader
io.Closer
io.ReaderAt
io.Seeker
}
var data interface{} var data interface{}
for fieldID, fieldData := range insertData.Data { for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType() dataType := fieldIDToField[fieldID].GetDataType()