From dc8b5c1130a2727b259d789913555625cd765584 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 4 Jan 2024 20:50:46 +0800
Subject: [PATCH] enhance: Read azure file without ReadAll (#29602)

issue: #29292

Signed-off-by: Cai Zhang
---
 internal/storage/azure_object_storage.go      |  76 ++++++-
 internal/storage/azure_object_storage_test.go | 191 ++++++++++++++++++
 internal/storage/file.go                      | 117 -----------
 internal/storage/file_test.go                 |  88 --------
 4 files changed, 265 insertions(+), 207 deletions(-)
 delete mode 100644 internal/storage/file.go
 delete mode 100644 internal/storage/file_test.go

diff --git a/internal/storage/azure_object_storage.go b/internal/storage/azure_object_storage.go
index a773601295..b0324299e3 100644
--- a/internal/storage/azure_object_storage.go
+++ b/internal/storage/azure_object_storage.go
@@ -27,6 +27,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
 
@@ -89,6 +90,77 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
 	return &AzureObjectStorage{Client: client}, nil
 }
 
+type BlobReader struct {
+	client   *blockblob.Client
+	position int64
+}
+
+func NewBlobReader(client *blockblob.Client, offset int64) (*BlobReader, error) {
+	return &BlobReader{client: client, position: offset}, nil
+}
+
+func (b *BlobReader) Read(p []byte) (n int, err error) {
+	ctx := context.TODO()
+
+	opts := &azblob.DownloadStreamOptions{}
+	if b.position > 0 {
+		opts.Range = blob.HTTPRange{
+			Offset: b.position,
+		}
+	}
+	object, err := b.client.DownloadStream(ctx, opts)
+	if err != nil {
+		return 0, err
+	}
+	n, err = object.Body.Read(p)
+	if err != nil {
+		return n, err
+	}
+	b.position += int64(n)
+
+	return n, nil
+}
+
+func (b *BlobReader) Close() error {
+	return nil
+}
+
+func (b *BlobReader) ReadAt(p []byte, off int64) (n int, err error) {
+	httpRange := blob.HTTPRange{
+		Offset: off,
+	}
+	object, err := b.client.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
+		Range: httpRange,
+	})
+	if err != nil {
+		return 0, err
+	}
+	defer object.Body.Close()
+	return io.ReadFull(object.Body, p)
+}
+
+func (b *BlobReader) Seek(offset int64, whence int) (int64, error) {
+	props, err := b.client.GetProperties(context.Background(), &blob.GetPropertiesOptions{})
+	if err != nil {
+		return 0, err
+	}
+	size := *props.ContentLength
+	var newOffset int64
+	switch whence {
+	case io.SeekStart:
+		newOffset = offset
+	case io.SeekCurrent:
+		newOffset = b.position + offset
+	case io.SeekEnd:
+		newOffset = size + offset
+	default:
+		return 0, merr.WrapErrIoFailedReason("invalid whence")
+	}
+
+	b.position = newOffset
+	return newOffset, nil
+}
+
 func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
 	opts := azblob.DownloadStreamOptions{}
 	if offset > 0 {
@@ -97,11 +169,11 @@ func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, buc
 			Count: size,
 		}
 	}
-	object, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
+	_, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
 	if err != nil {
 		return nil, checkObjectStorageError(objectName, err)
 	}
-	return NewAzureFile(object.Body), nil
+	return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset)
 }
 
 func (AzureObjectStorage *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {
diff --git a/internal/storage/azure_object_storage_test.go b/internal/storage/azure_object_storage_test.go
index 05483f5f97..90e0eed0af 100644
--- a/internal/storage/azure_object_storage_test.go
+++ b/internal/storage/azure_object_storage_test.go
@@ -216,3 +216,194 @@ func TestAzureObjectStorage(t *testing.T) {
 		os.Setenv("AZURE_STORAGE_CONNECTION_STRING", connectionString)
 	})
 }
+
+func TestReadFile(t *testing.T) {
+	ctx := context.Background()
+	bucketName := Params.MinioCfg.BucketName.GetValue()
+	c := &config{
+		bucketName:    bucketName,
+		createBucket:  true,
+		useIAM:        false,
+		cloudProvider: "azure",
+	}
+	rcm, err := NewRemoteChunkManager(ctx, c)
+
+	t.Run("Read", func(t *testing.T) {
+		filePath := "test-Read"
+		data := []byte("Test data for Read.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		buffer := make([]byte, 4)
+		n, err := reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "Test", string(buffer))
+
+		buffer = make([]byte, 6)
+		n, err = reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 6, n)
+		assert.Equal(t, " data ", string(buffer))
+
+		buffer = make([]byte, 40)
+		n, err = reader.Read(buffer)
+		assert.Error(t, err)
+		assert.Equal(t, 9, n)
+		assert.Equal(t, "for Read.", string(buffer[:9]))
+	})
+
+	t.Run("ReadAt", func(t *testing.T) {
+		filePath := "test-ReadAt"
+		data := []byte("Test data for ReadAt.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		buffer := make([]byte, 4)
+		n, err := reader.ReadAt(buffer, 5)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "data", string(buffer))
+
+		buffer = make([]byte, 4)
+		n, err = reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "Test", string(buffer))
+
+		buffer = make([]byte, 4)
+		n, err = reader.ReadAt(buffer, 20)
+		assert.Error(t, err)
+		assert.Equal(t, 1, n)
+		assert.Equal(t, ".", string(buffer[:1]))
+
+		buffer = make([]byte, 4)
+		n, err = reader.ReadAt(buffer, 25)
+		assert.Error(t, err)
+		assert.Equal(t, 0, n)
+	})
+
+	t.Run("Seek start", func(t *testing.T) {
+		filePath := "test-SeekStart"
+		data := []byte("Test data for Seek start.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		offset, err := reader.Seek(10, io.SeekStart)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(10), offset)
+
+		buffer := make([]byte, 4)
+		n, err := reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "for ", string(buffer))
+
+		offset, err = reader.Seek(40, io.SeekStart)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(40), offset)
+
+		buffer = make([]byte, 4)
+		n, err = reader.Read(buffer)
+		assert.Error(t, err)
+		assert.Equal(t, 0, n)
+	})
+
+	t.Run("Seek current", func(t *testing.T) {
+		filePath := "test-SeekStart"
+		data := []byte("Test data for Seek current.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		buffer := make([]byte, 4)
+		n, err := reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "Test", string(buffer))
+
+		offset, err := reader.Seek(10, io.SeekCurrent)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(14), offset)
+
+		buffer = make([]byte, 4)
+		n, err = reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "Seek", string(buffer))
+
+		offset, err = reader.Seek(40, io.SeekCurrent)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(58), offset)
+
+		buffer = make([]byte, 4)
+		n, err = reader.Read(buffer)
+		assert.Error(t, err)
+		assert.Equal(t, 0, n)
+	})
+
+	t.Run("Seek end", func(t *testing.T) {
+		filePath := "test-SeekEnd"
+		data := []byte("Test data for Seek end.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		buffer := make([]byte, 4)
+		n, err := reader.Read(buffer)
+		assert.NoError(t, err)
+		assert.Equal(t, 4, n)
+		assert.Equal(t, "Test", string(buffer))
+
+		offset, err := reader.Seek(10, io.SeekEnd)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(33), offset)
+
+		buffer = make([]byte, 4)
+		n, err = reader.Read(buffer)
+		assert.Error(t, err)
+		assert.Equal(t, 0, n)
+
+		offset, err = reader.Seek(10, 3)
+		assert.Error(t, err)
+		assert.Equal(t, int64(0), offset)
+	})
+
+	t.Run("Close", func(t *testing.T) {
+		filePath := "test-Close"
+		data := []byte("Test data for Close.")
+
+		err = rcm.Write(ctx, filePath, data)
+		assert.NoError(t, err)
+		defer rcm.Remove(ctx, filePath)
+
+		reader, err := rcm.Reader(ctx, filePath)
+		assert.NoError(t, err)
+
+		err = reader.Close()
+		assert.NoError(t, err)
+	})
+}
diff --git a/internal/storage/file.go b/internal/storage/file.go
deleted file mode 100644
index d27c1fcd85..0000000000
--- a/internal/storage/file.go
+++ /dev/null
@@ -1,117 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
-	"io"
-
-	"github.com/cockroachdb/errors"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/pkg/log"
-)
-
-var errInvalid = errors.New("invalid argument")
-
-// MemoryFile implements the FileReader interface
-type MemoryFile struct {
-	data     []byte
-	position int
-}
-
-// NewMemoryFile creates a new instance of MemoryFile
-func NewMemoryFile(data []byte) *MemoryFile {
-	return &MemoryFile{data: data}
-}
-
-// ReadAt implements the ReadAt method of the io.ReaderAt interface
-func (mf *MemoryFile) ReadAt(p []byte, off int64) (n int, err error) {
-	if off < 0 || int64(int(off)) < off {
-		return 0, errInvalid
-	}
-	if off > int64(len(mf.data)) {
-		return 0, io.EOF
-	}
-	n = copy(p, mf.data[off:])
-	mf.position += n
-	if n < len(p) {
-		return n, io.EOF
-	}
-	return n, nil
-}
-
-// Seek implements the Seek method of the io.Seeker interface
-func (mf *MemoryFile) Seek(offset int64, whence int) (int64, error) {
-	var newOffset int64
-	switch whence {
-	case io.SeekStart:
-		newOffset = offset
-	case io.SeekCurrent:
-		newOffset = int64(mf.position) + offset
-	case io.SeekEnd:
-		newOffset = int64(len(mf.data)) + offset
-	default:
-		return 0, errInvalid
-	}
-	if newOffset < 0 {
-		return 0, errInvalid
-	}
-	mf.position = int(newOffset)
-	return newOffset, nil
-}
-
-// Read implements the Read method of the io.Reader interface
-func (mf *MemoryFile) Read(p []byte) (n int, err error) {
-	if mf.position >= len(mf.data) {
-		return 0, io.EOF
-	}
-	n = copy(p, mf.data[mf.position:])
-	mf.position += n
-	return n, nil
-}
-
-// Write implements the Write method of the io.Writer interface
-func (mf *MemoryFile) Write(p []byte) (n int, err error) {
-	// Write data to memory
-	mf.data = append(mf.data, p...)
-	return len(p), nil
-}
-
-// Close implements the Close method of the io.Closer interface
-func (mf *MemoryFile) Close() error {
-	// Memory file does not need a close operation
-	return nil
-}
-
-type AzureFile struct {
-	*MemoryFile
-}
-
-func NewAzureFile(body io.ReadCloser) *AzureFile {
-	data, err := io.ReadAll(body)
-	defer body.Close()
-	if err != nil && err != io.EOF {
-		log.Warn("create azure file failed, read data failed", zap.Error(err))
-		return &AzureFile{
-			NewMemoryFile(nil),
-		}
-	}
-
-	return &AzureFile{
-		NewMemoryFile(data),
-	}
-}
diff --git a/internal/storage/file_test.go b/internal/storage/file_test.go
deleted file mode 100644
index 64a8b45095..0000000000
--- a/internal/storage/file_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
-	"bytes"
-	"io"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestAzureFile(t *testing.T) {
-	t.Run("Read", func(t *testing.T) {
-		data := []byte("Test data for Read.")
-		azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
-		buffer := make([]byte, 4)
-		n, err := azureFile.Read(buffer)
-		assert.NoError(t, err)
-		assert.Equal(t, 4, n)
-		assert.Equal(t, "Test", string(buffer))
-
-		buffer = make([]byte, 6)
-		n, err = azureFile.Read(buffer)
-		assert.NoError(t, err)
-		assert.Equal(t, 6, n)
-		assert.Equal(t, " data ", string(buffer))
-	})
-
-	t.Run("ReadAt", func(t *testing.T) {
-		data := []byte("Test data for ReadAt.")
-		azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
-		buffer := make([]byte, 4)
-		n, err := azureFile.ReadAt(buffer, 5)
-		assert.NoError(t, err)
-		assert.Equal(t, 4, n)
-		assert.Equal(t, "data", string(buffer))
-	})
-
-	t.Run("Seek start", func(t *testing.T) {
-		data := []byte("Test data for Seek.")
-		azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
-		offset, err := azureFile.Seek(10, io.SeekStart)
-		assert.NoError(t, err)
-		assert.Equal(t, int64(10), offset)
-		buffer := make([]byte, 4)
-
-		n, err := azureFile.Read(buffer)
-		assert.NoError(t, err)
-		assert.Equal(t, 4, n)
-		assert.Equal(t, "for ", string(buffer))
-	})
-
-	t.Run("Seek current", func(t *testing.T) {
-		data := []byte("Test data for Seek.")
-		azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
-
-		buffer := make([]byte, 4)
-		n, err := azureFile.Read(buffer)
-		assert.NoError(t, err)
-		assert.Equal(t, 4, n)
-		assert.Equal(t, "Test", string(buffer))
-
-		offset, err := azureFile.Seek(10, io.SeekCurrent)
-		assert.NoError(t, err)
-		assert.Equal(t, int64(14), offset)
-
-		buffer = make([]byte, 4)
-		n, err = azureFile.Read(buffer)
-		assert.NoError(t, err)
-		assert.Equal(t, 4, n)
-		assert.Equal(t, "Seek", string(buffer))
-	})
-}
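
Usage note (not part of the diff): a minimal Go sketch of the ranged DownloadStream request that the new BlobReader and GetObject rely on, fetching only the requested bytes instead of io.ReadAll-ing the whole blob, which is the point of #29602. The connection-string environment variable, the container name "a-bucket", and the blob name "test-Read" are illustrative assumptions, not values defined by this patch; in Milvus this wiring is done by AzureObjectStorage.GetObject behind the ChunkManager Reader API.

package main

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
)

func main() {
	ctx := context.Background()

	// Assumed: the connection string is provided via the environment.
	connStr := os.Getenv("AZURE_STORAGE_CONNECTION_STRING")
	client, err := azblob.NewClientFromConnectionString(connStr, nil)
	if err != nil {
		panic(err)
	}

	// Block blob client for an illustrative container/blob pair.
	blobClient := client.ServiceClient().NewContainerClient("a-bucket").NewBlockBlobClient("test-Read")

	// Download only bytes [5, 9) of the blob: the same kind of ranged request
	// GetObject issues when it is called with a non-zero offset and size.
	resp, err := blobClient.DownloadStream(ctx, &azblob.DownloadStreamOptions{
		Range: blob.HTTPRange{Offset: 5, Count: 4},
	})
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Read the 4-byte window; a short blob yields ErrUnexpectedEOF, which is
	// tolerated here so the partial data is still printed.
	buf := make([]byte, 4)
	n, err := io.ReadFull(resp.Body, buf)
	if err != nil && err != io.ErrUnexpectedEOF {
		panic(err)
	}
	fmt.Println(string(buf[:n]))
}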