enhance: Read azure file without ReadAll (#29602)

issue: #29292

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2024-01-04 20:50:46 +08:00 committed by GitHub
parent 05d735c322
commit dc8b5c1130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 265 additions and 207 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
@ -89,6 +90,77 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
return &AzureObjectStorage{Client: client}, nil
}
type BlobReader struct {
client *blockblob.Client
position int64
}
func NewBlobReader(client *blockblob.Client, offset int64) (*BlobReader, error) {
return &BlobReader{client: client, position: offset}, nil
}
func (b *BlobReader) Read(p []byte) (n int, err error) {
ctx := context.TODO()
opts := &azblob.DownloadStreamOptions{}
if b.position > 0 {
opts.Range = blob.HTTPRange{
Offset: b.position,
}
}
object, err := b.client.DownloadStream(ctx, opts)
if err != nil {
return 0, err
}
n, err = object.Body.Read(p)
if err != nil {
return n, err
}
b.position += int64(n)
return n, nil
}
func (b *BlobReader) Close() error {
return nil
}
func (b *BlobReader) ReadAt(p []byte, off int64) (n int, err error) {
httpRange := blob.HTTPRange{
Offset: off,
}
object, err := b.client.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
Range: httpRange,
})
if err != nil {
return 0, err
}
defer object.Body.Close()
return io.ReadFull(object.Body, p)
}
func (b *BlobReader) Seek(offset int64, whence int) (int64, error) {
props, err := b.client.GetProperties(context.Background(), &blob.GetPropertiesOptions{})
if err != nil {
return 0, err
}
size := *props.ContentLength
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = b.position + offset
case io.SeekEnd:
newOffset = size + offset
default:
return 0, merr.WrapErrIoFailedReason("invalid whence")
}
b.position = newOffset
return newOffset, nil
}
func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
opts := azblob.DownloadStreamOptions{}
if offset > 0 {
@ -97,11 +169,11 @@ func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, buc
Count: size,
}
}
object, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
_, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
if err != nil {
return nil, checkObjectStorageError(objectName, err)
}
return NewAzureFile(object.Body), nil
return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset)
}
func (AzureObjectStorage *AzureObjectStorage) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64) error {

View File

@ -216,3 +216,194 @@ func TestAzureObjectStorage(t *testing.T) {
os.Setenv("AZURE_STORAGE_CONNECTION_STRING", connectionString)
})
}
func TestReadFile(t *testing.T) {
ctx := context.Background()
bucketName := Params.MinioCfg.BucketName.GetValue()
c := &config{
bucketName: bucketName,
createBucket: true,
useIAM: false,
cloudProvider: "azure",
}
rcm, err := NewRemoteChunkManager(ctx, c)
t.Run("Read", func(t *testing.T) {
filePath := "test-Read"
data := []byte("Test data for Read.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
buffer = make([]byte, 6)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 6, n)
assert.Equal(t, " data ", string(buffer))
buffer = make([]byte, 40)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 9, n)
assert.Equal(t, "for Read.", string(buffer[:9]))
})
t.Run("ReadAt", func(t *testing.T) {
filePath := "test-ReadAt"
data := []byte("Test data for ReadAt.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.ReadAt(buffer, 5)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "data", string(buffer))
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
buffer = make([]byte, 4)
n, err = reader.ReadAt(buffer, 20)
assert.Error(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, ".", string(buffer[:1]))
buffer = make([]byte, 4)
n, err = reader.ReadAt(buffer, 25)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek start", func(t *testing.T) {
filePath := "test-SeekStart"
data := []byte("Test data for Seek start.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
offset, err := reader.Seek(10, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(10), offset)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "for ", string(buffer))
offset, err = reader.Seek(40, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(40), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek current", func(t *testing.T) {
filePath := "test-SeekStart"
data := []byte("Test data for Seek current.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
offset, err := reader.Seek(10, io.SeekCurrent)
assert.NoError(t, err)
assert.Equal(t, int64(14), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Seek", string(buffer))
offset, err = reader.Seek(40, io.SeekCurrent)
assert.NoError(t, err)
assert.Equal(t, int64(58), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
})
t.Run("Seek end", func(t *testing.T) {
filePath := "test-SeekEnd"
data := []byte("Test data for Seek end.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
buffer := make([]byte, 4)
n, err := reader.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
offset, err := reader.Seek(10, io.SeekEnd)
assert.NoError(t, err)
assert.Equal(t, int64(33), offset)
buffer = make([]byte, 4)
n, err = reader.Read(buffer)
assert.Error(t, err)
assert.Equal(t, 0, n)
offset, err = reader.Seek(10, 3)
assert.Error(t, err)
assert.Equal(t, int64(0), offset)
})
t.Run("Close", func(t *testing.T) {
filePath := "test-Close"
data := []byte("Test data for Close.")
err = rcm.Write(ctx, filePath, data)
assert.NoError(t, err)
defer rcm.Remove(ctx, filePath)
reader, err := rcm.Reader(ctx, filePath)
assert.NoError(t, err)
err = reader.Close()
assert.NoError(t, err)
})
}

View File

@ -1,117 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"io"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
)
var errInvalid = errors.New("invalid argument")
// MemoryFile implements the FileReader interface
type MemoryFile struct {
data []byte
position int
}
// NewMemoryFile creates a new instance of MemoryFile
func NewMemoryFile(data []byte) *MemoryFile {
return &MemoryFile{data: data}
}
// ReadAt implements the ReadAt method of the io.ReaderAt interface
func (mf *MemoryFile) ReadAt(p []byte, off int64) (n int, err error) {
if off < 0 || int64(int(off)) < off {
return 0, errInvalid
}
if off > int64(len(mf.data)) {
return 0, io.EOF
}
n = copy(p, mf.data[off:])
mf.position += n
if n < len(p) {
return n, io.EOF
}
return n, nil
}
// Seek implements the Seek method of the io.Seeker interface
func (mf *MemoryFile) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = int64(mf.position) + offset
case io.SeekEnd:
newOffset = int64(len(mf.data)) + offset
default:
return 0, errInvalid
}
if newOffset < 0 {
return 0, errInvalid
}
mf.position = int(newOffset)
return newOffset, nil
}
// Read implements the Read method of the io.Reader interface
func (mf *MemoryFile) Read(p []byte) (n int, err error) {
if mf.position >= len(mf.data) {
return 0, io.EOF
}
n = copy(p, mf.data[mf.position:])
mf.position += n
return n, nil
}
// Write implements the Write method of the io.Writer interface
func (mf *MemoryFile) Write(p []byte) (n int, err error) {
// Write data to memory
mf.data = append(mf.data, p...)
return len(p), nil
}
// Close implements the Close method of the io.Closer interface
func (mf *MemoryFile) Close() error {
// Memory file does not need a close operation
return nil
}
type AzureFile struct {
*MemoryFile
}
func NewAzureFile(body io.ReadCloser) *AzureFile {
data, err := io.ReadAll(body)
defer body.Close()
if err != nil && err != io.EOF {
log.Warn("create azure file failed, read data failed", zap.Error(err))
return &AzureFile{
NewMemoryFile(nil),
}
}
return &AzureFile{
NewMemoryFile(data),
}
}

View File

@ -1,88 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAzureFile(t *testing.T) {
t.Run("Read", func(t *testing.T) {
data := []byte("Test data for Read.")
azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
buffer := make([]byte, 4)
n, err := azureFile.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
buffer = make([]byte, 6)
n, err = azureFile.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 6, n)
assert.Equal(t, " data ", string(buffer))
})
t.Run("ReadAt", func(t *testing.T) {
data := []byte("Test data for ReadAt.")
azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
buffer := make([]byte, 4)
n, err := azureFile.ReadAt(buffer, 5)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "data", string(buffer))
})
t.Run("Seek start", func(t *testing.T) {
data := []byte("Test data for Seek.")
azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
offset, err := azureFile.Seek(10, io.SeekStart)
assert.NoError(t, err)
assert.Equal(t, int64(10), offset)
buffer := make([]byte, 4)
n, err := azureFile.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "for ", string(buffer))
})
t.Run("Seek current", func(t *testing.T) {
data := []byte("Test data for Seek.")
azureFile := NewAzureFile(io.NopCloser(bytes.NewReader(data)))
buffer := make([]byte, 4)
n, err := azureFile.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Test", string(buffer))
offset, err := azureFile.Seek(10, io.SeekCurrent)
assert.NoError(t, err)
assert.Equal(t, int64(14), offset)
buffer = make([]byte, 4)
n, err = azureFile.Read(buffer)
assert.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "Seek", string(buffer))
})
}