diff --git a/internal/storage/remote_chunk_manager.go b/internal/storage/remote_chunk_manager.go index ce981c5421..dbccc33cdf 100644 --- a/internal/storage/remote_chunk_manager.go +++ b/internal/storage/remote_chunk_manager.go @@ -22,6 +22,7 @@ import ( "io" "net/http" "strings" + "syscall" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" @@ -64,6 +65,8 @@ type RemoteChunkManager struct { // ctx context.Context bucketName string rootPath string + + readRetryAttempts uint } var _ ChunkManager = (*RemoteChunkManager)(nil) @@ -82,9 +85,10 @@ func NewRemoteChunkManager(ctx context.Context, c *objectstorage.Config) (*Remot return nil, err } mcm := &RemoteChunkManager{ - client: client, - bucketName: c.BucketName, - rootPath: strings.TrimLeft(c.RootPath, "/"), + client: client, + bucketName: c.BucketName, + rootPath: strings.TrimLeft(c.RootPath, "/"), + readRetryAttempts: c.ReadRetryAttempts, } log.Info("remote chunk manager init success.", zap.String("remote", c.CloudProvider), zap.String("bucketname", c.BucketName), zap.String("root", mcm.RootPath())) return mcm, nil @@ -93,9 +97,10 @@ func NewRemoteChunkManager(ctx context.Context, c *objectstorage.Config) (*Remot // NewRemoteChunkManagerForTesting is used for testing. func NewRemoteChunkManagerForTesting(c *minio.Client, bucket string, rootPath string) *RemoteChunkManager { mcm := &RemoteChunkManager{ - client: &MinioObjectStorage{c}, - bucketName: bucket, - rootPath: rootPath, + client: &MinioObjectStorage{c}, + bucketName: bucket, + rootPath: rootPath, + readRetryAttempts: 10, } return mcm } @@ -133,13 +138,21 @@ func (mcm *RemoteChunkManager) Reader(ctx context.Context, filePath string) (Fil } func (mcm *RemoteChunkManager) Size(ctx context.Context, filePath string) (int64, error) { - objectInfo, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) - if err != nil { - log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return 0, err - } - - return objectInfo, nil + var objectInfo int64 + var err error + err = retry.Handle(ctx, func() (bool, error) { + objectInfo, err = mcm.getObjectSize(ctx, mcm.bucketName, filePath) + if err == nil { + return false, nil + } + log.Warn("failed to get object size", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + err = checkObjectStorageError(filePath, err) + if merr.IsRetryableErr(err) { + return true, err + } + return false, err + }, retry.Attempts(mcm.readRetryAttempts)) + return objectInfo, err } // Write writes the data to minio storage. @@ -212,7 +225,7 @@ func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byt } metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(size)) return nil - }, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr)) + }, retry.Attempts(mcm.readRetryAttempts), retry.RetryErr(merr.IsRetryableErr)) if err != nil { return nil, err } @@ -400,6 +413,10 @@ func (mcm *RemoteChunkManager) removeObject(ctx context.Context, bucketName, obj return err } +func ToMilvusIoError(fileName string, err error) error { + return checkObjectStorageError(fileName, err) +} + func checkObjectStorageError(fileName string, err error) error { if err == nil { return nil @@ -410,18 +427,32 @@ func checkObjectStorageError(fileName string, err error) error { if err.ErrorCode == string(bloberror.BlobNotFound) { return merr.WrapErrIoKeyNotFound(fileName, err.Error()) } + if err.ErrorCode == string(bloberror.ServerBusy) { + return merr.WrapErrIoTooManyRequests(fileName, err) + } return merr.WrapErrIoFailed(fileName, err) case minio.ErrorResponse: if err.Code == "NoSuchKey" { return merr.WrapErrIoKeyNotFound(fileName, err.Error()) } + if err.Code == "SlowDown" || err.Code == "TooManyRequestsException" { + return merr.WrapErrIoTooManyRequests(fileName, err) + } return merr.WrapErrIoFailed(fileName, err) case *googleapi.Error: if err.Code == http.StatusNotFound { return merr.WrapErrIoKeyNotFound(fileName, err.Error()) } + if err.Code == http.StatusTooManyRequests { + return merr.WrapErrIoTooManyRequests(fileName, err) + } return merr.WrapErrIoFailed(fileName, err) } + // syscall.ECONNRESET is typically triggered by rate limiting, with errors such as: `read tcp xxxxx:xx->xxxxxx:xxxxx: read: connection reset by peer` + // so we need to wrap it as ErrIoTooManyRequests and trigger retry. + if errors.Is(err, syscall.ECONNRESET) { + return merr.WrapErrIoTooManyRequests(fileName, err) + } if err == io.ErrUnexpectedEOF { return merr.WrapErrIoUnexpectEOF(fileName, err) } diff --git a/internal/storage/remote_chunk_manager_test.go b/internal/storage/remote_chunk_manager_test.go index 9d073529af..549b5af585 100644 --- a/internal/storage/remote_chunk_manager_test.go +++ b/internal/storage/remote_chunk_manager_test.go @@ -18,12 +18,19 @@ package storage import ( "context" + "io" + "net/http" "path" + "syscall" "testing" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/cockroachdb/errors" + "github.com/minio/minio-go/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/api/googleapi" "github.com/milvus-io/milvus/pkg/v2/objectstorage" "github.com/milvus-io/milvus/pkg/v2/util/merr" @@ -973,3 +980,87 @@ func TestAzureChunkManager(t *testing.T) { assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound)) }) } + +func TestToMilvusIoError(t *testing.T) { + fileName := "test_file" + + t.Run("nil error", func(t *testing.T) { + err := ToMilvusIoError(fileName, nil) + assert.NoError(t, err) + }) + + t.Run("io.ErrUnexpectedEOF", func(t *testing.T) { + err := ToMilvusIoError(fileName, io.ErrUnexpectedEOF) + assert.ErrorIs(t, err, merr.ErrIoUnexpectEOF) + }) + + t.Run("syscall.ECONNRESET", func(t *testing.T) { + err := ToMilvusIoError(fileName, syscall.ECONNRESET) + assert.ErrorIs(t, err, merr.ErrIoTooManyRequests) + }) + + t.Run("generic error", func(t *testing.T) { + err := ToMilvusIoError(fileName, errors.New("some error")) + assert.ErrorIs(t, err, merr.ErrIoFailed) + }) + + t.Run("minio NoSuchKey", func(t *testing.T) { + minioErr := minio.ErrorResponse{Code: "NoSuchKey"} + err := ToMilvusIoError(fileName, minioErr) + assert.ErrorIs(t, err, merr.ErrIoKeyNotFound) + }) + + t.Run("minio SlowDown", func(t *testing.T) { + minioErr := minio.ErrorResponse{Code: "SlowDown"} + err := ToMilvusIoError(fileName, minioErr) + assert.ErrorIs(t, err, merr.ErrIoTooManyRequests) + }) + + t.Run("minio TooManyRequestsException", func(t *testing.T) { + minioErr := minio.ErrorResponse{Code: "TooManyRequestsException"} + err := ToMilvusIoError(fileName, minioErr) + assert.ErrorIs(t, err, merr.ErrIoTooManyRequests) + }) + + t.Run("minio other error", func(t *testing.T) { + minioErr := minio.ErrorResponse{Code: "AccessDenied"} + err := ToMilvusIoError(fileName, minioErr) + assert.ErrorIs(t, err, merr.ErrIoFailed) + }) + + t.Run("azure BlobNotFound", func(t *testing.T) { + azureErr := &azcore.ResponseError{ErrorCode: string(bloberror.BlobNotFound)} + err := ToMilvusIoError(fileName, azureErr) + assert.ErrorIs(t, err, merr.ErrIoKeyNotFound) + }) + + t.Run("azure ServerBusy", func(t *testing.T) { + azureErr := &azcore.ResponseError{ErrorCode: string(bloberror.ServerBusy)} + err := ToMilvusIoError(fileName, azureErr) + assert.ErrorIs(t, err, merr.ErrIoTooManyRequests) + }) + + t.Run("azure other error", func(t *testing.T) { + azureErr := &azcore.ResponseError{ErrorCode: "SomeOtherError"} + err := ToMilvusIoError(fileName, azureErr) + assert.ErrorIs(t, err, merr.ErrIoFailed) + }) + + t.Run("googleapi NotFound", func(t *testing.T) { + googleErr := &googleapi.Error{Code: http.StatusNotFound} + err := ToMilvusIoError(fileName, googleErr) + assert.ErrorIs(t, err, merr.ErrIoKeyNotFound) + }) + + t.Run("googleapi TooManyRequests", func(t *testing.T) { + googleErr := &googleapi.Error{Code: http.StatusTooManyRequests} + err := ToMilvusIoError(fileName, googleErr) + assert.ErrorIs(t, err, merr.ErrIoTooManyRequests) + }) + + t.Run("googleapi other error", func(t *testing.T) { + googleErr := &googleapi.Error{Code: http.StatusForbidden} + err := ToMilvusIoError(fileName, googleErr) + assert.ErrorIs(t, err, merr.ErrIoFailed) + }) +} diff --git a/internal/util/importutilv2/common/mock_reader.go b/internal/util/importutilv2/common/mock_reader.go new file mode 100644 index 0000000000..9d5149f0fb --- /dev/null +++ b/internal/util/importutilv2/common/mock_reader.go @@ -0,0 +1,82 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "io" + "strings" + + "github.com/milvus-io/milvus/internal/storage" +) + +// mockFileReader is a mock implementation of storage.FileReader for testing. +type mockFileReader struct { + io.Reader + io.Closer + io.ReaderAt + io.Seeker + size int64 + + err error + errCount int +} + +func NewMockReader(content string) storage.FileReader { + reader := strings.NewReader(content) + return &mockFileReader{ + Reader: reader, + Closer: io.NopCloser(reader), + ReaderAt: reader, + Seeker: reader, + size: int64(len(content)), + err: nil, + errCount: 0, + } +} + +func CustomMockReader(reader io.Reader) storage.FileReader { + return &mockFileReader{ + Reader: reader, + Closer: io.NopCloser(reader), + size: 0, + } +} + +func newErrorMockReader(content string, err error, errCount int) storage.FileReader { + reader := strings.NewReader(content) + return &mockFileReader{ + Reader: reader, + Closer: io.NopCloser(reader), + ReaderAt: reader, + Seeker: reader, + size: int64(len(content)), + err: err, + errCount: errCount, + } +} + +func (m *mockFileReader) Read(p []byte) (int, error) { + if m.errCount > 0 { + m.errCount-- + return 0, m.err + } + return m.Reader.Read(p) +} + +func (m *mockFileReader) Size() (int64, error) { + return m.size, nil +} diff --git a/internal/util/importutilv2/common/retryable_reader.go b/internal/util/importutilv2/common/retryable_reader.go new file mode 100644 index 0000000000..37d308a995 --- /dev/null +++ b/internal/util/importutilv2/common/retryable_reader.go @@ -0,0 +1,79 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "io" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/retry" +) + +// RetryableReader is a wrapper around a FileReader that retries reads on errors. +type RetryableReader interface { + storage.FileReader +} + +// retryableReader is the implementation of RetryableReader. +type retryableReader struct { + storage.FileReader + ctx context.Context + path string + retryAttempts uint +} + +// NewRetryableReader creates a new RetryableReader. +func NewRetryableReader(ctx context.Context, path string, reader storage.FileReader) RetryableReader { + return &retryableReader{ + FileReader: reader, + ctx: ctx, + path: path, + retryAttempts: paramtable.Get().CommonCfg.StorageReadRetryAttempts.GetAsUint(), + } +} + +// Read reads from the underlying FileReader and retries on errors. +func (r *retryableReader) Read(p []byte) (int, error) { + var n int + var err error + err = retry.Handle(r.ctx, func() (bool, error) { + n, err = r.FileReader.Read(p) + if err == nil { + return false, nil + } + if errors.Is(err, io.EOF) { + return false, err + } + log.Ctx(r.ctx).Warn("retryable reader read failed", + zap.String("path", r.path), + zap.Error(err), + ) + err = storage.ToMilvusIoError(r.path, err) + if merr.IsRetryableErr(err) { + return true, err + } + return false, err + }, retry.Attempts(r.retryAttempts)) + return n, err +} diff --git a/internal/util/importutilv2/common/retryable_reader_test.go b/internal/util/importutilv2/common/retryable_reader_test.go new file mode 100644 index 0000000000..f419283f97 --- /dev/null +++ b/internal/util/importutilv2/common/retryable_reader_test.go @@ -0,0 +1,94 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "io" + "math" + "testing" + + "github.com/minio/minio-go/v7" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" +) + +func init() { + paramtable.Init() +} + +func TestRetryableReader_ReadSuccess(t *testing.T) { + ctx := context.Background() + path := "/test/path" + expectedData := "hello world" + + mockReader := NewMockReader(expectedData) + + reader := NewRetryableReader(ctx, path, mockReader) + + buf := make([]byte, len(expectedData)) + n, err := reader.Read(buf) + assert.NoError(t, err) + assert.Equal(t, len(expectedData), n) + assert.Equal(t, expectedData, string(buf)) + + buf = make([]byte, len(expectedData)) + n, err = reader.Read(buf) + assert.ErrorIs(t, err, io.EOF) + assert.Equal(t, 0, n) +} + +func TestRetryableReader_ReadWithRetryableError(t *testing.T) { + ctx := context.Background() + path := "/test/path" + expectedData := "data after retry" + + mockReader := newErrorMockReader(expectedData, + minio.ErrorResponse{ + Code: "TooManyRequestsException", + }, + 3, + ) + + reader := NewRetryableReader(ctx, path, mockReader) + buf := make([]byte, len(expectedData)) + n, err := reader.Read(buf) + + assert.NoError(t, err) + assert.Equal(t, len(expectedData), n) + assert.Equal(t, expectedData, string(buf)) +} + +func TestRetryableReader_ReadWithNonRetryableError(t *testing.T) { + ctx := context.Background() + path := "/test/path" + + mockReader := newErrorMockReader("", + merr.WrapErrIoFailed(path, io.ErrNoProgress), + math.MaxInt, + ) + + reader := NewRetryableReader(ctx, path, mockReader) + buf := make([]byte, 10) + n, err := reader.Read(buf) + + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrIoFailed) + assert.Equal(t, 0, n) +} diff --git a/internal/util/importutilv2/csv/reader.go b/internal/util/importutilv2/csv/reader.go index 933036b316..0af85ae737 100644 --- a/internal/util/importutilv2/csv/reader.go +++ b/internal/util/importutilv2/csv/reader.go @@ -54,12 +54,13 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error())) } + retryableReader := common.NewRetryableReader(ctx, path, cmReader) count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } - csvReader := csv.NewReader(cmReader) + csvReader := csv.NewReader(retryableReader) csvReader.Comma = sep header, err := csvReader.Read() @@ -75,7 +76,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co return &reader{ ctx: ctx, cm: cm, - cmr: cmReader, + cmr: retryableReader, schema: schema, cr: csvReader, parser: rowParser, diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go index df83a74bc7..3b1b38da19 100644 --- a/internal/util/importutilv2/csv/reader_test.go +++ b/internal/util/importutilv2/csv/reader_test.go @@ -23,7 +23,6 @@ import ( "io" "math/rand" "os" - "strings" "testing" "github.com/stretchr/testify/mock" @@ -33,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + importcommon "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/objectstorage" @@ -40,16 +40,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - size int64 -} - -func (mr *mockReader) Size() (int64, error) { - return mr.size, nil +func init() { + paramtable.Init() } type ReaderSuite struct { @@ -60,10 +52,6 @@ type ReaderSuite struct { vecDataType schemapb.DataType } -func (suite *ReaderSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (suite *ReaderSuite) SetupTest() { suite.numRows = 100 suite.pkDataType = schemapb.DataType_Int64 @@ -258,7 +246,7 @@ func (suite *ReaderSuite) TestError() { if ioErr != nil { return nil, ioErr } else { - r := &mockReader{Reader: strings.NewReader(content)} + r := importcommon.NewMockReader(content) return r, nil } }) @@ -291,7 +279,7 @@ func (suite *ReaderSuite) TestError() { testReadErr := func(schema *schemapb.CollectionSchema, content string) { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(content)} + r := importcommon.NewMockReader(content) return r, nil }) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(128, nil) @@ -331,11 +319,7 @@ func (suite *ReaderSuite) TestReadLoop() { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - reader := strings.NewReader(content) - r := &mockReader{ - Reader: reader, - Closer: io.NopCloser(reader), - } + r := importcommon.NewMockReader(content) return r, nil }) cm.EXPECT().Size(mock.Anything, mock.Anything).Return(128, nil) @@ -398,8 +382,7 @@ func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() { { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - reader := strings.NewReader(content) - r := &mockReader{Reader: reader, Closer: io.NopCloser(reader)} + r := importcommon.NewMockReader(content) return r, nil }) _, err := NewReader(context.Background(), cm, schema, "dummy path", 1024, ',', "") @@ -412,8 +395,7 @@ func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() { schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}} cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - reader := strings.NewReader(content) - r := &mockReader{Reader: reader, Closer: io.NopCloser(reader)} + r := importcommon.NewMockReader(content) return r, nil }) reader, err := NewReader(context.Background(), cm, schema, "dummy path", 1024, ',', "") diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index 3a3380354c..aa9f14fa64 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -31,7 +31,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -51,10 +50,6 @@ type testCase struct { dontCheckDynamic bool } -func (suite *RowParserSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (suite *RowParserSuite) SetupTest() { // default suite params suite.nullKey = "" diff --git a/internal/util/importutilv2/json/reader.go b/internal/util/importutilv2/json/reader.go index 09e36c2606..0ca3bdaa51 100644 --- a/internal/util/importutilv2/json/reader.go +++ b/internal/util/importutilv2/json/reader.go @@ -60,6 +60,7 @@ func newReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error())) } + retryableReader := common.NewRetryableReader(ctx, path, r) count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err @@ -67,11 +68,11 @@ func newReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co reader := &reader{ ctx: ctx, cm: cm, - cmr: r, + cmr: retryableReader, schema: schema, fileSize: atomic.NewInt64(0), filePath: path, - dec: json.NewDecoder(r), + dec: json.NewDecoder(retryableReader), bufferSize: bufferSize, count: count, isLinesFormat: isLinesFormat, diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index a74e518c32..e71b702215 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -19,9 +19,7 @@ package json import ( "context" "encoding/json" - "io" "math" - "strings" "testing" "github.com/stretchr/testify/mock" @@ -31,6 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + importcommon "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/nullutil" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" @@ -38,16 +37,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - size int64 -} - -func (mr *mockReader) Size() (int64, error) { - return mr.size, nil +func init() { + paramtable.Init() } type ReaderSuite struct { @@ -58,10 +49,6 @@ type ReaderSuite struct { vecDataType schemapb.DataType } -func (suite *ReaderSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (suite *ReaderSuite) SetupTest() { // default suite params suite.numRows = 100 @@ -145,11 +132,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - reader := strings.NewReader(string(jsonBytes)) - r := &mockReader{ - Reader: reader, - Closer: io.NopCloser(reader), - } + r := importcommon.NewMockReader(string(jsonBytes)) return r, nil }) cm.EXPECT().Size(mock.Anything, "mockPath").Return(128, nil) @@ -236,7 +219,7 @@ func (suite *ReaderSuite) runWithDefaultValue(dataType schemapb.DataType, elemTy cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + r := importcommon.NewMockReader(string(jsonBytes)) return r, nil }) reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt) @@ -341,8 +324,8 @@ func (suite *ReaderSuite) TestDecodeError() { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(jsonContent)} - return r, ioErr + r := importcommon.NewMockReader(jsonContent) + return r, nil }) var reader *reader var err error @@ -436,7 +419,7 @@ func (suite *ReaderSuite) TestReadCount() { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + r := importcommon.NewMockReader(string(jsonBytes)) return r, nil }) @@ -503,7 +486,7 @@ func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() { { cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + r := importcommon.NewMockReader(string(jsonBytes)) return r, nil }) reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt) @@ -518,7 +501,7 @@ func (suite *ReaderSuite) TestAllowInsertAutoID_KeepUserPK() { schema.Properties = []*commonpb.KeyValuePair{{Key: common.AllowInsertAutoIDKey, Value: "true"}} cm := mocks.NewChunkManager(suite.T()) cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { - r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + r := importcommon.NewMockReader(string(jsonBytes)) return r, nil }) reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt) diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go index 8403191e2e..f2ce4c6a10 100644 --- a/internal/util/importutilv2/json/row_parser_test.go +++ b/internal/util/importutilv2/json/row_parser_test.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -48,10 +47,6 @@ type testCase struct { dontCheckDynamic bool } -func (suite *RowParserSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (suite *RowParserSuite) SetupTest() { // default suite params suite.autoID = true diff --git a/internal/util/importutilv2/numpy/field_reader_test.go b/internal/util/importutilv2/numpy/field_reader_test.go index c8077d05b8..d035f0d92c 100644 --- a/internal/util/importutilv2/numpy/field_reader_test.go +++ b/internal/util/importutilv2/numpy/field_reader_test.go @@ -32,8 +32,13 @@ import ( "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +func init() { + paramtable.Init() +} + func encodeToGB2312(input string) ([]byte, error) { encoder := simplifiedchinese.GB18030.NewEncoder() // GB18030 is compatible with GB2312. var buf bytes.Buffer diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index 4c6657b40a..7a2baaf6cc 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -36,10 +36,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + importcommon "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -47,18 +47,6 @@ const ( dim = 8 ) -type mockReader struct { - io.Reader - io.Closer - io.ReaderAt - io.Seeker - size int64 -} - -func (mr *mockReader) Size() (int64, error) { - return mr.size, nil -} - type ReaderSuite struct { suite.Suite @@ -67,10 +55,6 @@ type ReaderSuite struct { vecDataType schemapb.DataType } -func (suite *ReaderSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (suite *ReaderSuite) SetupTest() { // default suite params suite.numRows = 100 @@ -220,10 +204,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { dataType := fieldIDToField[fieldID].GetDataType() reader, err := createReader(fieldData, dataType) suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - Closer: io.NopCloser(reader), - }, nil) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(importcommon.CustomMockReader(reader), nil) cm.EXPECT().Size(mock.Anything, files[fieldID]).Return(128, nil) } @@ -317,9 +298,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { dataType := fieldIDToField[fieldID].GetDataType() reader, err := createReader(fieldData, dataType) suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(importcommon.CustomMockReader(reader), nil) } reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt) @@ -394,9 +373,7 @@ func (suite *ReaderSuite) runNullable(dt schemapb.DataType, hasFile bool) { dataType := fieldIDToField[fieldID].GetDataType() reader, err := createReader(fieldData, dataType) suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(importcommon.CustomMockReader(reader), nil) } reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt) diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index 466ff3ade4..9ac54d8cb4 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -89,7 +90,8 @@ func CreateReaders(ctx context.Context, cm storage.ChunkManager, schema *schemap return nil, merr.WrapErrImportFailed( fmt.Sprintf("failed to read the file '%s', error: %s", path, err.Error())) } - readers[field.GetFieldID()] = reader + retryableReader := common.NewRetryableReader(ctx, path, reader) + readers[field.GetFieldID()] = retryableReader readFields[field.GetName()] = field.GetFieldID() } diff --git a/internal/util/importutilv2/parquet/field_reader_test.go b/internal/util/importutilv2/parquet/field_reader_test.go index d518ab2e3f..97f35bdc9a 100644 --- a/internal/util/importutilv2/parquet/field_reader_test.go +++ b/internal/util/importutilv2/parquet/field_reader_test.go @@ -22,9 +22,14 @@ import ( "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/objectstorage" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +func init() { + paramtable.Init() +} + func TestInvalidUTF8(t *testing.T) { const ( fieldID = int64(100) diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index b4eadbdcac..7f3e18903d 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -59,6 +59,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, err } + retryableReader := common.NewRetryableReader(ctx, path, cmReader) allFields := typeutil.GetAllFieldSchemas(schema) // Each ColumnReader consumes ReaderProperties.BufferSize memory independently. @@ -66,7 +67,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co // to ensure total memory usage stays within the intended limit. columnReaderBufferSize := totalReadBufferSize / int64(len(allFields)) - r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{ + r, err := file.NewParquetReader(retryableReader, file.WithReadProps(&parquet.ReaderProperties{ BufferSize: columnReaderBufferSize, BufferedStreamEnabled: true, })) @@ -96,7 +97,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co return &reader{ ctx: ctx, cm: cm, - cmr: cmReader, + cmr: retryableReader, schema: schema, fileSize: atomic.NewInt64(0), path: path, diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 678f22d0e7..c6d9c11342 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -46,7 +46,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/objectstorage" "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -62,10 +61,6 @@ type ReaderSuite struct { vecDataType schemapb.DataType } -func (s *ReaderSuite) SetupSuite() { - paramtable.Get().Init(paramtable.NewBaseTable()) -} - func (s *ReaderSuite) SetupTest() { // default suite params s.numRows = 100 diff --git a/pkg/objectstorage/options.go b/pkg/objectstorage/options.go index 1141f4a77b..40ec42e341 100644 --- a/pkg/objectstorage/options.go +++ b/pkg/objectstorage/options.go @@ -1,5 +1,7 @@ package objectstorage +import "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + // Config for setting params used by chunk manager client. type Config struct { Address string @@ -18,10 +20,13 @@ type Config struct { RequestTimeoutMs int64 GcpCredentialJSON string GcpNativeWithoutAuth bool // used for Unit Testing + ReadRetryAttempts uint } func NewDefaultConfig() *Config { - return &Config{} + return &Config{ + ReadRetryAttempts: paramtable.Get().CommonCfg.StorageReadRetryAttempts.GetAsUint(), + } } // Option is used to Config the retry function. diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 1c1b5a4d30..9b9ae8f38e 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -132,9 +132,10 @@ var ( ErrNodeStateUnexpected = newMilvusError("node state unexpected", 906, false) // IO related - ErrIoKeyNotFound = newMilvusError("key not found", 1000, false) - ErrIoFailed = newMilvusError("IO failed", 1001, false) - ErrIoUnexpectEOF = newMilvusError("unexpected EOF", 1002, true) + ErrIoKeyNotFound = newMilvusError("key not found", 1000, false) + ErrIoFailed = newMilvusError("IO failed", 1001, false) + ErrIoUnexpectEOF = newMilvusError("unexpected EOF", 1002, true) + ErrIoTooManyRequests = newMilvusError("too many requests", 1003, true) // Parameter related ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index ccc8825053..1dda3d0211 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -944,6 +944,13 @@ func WrapErrIoUnexpectEOF(key string, err error) error { return wrapFieldsWithDesc(ErrIoUnexpectEOF, err.Error(), value("key", key)) } +func WrapErrIoTooManyRequests(key string, err error) error { + if err == nil { + return nil + } + return wrapFieldsWithDesc(ErrIoTooManyRequests, err.Error(), value("key", key)) +} + // Parameter related func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error { err := wrapFields(ErrParameterInvalid, diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2942ad9e1f..f71b9e4e4c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -294,8 +294,10 @@ type commonConfig struct { Stv2SplitAvgSizeThreshold ParamItem `refreshable:"true"` UseLoonFFI ParamItem `refreshable:"true"` - StoragePathPrefix ParamItem `refreshable:"false"` - StorageZstdConcurrency ParamItem `refreshable:"false"` + StoragePathPrefix ParamItem `refreshable:"false"` + StorageZstdConcurrency ParamItem `refreshable:"false"` + StorageReadRetryAttempts ParamItem `refreshable:"true"` + TTMsgEnabled ParamItem `refreshable:"true"` TraceLogMode ParamItem `refreshable:"true"` BloomFilterSize ParamItem `refreshable:"true"` @@ -1065,6 +1067,15 @@ The default value is 1, which is enough for most cases.`, } p.StorageZstdConcurrency.Init(base.mgr) + p.StorageReadRetryAttempts = ParamItem{ + Key: "common.storage.readRetryAttempts", + Version: "2.6.8", + DefaultValue: "10", + Doc: "The number of retry attempts for reading from object storage; only retryable errors will trigger a retry.", + Export: false, + } + p.StorageReadRetryAttempts.Init(base.mgr) + p.TTMsgEnabled = ParamItem{ Key: "common.ttMsgEnabled", Version: "2.3.2",