From ebe1c95bb13fc26104a0ca82a8e80ff93cd8f7b2 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 25 Jun 2025 14:36:41 +0800 Subject: [PATCH] enhance: Add Size interface to FileReader to eliminate the StatObject call during Read (#42908) issue: #42907 --------- Signed-off-by: Cai Zhang --- internal/datanode/importv2/scheduler_test.go | 11 ++-- internal/mocks/mock_chunk_manager.go | 64 ++++++++++++++++++- internal/storage/azure_object_storage.go | 10 ++- internal/storage/gcp_native_object_storage.go | 18 ++++-- internal/storage/local_chunk_manager.go | 20 +++++- internal/storage/minio_object_storage.go | 16 ++++- internal/storage/remote_chunk_manager.go | 2 +- internal/storage/types.go | 1 + .../util/importutilv2/json/reader_test.go | 24 +++---- .../util/importutilv2/numpy/reader_test.go | 26 ++++---- 10 files changed, 149 insertions(+), 43 deletions(-) diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 82255b117a..4c4879931e 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -60,6 +60,11 @@ type mockReader struct { io.Closer io.ReaderAt io.Seeker + size int64 +} + +func (mr *mockReader) Size() (int64, error) { + return mr.size, nil } type SchedulerSuite struct { @@ -197,12 +202,6 @@ func (s *SchedulerSuite) TestScheduler_Start_Preimport_Failed() { s.NoError(err) cm := mocks.NewChunkManager(s.T()) - type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - } ioReader := strings.NewReader(string(bytes)) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader, Closer: io.NopCloser(ioReader)}, nil) diff --git a/internal/mocks/mock_chunk_manager.go b/internal/mocks/mock_chunk_manager.go index 966f399215..5323bab9e8 100644 --- a/internal/mocks/mock_chunk_manager.go +++ b/internal/mocks/mock_chunk_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.53.3. DO NOT EDIT. package mocks @@ -29,6 +29,10 @@ func (_m *ChunkManager) EXPECT() *ChunkManager_Expecter { func (_m *ChunkManager) Exist(ctx context.Context, filePath string) (bool, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Exist") + } + var r0 bool var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { @@ -82,6 +86,10 @@ func (_c *ChunkManager_Exist_Call) RunAndReturn(run func(context.Context, string func (_m *ChunkManager) Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Mmap") + } + var r0 *mmap.ReaderAt var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (*mmap.ReaderAt, error)); ok { @@ -137,6 +145,10 @@ func (_c *ChunkManager_Mmap_Call) RunAndReturn(run func(context.Context, string) func (_m *ChunkManager) MultiRead(ctx context.Context, filePaths []string) ([][]byte, error) { ret := _m.Called(ctx, filePaths) + if len(ret) == 0 { + panic("no return value specified for MultiRead") + } + var r0 [][]byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok { @@ -192,6 +204,10 @@ func (_c *ChunkManager_MultiRead_Call) RunAndReturn(run func(context.Context, [] func (_m *ChunkManager) MultiRemove(ctx context.Context, filePaths []string) error { ret := _m.Called(ctx, filePaths) + if len(ret) == 0 { + panic("no return value specified for MultiRemove") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok { r0 = rf(ctx, filePaths) @@ -235,6 +251,10 @@ func (_c *ChunkManager_MultiRemove_Call) RunAndReturn(run func(context.Context, func (_m *ChunkManager) MultiWrite(ctx context.Context, contents map[string][]byte) error { ret := _m.Called(ctx, contents) + if len(ret) == 0 { + panic("no return value specified for MultiWrite") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok { r0 = rf(ctx, contents) @@ -278,6 +298,10 @@ func (_c *ChunkManager_MultiWrite_Call) RunAndReturn(run func(context.Context, m func (_m *ChunkManager) Path(ctx context.Context, filePath string) (string, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Path") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { @@ -331,6 +355,10 @@ func (_c *ChunkManager_Path_Call) RunAndReturn(run func(context.Context, string) func (_m *ChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Read") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) ([]byte, error)); ok { @@ -386,6 +414,10 @@ func (_c *ChunkManager_Read_Call) RunAndReturn(run func(context.Context, string) func (_m *ChunkManager) ReadAt(ctx context.Context, filePath string, off int64, length int64) ([]byte, error) { ret := _m.Called(ctx, filePath, off, length) + if len(ret) == 0 { + panic("no return value specified for ReadAt") + } + var r0 []byte var r1 error if rf, ok := ret.Get(0).(func(context.Context, string, int64, int64) ([]byte, error)); ok { @@ -443,6 +475,10 @@ func (_c *ChunkManager_ReadAt_Call) RunAndReturn(run func(context.Context, strin func (_m *ChunkManager) Reader(ctx context.Context, filePath string) (storage.FileReader, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Reader") + } + var r0 storage.FileReader var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (storage.FileReader, error)); ok { @@ -498,6 +534,10 @@ func (_c *ChunkManager_Reader_Call) RunAndReturn(run func(context.Context, strin func (_m *ChunkManager) Remove(ctx context.Context, filePath string) error { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Remove") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, filePath) @@ -541,6 +581,10 @@ func (_c *ChunkManager_Remove_Call) RunAndReturn(run func(context.Context, strin func (_m *ChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { ret := _m.Called(ctx, prefix) + if len(ret) == 0 { + panic("no return value specified for RemoveWithPrefix") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { r0 = rf(ctx, prefix) @@ -580,10 +624,14 @@ func (_c *ChunkManager_RemoveWithPrefix_Call) RunAndReturn(run func(context.Cont return _c } -// RootPath provides a mock function with given fields: +// RootPath provides a mock function with no fields func (_m *ChunkManager) RootPath() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for RootPath") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -625,6 +673,10 @@ func (_c *ChunkManager_RootPath_Call) RunAndReturn(run func() string) *ChunkMana func (_m *ChunkManager) Size(ctx context.Context, filePath string) (int64, error) { ret := _m.Called(ctx, filePath) + if len(ret) == 0 { + panic("no return value specified for Size") + } + var r0 int64 var r1 error if rf, ok := ret.Get(0).(func(context.Context, string) (int64, error)); ok { @@ -678,6 +730,10 @@ func (_c *ChunkManager_Size_Call) RunAndReturn(run func(context.Context, string) func (_m *ChunkManager) WalkWithPrefix(ctx context.Context, prefix string, recursive bool, walkFunc storage.ChunkObjectWalkFunc) error { ret := _m.Called(ctx, prefix, recursive, walkFunc) + if len(ret) == 0 { + panic("no return value specified for WalkWithPrefix") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, bool, storage.ChunkObjectWalkFunc) error); ok { r0 = rf(ctx, prefix, recursive, walkFunc) @@ -723,6 +779,10 @@ func (_c *ChunkManager_WalkWithPrefix_Call) RunAndReturn(run func(context.Contex func (_m *ChunkManager) Write(ctx context.Context, filePath string, content []byte) error { ret := _m.Called(ctx, filePath, content) + if len(ret) == 0 { + panic("no return value specified for Write") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, []byte) error); ok { r0 = rf(ctx, filePath, content) diff --git a/internal/storage/azure_object_storage.go b/internal/storage/azure_object_storage.go index ace65e6617..2091c0d14c 100644 --- a/internal/storage/azure_object_storage.go +++ b/internal/storage/azure_object_storage.go @@ -49,6 +49,7 @@ type BlobReader struct { client *blockblob.Client position int64 body io.ReadCloser + contentLength int64 needResetStream bool } @@ -70,6 +71,7 @@ func (b *BlobReader) Read(p []byte) (n int, err error) { return 0, err } b.body = object.Body + b.contentLength = *object.ContentLength } n, err = b.body.Read(p) @@ -126,6 +128,10 @@ func (b *BlobReader) Seek(offset int64, whence int) (int64, error) { return newOffset, nil } +func (b *BlobReader) Size() (int64, error) { + return b.contentLength, nil +} + func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) { return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset) } @@ -148,7 +154,7 @@ func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Contex pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{ Prefix: &prefix, }) - if pager.More() { + for pager.More() { pageResp, err := pager.NextPage(ctx) if err != nil { return err @@ -163,7 +169,7 @@ func (AzureObjectStorage *AzureObjectStorage) WalkWithObjects(ctx context.Contex pager := AzureObjectStorage.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ Prefix: &prefix, }) - if pager.More() { + for pager.More() { pageResp, err := pager.NextPage(ctx) if err != nil { return err diff --git a/internal/storage/gcp_native_object_storage.go b/internal/storage/gcp_native_object_storage.go index 7a11ba7e16..917364e312 100644 --- a/internal/storage/gcp_native_object_storage.go +++ b/internal/storage/gcp_native_object_storage.go @@ -64,7 +64,12 @@ func (gcs *GcpNativeObjectStorage) GetObject(ctx context.Context, bucketName, ob if err != nil { return nil, checkObjectStorageError(objectName, err) } - return &GcsReader{reader: reader, obj: obj}, nil + + return &GcsReader{ + reader: reader, + obj: obj, + objectSize: reader.Attrs.Size, + }, nil } func (gcs *GcpNativeObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, @@ -165,9 +170,10 @@ func (gcs *GcpNativeObjectStorage) DeleteBucket(ctx context.Context, bucketName } type GcsReader struct { - reader *storage.Reader - obj *storage.ObjectHandle - position int64 + reader *storage.Reader + obj *storage.ObjectHandle + position int64 + objectSize int64 } func (gcsReader *GcsReader) Read(p []byte) (n int, err error) { @@ -237,3 +243,7 @@ func (gcsReader *GcsReader) Seek(offset int64, whence int) (int64, error) { gcsReader.position = newOffset return newOffset, nil } + +func (gcsReader *GcsReader) Size() (int64, error) { + return gcsReader.objectSize, nil +} diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index f00d2450b1..da3159854c 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -71,7 +71,13 @@ func (lcm *LocalChunkManager) Path(ctx context.Context, filePath string) (string } func (lcm *LocalChunkManager) Reader(ctx context.Context, filePath string) (FileReader, error) { - return Open(filePath) + file, err := Open(filePath) + if err != nil { + return nil, err + } + return &LocalReader{ + File: file, + }, nil } // Write writes the data to local storage. @@ -262,3 +268,15 @@ func (lcm *LocalChunkManager) RemoveWithPrefix(ctx context.Context, prefix strin } return removeErr } + +type LocalReader struct { + *os.File +} + +func (lr *LocalReader) Size() (int64, error) { + stat, err := lr.Stat() + if err != nil { + return -1, nil + } + return stat.Size(), nil +} diff --git a/internal/storage/minio_object_storage.go b/internal/storage/minio_object_storage.go index 9eb51ae385..b0f6f1040a 100644 --- a/internal/storage/minio_object_storage.go +++ b/internal/storage/minio_object_storage.go @@ -34,6 +34,18 @@ type MinioObjectStorage struct { *minio.Client } +type ObjectReader struct { + *minio.Object +} + +func (or *ObjectReader) Size() (int64, error) { + stat, err := or.Stat() + if err != nil { + return -1, err + } + return stat.Size, nil +} + func newMinioObjectStorageWithConfig(ctx context.Context, c *objectstorage.Config) (*MinioObjectStorage, error) { minIOClient, err := objectstorage.NewMinioClient(ctx, c) if err != nil { @@ -55,7 +67,9 @@ func (minioObjectStorage *MinioObjectStorage) GetObject(ctx context.Context, buc if err != nil { return nil, checkObjectStorageError(objectName, err) } - return object, nil + return &ObjectReader{ + Object: object, + }, nil } func (minioObjectStorage *MinioObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error { diff --git a/internal/storage/remote_chunk_manager.go b/internal/storage/remote_chunk_manager.go index 3c3dfae763..ce981c5421 100644 --- a/internal/storage/remote_chunk_manager.go +++ b/internal/storage/remote_chunk_manager.go @@ -199,7 +199,7 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) return err } - size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + size, err := object.Size() if err != nil { log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) return err diff --git a/internal/storage/types.go b/internal/storage/types.go index 2b00215fbc..e634f150bc 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -39,6 +39,7 @@ type FileReader interface { io.Closer io.ReaderAt io.Seeker + Size() (int64, error) } // ChunkObjectInfo is to store object info. diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index aa5ac2432f..f4532a0417 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -37,6 +37,18 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +type mockReader struct { + io.Reader + io.Closer + io.ReaderAt + io.Seeker + size int64 +} + +func (mr *mockReader) Size() (int64, error) { + return mr.size, nil +} + type ReaderSuite struct { suite.Suite @@ -130,12 +142,6 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data jsonBytes, err := json.Marshal(rows) suite.NoError(err) - type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - } cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} @@ -205,12 +211,6 @@ func (suite *ReaderSuite) runWithDefaultValue(dataType schemapb.DataType, elemTy jsonBytes, err := json.Marshal(rows) suite.NoError(err) - type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - } cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index d83b1219d9..cc63fff36c 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -42,6 +42,18 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +type mockReader struct { + io.Reader + io.Closer + io.ReaderAt + io.Seeker + size int64 +} + +func (mr *mockReader) Size() (int64, error) { + return mr.size, nil +} + type ReaderSuite struct { suite.Suite @@ -145,13 +157,6 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } cm := mocks.NewChunkManager(suite.T()) - type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - } - var data interface{} for fieldID, fieldData := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType() @@ -287,13 +292,6 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } cm := mocks.NewChunkManager(suite.T()) - type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - } - var data interface{} for fieldID, fieldData := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType()