diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 00c9f2e513..f011726c48 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -166,38 +167,46 @@ func (mcm *MinioChunkManager) Exist(ctx context.Context, filePath string) (bool, // Read reads the minio storage data if exists. func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { - start := time.Now() - object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) - if err != nil { - log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - defer object.Close() + var data []byte + err := retry.Do(ctx, func() error { + start := time.Now() + object, err := mcm.getMinioObject(ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) + if err != nil { + log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + defer object.Close() - // Prefetch object data - var empty []byte - _, err = object.Read(empty) - err = checkObjectStorageError(filePath, err) + // Prefetch object data + var empty []byte + _, err = object.Read(empty) + err = checkObjectStorageError(filePath, err) + if err != nil { + log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) + return err + } + + objectInfo, err := object.Stat() + err = checkObjectStorageError(filePath, err) + if err != nil { + log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + + data, err = Read(object, objectInfo.Size) + err = checkObjectStorageError(filePath, err) + if err != nil { + log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size)) + metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds())) + return nil + }, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr)) if err != nil { - log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) return nil, err } - objectInfo, err := object.Stat() - err = checkObjectStorageError(filePath, err) - if err != nil { - log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - - data, err := Read(object, objectInfo.Size) - err = checkObjectStorageError(filePath, err) - if err != nil { - log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(objectInfo.Size)) - metrics.PersistentDataRequestLatency.WithLabelValues(metrics.DataGetLabel).Observe(float64(time.Since(start).Milliseconds())) return data, nil } diff --git a/internal/storage/remote_chunk_manager.go b/internal/storage/remote_chunk_manager.go index 48f098fbfa..9e631fce07 100644 --- a/internal/storage/remote_chunk_manager.go +++ b/internal/storage/remote_chunk_manager.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -161,33 +162,41 @@ func (mcm *RemoteChunkManager) Exist(ctx context.Context, filePath string) (bool // Read reads the minio storage data if exists. func (mcm *RemoteChunkManager) Read(ctx context.Context, filePath string) ([]byte, error) { - object, err := mcm.getObject(ctx, mcm.bucketName, filePath, int64(0), int64(0)) - if err != nil { - log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - defer object.Close() + var data []byte + err := retry.Do(ctx, func() error { + object, err := mcm.getObject(ctx, mcm.bucketName, filePath, int64(0), int64(0)) + if err != nil { + log.Warn("failed to get object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + defer object.Close() - // Prefetch object data - var empty []byte - _, err = object.Read(empty) - err = checkObjectStorageError(filePath, err) + // Prefetch object data + var empty []byte + _, err = object.Read(empty) + err = checkObjectStorageError(filePath, err) + if err != nil { + log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) + return err + } + size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) + if err != nil { + log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + data, err = Read(object, size) + err = checkObjectStorageError(filePath, err) + if err != nil { + log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) + return err + } + metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(size)) + return nil + }, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr)) if err != nil { - log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) return nil, err } - size, err := mcm.getObjectSize(ctx, mcm.bucketName, filePath) - if err != nil { - log.Warn("failed to stat object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - data, err := Read(object, size) - err = checkObjectStorageError(filePath, err) - if err != nil { - log.Warn("failed to read object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err)) - return nil, err - } - metrics.PersistentDataKvSize.WithLabelValues(metrics.DataGetLabel).Observe(float64(size)) + return data, nil } @@ -407,5 +416,8 @@ func checkObjectStorageError(fileName string, err error) error { } return merr.WrapErrIoFailed(fileName, err) } + if err == io.ErrUnexpectedEOF { + return merr.WrapErrIoUnexpectEOF(fileName, err) + } return merr.WrapErrIoFailed(fileName, err) } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 664d06441d..9d64880f3f 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -99,6 +99,7 @@ var ( // IO related ErrIoKeyNotFound = newMilvusError("key not found", 1000, false) ErrIoFailed = newMilvusError("IO failed", 1001, false) + ErrIoUnexpectEOF = newMilvusError("unexpected EOF", 1002, true) // Parameter related ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index f2d5064dfc..cb1046bab7 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -124,6 +124,7 @@ func (s *ErrSuite) TestWrap() { // IO related s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound) s.ErrorIs(WrapErrIoFailed("test_key", os.ErrClosed), ErrIoFailed) + s.ErrorIs(WrapErrIoUnexpectEOF("test_key", os.ErrClosed), ErrIoUnexpectEOF) // Parameter related s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 228c0500c9..5479a69c26 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -774,6 +774,13 @@ func WrapErrIoFailedReason(reason string, msg ...string) error { return err } +func WrapErrIoUnexpectEOF(key string, err error) error { + if err == nil { + return nil + } + return wrapFieldsWithDesc(ErrIoUnexpectEOF, err.Error(), value("key", key)) +} + // Parameter related func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error { err := wrapFields(ErrParameterInvalid, diff --git a/pkg/util/retry/options.go b/pkg/util/retry/options.go index 765bd23bc6..80f00a9ffc 100644 --- a/pkg/util/retry/options.go +++ b/pkg/util/retry/options.go @@ -17,6 +17,7 @@ type config struct { attempts uint sleep time.Duration maxSleepTime time.Duration + isRetryErr func(err error) bool } func newDefaultConfig() *config { @@ -59,3 +60,9 @@ func MaxSleepTime(maxSleepTime time.Duration) Option { } } } + +func RetryErr(isRetryErr func(err error) bool) Option { + return func(c *config) { + c.isRetryErr = isRetryErr + } +} diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go index afb01ab31b..9c5ff12e56 100644 --- a/pkg/util/retry/retry.go +++ b/pkg/util/retry/retry.go @@ -52,6 +52,9 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error { } return err } + if c.isRetryErr != nil && !c.isRetryErr(err) { + return err + } deadline, ok := ctx.Deadline() if ok && time.Until(deadline) < c.sleep { diff --git a/pkg/util/retry/retry_test.go b/pkg/util/retry/retry_test.go index d0a2c501e4..60a722e44b 100644 --- a/pkg/util/retry/retry_test.go +++ b/pkg/util/retry/retry_test.go @@ -152,3 +152,33 @@ func TestWrap(t *testing.T) { assert.True(t, errors.Is(err2, merr.ErrSegmentNotFound)) assert.False(t, IsRecoverable(err2)) } + +func TestRetryErrorParam(t *testing.T) { + { + mockErr := errors.New("mock not retry error") + runTimes := 0 + err := Do(context.Background(), func() error { + runTimes++ + return mockErr + }, RetryErr(func(err error) bool { + return err != mockErr + })) + + assert.Error(t, err) + assert.Equal(t, 1, runTimes) + } + + { + mockErr := errors.New("mock retry error") + runTimes := 0 + err := Do(context.Background(), func() error { + runTimes++ + return mockErr + }, Attempts(3), RetryErr(func(err error) bool { + return err == mockErr + })) + + assert.Error(t, err) + assert.Equal(t, 3, runTimes) + } +}