From 5355153805f48df69c31590fd500cd86d1bb4900 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 16 May 2022 19:25:55 +0800 Subject: [PATCH] Minio Error is not Handled gracefully (#17003) Signed-off-by: xiaofan-luan --- internal/storage/local_chunk_manager.go | 49 +++++++--- internal/storage/local_chunk_manager_test.go | 30 ++++++ internal/storage/minio_chunk_manager.go | 95 ++++++++++++------- internal/storage/minio_chunk_manager_test.go | 44 ++++++++- internal/storage/types.go | 2 +- internal/storage/vector_chunk_manager.go | 2 +- internal/storage/vector_chunk_manager_test.go | 18 ++-- .../util/importutil/import_wrapper_test.go | 4 +- 8 files changed, 187 insertions(+), 57 deletions(-) diff --git a/internal/storage/local_chunk_manager.go b/internal/storage/local_chunk_manager.go index d7cc790ebb..ac8738816b 100644 --- a/internal/storage/local_chunk_manager.go +++ b/internal/storage/local_chunk_manager.go @@ -51,7 +51,12 @@ func NewLocalChunkManager(opts ...Option) *LocalChunkManager { // Path returns the path of local data if exists. func (lcm *LocalChunkManager) Path(filePath string) (string, error) { - if !lcm.Exist(filePath) { + exist, err := lcm.Exist(filePath) + if err != nil { + return "", err + } + + if !exist { return "", fmt.Errorf("local file cannot be found with filePath: %s", filePath) } absPath := path.Join(lcm.localPath, filePath) @@ -59,7 +64,11 @@ func (lcm *LocalChunkManager) Path(filePath string) (string, error) { } func (lcm *LocalChunkManager) Reader(filePath string) (FileReader, error) { - if !lcm.Exist(filePath) { + exist, err := lcm.Exist(filePath) + if err != nil { + return nil, err + } + if !exist { return nil, errors.New("local file cannot be found with filePath:" + filePath) } absPath := path.Join(lcm.localPath, filePath) @@ -70,17 +79,17 @@ func (lcm *LocalChunkManager) Reader(filePath string) (FileReader, error) { func (lcm *LocalChunkManager) Write(filePath string, content []byte) error { absPath := path.Join(lcm.localPath, filePath) dir := path.Dir(absPath) - if _, err := os.Stat(dir); os.IsNotExist(err) { + exist, err := lcm.Exist(dir) + if err != nil { + return err + } + if !exist { err := os.MkdirAll(dir, os.ModePerm) if err != nil { return err } } - err := ioutil.WriteFile(absPath, content, os.ModePerm) - if err != nil { - return err - } - return nil + return ioutil.WriteFile(absPath, content, os.ModePerm) } // MultiWrite writes the data to local storage. @@ -99,17 +108,25 @@ func (lcm *LocalChunkManager) MultiWrite(contents map[string][]byte) error { } // Exist checks whether chunk is saved to local storage. -func (lcm *LocalChunkManager) Exist(filePath string) bool { +func (lcm *LocalChunkManager) Exist(filePath string) (bool, error) { absPath := path.Join(lcm.localPath, filePath) - if _, err := os.Stat(absPath); errors.Is(err, os.ErrNotExist) { - return false + _, err := os.Stat(absPath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err } - return true + return true, nil } // Read reads the local storage data if exists. func (lcm *LocalChunkManager) Read(filePath string) ([]byte, error) { - if !lcm.Exist(filePath) { + exist, err := lcm.Exist(filePath) + if err != nil { + return nil, err + } + if !exist { return nil, fmt.Errorf("file not exist: %s", filePath) } absPath := path.Join(lcm.localPath, filePath) @@ -202,7 +219,11 @@ func (lcm *LocalChunkManager) Size(filePath string) (int64, error) { } func (lcm *LocalChunkManager) Remove(filePath string) error { - if lcm.Exist(filePath) { + exist, err := lcm.Exist(filePath) + if err != nil { + return err + } + if exist { absPath := path.Join(lcm.localPath, filePath) err := os.RemoveAll(absPath) if err != nil { diff --git a/internal/storage/local_chunk_manager_test.go b/internal/storage/local_chunk_manager_test.go index 087e084fc3..3005245cad 100644 --- a/internal/storage/local_chunk_manager_test.go +++ b/internal/storage/local_chunk_manager_test.go @@ -370,4 +370,34 @@ func TestLocalCM(t *testing.T) { assert.Error(t, err) assert.Equal(t, p, "") }) + + t.Run("test Prefix", func(t *testing.T) { + testPrefix := "prefix" + + testCM := NewLocalChunkManager(RootPath(localPath)) + defer testCM.RemoveWithPrefix(testPrefix) + + pathB := path.Join("a", "b") + + key := path.Join(testPrefix, pathB) + value := []byte("a") + + err := testCM.Write(key, value) + assert.NoError(t, err) + + pathC := path.Join("a", "c") + key = path.Join(testPrefix, pathC) + err = testCM.Write(key, value) + assert.NoError(t, err) + + pathPrefix := path.Join(testPrefix, "a") + r, err := testCM.ListWithPrefix(pathPrefix) + assert.NoError(t, err) + assert.Equal(t, len(r), 2) + + testCM.RemoveWithPrefix(testPrefix) + r, err = testCM.ListWithPrefix(pathPrefix) + assert.NoError(t, err) + assert.Equal(t, len(r), 0) + }) } diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index cdfe9e080f..f87a1c8d30 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -69,19 +69,24 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk checkBucketFn := func() error { bucketExists, err = minIOClient.BucketExists(ctx, c.bucketName) if err != nil { + log.Warn("failed to check blob bucket exist", zap.String("bucket", c.bucketName), zap.Error(err)) return err } if !bucketExists { - log.Debug("minio chunk manager new minio client", zap.Any("Check bucket", "bucket not exist")) if c.createBucket { - log.Debug("minio chunk manager create minio bucket.", zap.Any("bucket name", c.bucketName)) - return minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{}) + log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName)) + err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{}) + if err != nil { + log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err)) + return err + } + } else { + return fmt.Errorf("bucket %s not Existed", c.bucketName) } - return fmt.Errorf("bucket %s not Existed", c.bucketName) } return nil } - err = retry.Do(ctx, checkBucketFn, retry.Attempts(100)) + err = retry.Do(ctx, checkBucketFn, retry.Attempts(20)) if err != nil { return nil, err } @@ -91,14 +96,17 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk Client: minIOClient, bucketName: c.bucketName, } - log.Debug("minio chunk manager new minio client success.") - + log.Info("minio chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", c.rootPath)) return mcm, nil } // Path returns the path of minio data if exists. func (mcm *MinioChunkManager) Path(filePath string) (string, error) { - if !mcm.Exist(filePath) { + exist, err := mcm.Exist(filePath) + if err != nil { + return "", err + } + if !exist { return "", errors.New("minio file manage cannot be found with filePath:" + filePath) } return filePath, nil @@ -106,15 +114,18 @@ func (mcm *MinioChunkManager) Path(filePath string) (string, error) { // Reader returns the path of minio data if exists. func (mcm *MinioChunkManager) Reader(filePath string) (FileReader, error) { - if !mcm.Exist(filePath) { - return nil, errors.New("minio file manage cannot be found with filePath:" + filePath) + reader, err := mcm.Client.GetObject(mcm.ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) + if err != nil { + log.Warn("failed to get object", zap.String("path", filePath), zap.Error(err)) + return nil, err } - return mcm.Client.GetObject(mcm.ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) + return reader, nil } func (mcm *MinioChunkManager) Size(filePath string) (int64, error) { objectInfo, err := mcm.Client.StatObject(mcm.ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) if err != nil { + log.Warn("failed to stat object", zap.String("path", filePath), zap.Error(err)) return 0, err } @@ -126,6 +137,7 @@ func (mcm *MinioChunkManager) Write(filePath string, content []byte) error { _, err := mcm.Client.PutObject(mcm.ctx, mcm.bucketName, filePath, bytes.NewReader(content), int64(len(content)), minio.PutObjectOptions{}) if err != nil { + log.Warn("failed to put object", zap.String("path", filePath), zap.Error(err)) return err } @@ -149,20 +161,34 @@ func (mcm *MinioChunkManager) MultiWrite(kvs map[string][]byte) error { } // Exist checks whether chunk is saved to minio storage. -func (mcm *MinioChunkManager) Exist(filePath string) bool { +func (mcm *MinioChunkManager) Exist(filePath string) (bool, error) { _, err := mcm.Client.StatObject(mcm.ctx, mcm.bucketName, filePath, minio.StatObjectOptions{}) - return err == nil + if err != nil { + errResponse := minio.ToErrorResponse(err) + if errResponse.Code == "NoSuchKey" { + return false, nil + } + log.Warn("failed to stat object", zap.String("path", filePath), zap.Error(err)) + return false, err + } + return true, nil } // Read reads the minio storage data if exists. func (mcm *MinioChunkManager) Read(filePath string) ([]byte, error) { object, err := mcm.Client.GetObject(mcm.ctx, mcm.bucketName, filePath, minio.GetObjectOptions{}) if err != nil { + log.Warn("failed to get object", zap.String("path", filePath), zap.Error(err)) return nil, err } defer object.Close() - return ioutil.ReadAll(object) + data, err := ioutil.ReadAll(object) + if err != nil { + log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) + return nil, err + } + return data, nil } func (mcm *MinioChunkManager) MultiRead(keys []string) ([][]byte, error) { @@ -189,7 +215,6 @@ func (mcm *MinioChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, } objectsValues, err := mcm.MultiRead(objectsKeys) if err != nil { - log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", prefix), zap.Error(err)) return nil, nil, err } @@ -209,21 +234,32 @@ func (mcm *MinioChunkManager) ReadAt(filePath string, off int64, length int64) ( opts := minio.GetObjectOptions{} err := opts.SetRange(off, off+length-1) if err != nil { + log.Warn("failed to set range", zap.String("path", filePath), zap.Error(err)) return nil, err } object, err := mcm.Client.GetObject(mcm.ctx, mcm.bucketName, filePath, opts) if err != nil { + log.Warn("failed to get object", zap.String("path", filePath), zap.Error(err)) return nil, err } defer object.Close() - return ioutil.ReadAll(object) + data, err := ioutil.ReadAll(object) + if err != nil { + log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err)) + return nil, err + } + return data, nil } // Remove deletes an object with @key. -func (mcm *MinioChunkManager) Remove(key string) error { - err := mcm.Client.RemoveObject(mcm.ctx, mcm.bucketName, key, minio.RemoveObjectOptions{}) - return err +func (mcm *MinioChunkManager) Remove(filePath string) error { + err := mcm.Client.RemoveObject(mcm.ctx, mcm.bucketName, filePath, minio.RemoveObjectOptions{}) + if err != nil { + log.Warn("failed to remove object", zap.String("path", filePath), zap.Error(err)) + return err + } + return nil } // MultiRemove deletes a objects with @keys. @@ -243,18 +279,10 @@ func (mcm *MinioChunkManager) MultiRemove(keys []string) error { // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error { - objectsCh := make(chan minio.ObjectInfo) - - go func() { - defer close(objectsCh) - - for object := range mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) { - objectsCh <- object - } - }() - - for rErr := range mcm.Client.RemoveObjects(mcm.ctx, mcm.bucketName, objectsCh, minio.RemoveObjectsOptions{GovernanceBypass: true}) { + objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) + for rErr := range mcm.Client.RemoveObjects(mcm.ctx, mcm.bucketName, objects, minio.RemoveObjectsOptions{GovernanceBypass: true}) { if rErr.Err != nil { + log.Warn("failed to remove objects", zap.String("prefix", prefix), zap.Error(rErr.Err)) return rErr.Err } } @@ -262,11 +290,14 @@ func (mcm *MinioChunkManager) RemoveWithPrefix(prefix string) error { } func (mcm *MinioChunkManager) ListWithPrefix(prefix string) ([]string, error) { - objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix}) - + objects := mcm.Client.ListObjects(mcm.ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) var objectsKeys []string for object := range objects { + if object.Err != nil { + log.Warn("failed to list with prefix", zap.String("prefix", prefix), zap.Error(object.Err)) + return nil, object.Err + } objectsKeys = append(objectsKeys, object.Key) } return objectsKeys, nil diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 9fb401eb00..95f68ea87f 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -18,6 +18,7 @@ package storage import ( "context" + "fmt" "path" "strconv" "testing" @@ -163,7 +164,7 @@ func TestMinIOCM(t *testing.T) { expectedValue [][]byte description string }{ - {false, []string{"key_1", "key_not_exist"}, [][]byte{[]byte("111"), {}}, "multiload 1 exist 1 not"}, + {false, []string{"key_1", "key_not_exist"}, [][]byte{[]byte("111"), nil}, "multiload 1 exist 1 not"}, {true, []string{"abc", "key_3"}, [][]byte{[]byte("123"), []byte("333")}, "multiload 2 exist"}, } @@ -406,6 +407,7 @@ func TestMinIOCM(t *testing.T) { assert.Error(t, err) assert.Equal(t, p, "") }) + t.Run("test Mmap", func(t *testing.T) { testMmapRoot := path.Join(testMinIOKVRoot, "mmap") ctx, cancel := context.WithCancel(context.Background()) @@ -426,4 +428,44 @@ func TestMinIOCM(t *testing.T) { assert.Nil(t, r) }) + + t.Run("test Prefix", func(t *testing.T) { + testPrefix := path.Join(testMinIOKVRoot, "prefix") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testCM, err := newMinIOChunkManager(ctx, testBucket) + require.NoError(t, err) + defer testCM.RemoveWithPrefix(testPrefix) + + pathB := path.Join("a", "b") + + key := path.Join(testPrefix, pathB) + value := []byte("a") + + err = testCM.Write(key, value) + assert.NoError(t, err) + + pathC := path.Join("a", "c") + key = path.Join(testPrefix, pathC) + err = testCM.Write(key, value) + assert.NoError(t, err) + + pathPrefix := path.Join(testPrefix, "a") + r, err := testCM.ListWithPrefix(pathPrefix) + assert.NoError(t, err) + assert.Equal(t, len(r), 2) + + testCM.RemoveWithPrefix(testPrefix) + r, err = testCM.ListWithPrefix(pathPrefix) + assert.NoError(t, err) + assert.Equal(t, len(r), 0) + + // test wrong prefix + b := make([]byte, 2048) + pathWrong := path.Join(testPrefix, string(b)) + _, err = testCM.ListWithPrefix(pathWrong) + assert.Error(t, err) + fmt.Println(err) + }) } diff --git a/internal/storage/types.go b/internal/storage/types.go index 9ce86b1c7e..a82c682738 100644 --- a/internal/storage/types.go +++ b/internal/storage/types.go @@ -34,7 +34,7 @@ type ChunkManager interface { // MultiWrite writes multi @content to @filePath. MultiWrite(contents map[string][]byte) error // Exist returns true if @filePath exists. - Exist(filePath string) bool + Exist(filePath string) (bool, error) // Read reads @filePath and returns content. Read(filePath string) ([]byte, error) // Reader return a reader for @filePath diff --git a/internal/storage/vector_chunk_manager.go b/internal/storage/vector_chunk_manager.go index 7628322de2..41b4eceea1 100644 --- a/internal/storage/vector_chunk_manager.go +++ b/internal/storage/vector_chunk_manager.go @@ -136,7 +136,7 @@ func (vcm *VectorChunkManager) MultiWrite(contents map[string][]byte) error { } // Exist checks whether vector data is saved to local cache. -func (vcm *VectorChunkManager) Exist(filePath string) bool { +func (vcm *VectorChunkManager) Exist(filePath string) (bool, error) { return vcm.vectorStorage.Exist(filePath) } diff --git a/internal/storage/vector_chunk_manager_test.go b/internal/storage/vector_chunk_manager_test.go index 09d768119f..2aa6bd3589 100644 --- a/internal/storage/vector_chunk_manager_test.go +++ b/internal/storage/vector_chunk_manager_test.go @@ -235,8 +235,9 @@ func TestVectorChunkManager_Write(t *testing.T) { err = vcm.Write(key, []byte{1}) assert.Nil(t, err) - exist := vcm.Exist(key) + exist, err := vcm.Exist(key) assert.True(t, exist) + assert.NoError(t, err) contents := map[string][]byte{ "key_1": {111}, @@ -245,10 +246,12 @@ func TestVectorChunkManager_Write(t *testing.T) { err = vcm.MultiWrite(contents) assert.NoError(t, err) - exist = vcm.Exist("key_1") + exist, err = vcm.Exist("key_1") assert.True(t, exist) - exist = vcm.Exist("key_2") + assert.NoError(t, err) + exist, err = vcm.Exist("key_2") assert.True(t, exist) + assert.NoError(t, err) err = vcm.RemoveWithPrefix(localPath) assert.NoError(t, err) @@ -271,8 +274,9 @@ func TestVectorChunkManager_Remove(t *testing.T) { err = vcm.Remove(key) assert.Nil(t, err) - exist := vcm.Exist(key) + exist, err := vcm.Exist(key) assert.False(t, exist) + assert.NoError(t, err) contents := map[string][]byte{ "key_1": {111}, @@ -284,10 +288,12 @@ func TestVectorChunkManager_Remove(t *testing.T) { err = vcm.MultiRemove([]string{"key_1", "key_2"}) assert.NoError(t, err) - exist = vcm.Exist("key_1") + exist, err = vcm.Exist("key_1") assert.False(t, exist) - exist = vcm.Exist("key_2") + assert.NoError(t, err) + exist, err = vcm.Exist("key_2") assert.False(t, exist) + assert.NoError(t, err) err = vcm.RemoveWithPrefix(localPath) assert.NoError(t, err) diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 64b49559fb..2db78184cf 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -46,8 +46,8 @@ func (mc *MockChunkManager) MultiWrite(contents map[string][]byte) error { return nil } -func (mc *MockChunkManager) Exist(filePath string) bool { - return true +func (mc *MockChunkManager) Exist(filePath string) (bool, error) { + return true, nil } func (mc *MockChunkManager) Read(filePath string) ([]byte, error) {