diff --git a/internal/kv/minio/minio_kv.go b/internal/kv/minio/minio_kv.go index 2a3506393b..3afa2809da 100644 --- a/internal/kv/minio/minio_kv.go +++ b/internal/kv/minio/minio_kv.go @@ -17,6 +17,7 @@ package miniokv import ( + "bytes" "context" "fmt" "io/ioutil" @@ -25,6 +26,7 @@ import ( "io" "strings" + "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" "github.com/minio/minio-go/v7" @@ -32,6 +34,8 @@ import ( "go.uber.org/zap" ) +var _ kv.DataKV = (*MinIOKV)(nil) + // MinIOKV implements DataKV interface and relies on underling MinIO service. // MinIOKV object contains a client which can be used to access the MinIO service. type MinIOKV struct { @@ -119,28 +123,75 @@ func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) { return objectsKeys, objectsValues, nil } +func (kv *MinIOKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { + objects := kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: key}) + + var ( + objectsKeys = make([]string, 0, len(objects)) + objectsValues [][]byte + ) + + for object := range objects { + objectsKeys = append(objectsKeys, object.Key) + } + objectsValues, err := kv.MultiLoadBytes(objectsKeys) + if err != nil { + log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err)) + return nil, nil, err + } + + return objectsKeys, objectsValues, nil +} + // Load loads an object with @key. func (kv *MinIOKV) Load(key string) (string, error) { object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{}) - if object != nil { - defer object.Close() - } if err != nil { return "", err } + if object != nil { + defer object.Close() + } + info, err := object.Stat() if err != nil { return "", err } + buf := new(strings.Builder) buf.Grow(int(info.Size)) _, err = io.Copy(buf, object) if err != nil && err != io.EOF { return "", err } + return buf.String(), nil } +// Load loads an object with @key. +func (kv *MinIOKV) LoadBytes(key string) ([]byte, error) { + object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + if object != nil { + defer object.Close() + } + + info, err := object.Stat() + if err != nil { + return nil, err + } + + buf := bytes.NewBuffer(make([]byte, 0, info.Size)) + _, err = io.Copy(buf, object) + if err != nil && err != io.EOF { + return nil, err + } + + return buf.Bytes(), nil +} + // FGetObject downloads file from minio to local storage system. func (kv *MinIOKV) FGetObject(key, localPath string) error { return kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{}) @@ -172,19 +223,30 @@ func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error { // MultiLoad loads objects with multi @keys. func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) { - var resultErr error var objectsValues []string for _, key := range keys { objectValue, err := kv.Load(key) if err != nil { - if resultErr == nil { - resultErr = err - } + return nil, err } objectsValues = append(objectsValues, objectValue) } - return objectsValues, resultErr + return objectsValues, nil +} + +func (kv *MinIOKV) MultiLoadBytes(keys []string) ([][]byte, error) { + objectsValues := make([][]byte, 0, len(keys)) + + for _, key := range keys { + objectValue, err := kv.LoadBytes(key) + if err != nil { + return nil, err + } + objectsValues = append(objectsValues, objectValue) + } + + return objectsValues, nil } // Save object with @key to Minio. Object value is @value. @@ -192,9 +254,12 @@ func (kv *MinIOKV) Save(key, value string) error { reader := strings.NewReader(value) _, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{}) - if err != nil { - return err - } + return err +} + +func (kv *MinIOKV) SaveBytes(key string, value []byte) error { + reader := bytes.NewReader(value) + _, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{}) return err } @@ -202,16 +267,25 @@ func (kv *MinIOKV) Save(key, value string) error { // MultiSave saves multiple objects, the path is the key of @kvs. // The object value is the value of @kvs. func (kv *MinIOKV) MultiSave(kvs map[string]string) error { - var resultErr error for key, value := range kvs { err := kv.Save(key, value) if err != nil { - if resultErr == nil { - resultErr = err - } + return err } } - return resultErr + + return nil +} + +func (kv *MinIOKV) MultiSaveBytes(kvs map[string][]byte) error { + for key, value := range kvs { + err := kv.SaveBytes(key, value) + if err != nil { + return err + } + } + + return nil } // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. @@ -236,22 +310,19 @@ func (kv *MinIOKV) RemoveWithPrefix(prefix string) error { // Remove deletes an object with @key. func (kv *MinIOKV) Remove(key string) error { - err := kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, string(key), minio.RemoveObjectOptions{}) - return err + return kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, key, minio.RemoveObjectOptions{}) } // MultiRemove deletes an objects with @keys. func (kv *MinIOKV) MultiRemove(keys []string) error { - var resultErr error for _, key := range keys { err := kv.Remove(key) if err != nil { - if resultErr == nil { - resultErr = err - } + return err } } - return resultErr + + return nil } // LoadPartial loads partial data ranged in [start, end) with @key. diff --git a/internal/kv/minio/minio_kv_test.go b/internal/kv/minio/minio_kv_test.go index 58079f8296..e42d325869 100644 --- a/internal/kv/minio/minio_kv_test.go +++ b/internal/kv/minio/minio_kv_test.go @@ -118,6 +118,10 @@ func TestMinIOKV(t *testing.T) { got, err := testKV.Load(path.Join(testLoadRoot, test.loadKey)) assert.Error(t, err) assert.Empty(t, got) + + value, err := testKV.LoadBytes(path.Join(testLoadRoot, test.loadKey)) + assert.Error(t, err) + assert.Nil(t, value) } }) } @@ -141,6 +145,17 @@ func TestMinIOKV(t *testing.T) { assert.Equal(t, len(test.expectedValue), len(gotk)) assert.Equal(t, len(test.expectedValue), len(gotv)) assert.ElementsMatch(t, test.expectedValue, gotv) + + keys, values, err := testKV.LoadBytesWithPrefix(path.Join(testLoadRoot, test.prefix)) + assert.NoError(t, err) + + assert.Equal(t, len(test.expectedValue), len(keys)) + assert.Equal(t, len(test.expectedValue), len(values)) + expectedValuesBytes := make([][]byte, 0) + for _, value := range test.expectedValue { + expectedValuesBytes = append(expectedValuesBytes, []byte(value)) + } + assert.ElementsMatch(t, expectedValuesBytes, values) }) } @@ -151,7 +166,7 @@ func TestMinIOKV(t *testing.T) { expectedValue []string description string }{ - {false, []string{"key_1", "key_not_exist"}, []string{"111", ""}, "multiload 1 exist 1 not"}, + {false, []string{"key_1", "key_not_exist"}, nil, "multiload 1 exist 1 not"}, {true, []string{"abc", "key_3"}, []string{"123", "333"}, "multiload 2 exist"}, } @@ -168,9 +183,14 @@ func TestMinIOKV(t *testing.T) { got, err := testKV.MultiLoad(test.multiKeys) assert.Error(t, err) assert.Equal(t, test.expectedValue, got) + + value, err := testKV.MultiLoadBytes(test.multiKeys) + assert.Error(t, err) + assert.Nil(t, value) } }) } + }) t.Run("test MultiSave", func(t *testing.T) { @@ -194,9 +214,25 @@ func TestMinIOKV(t *testing.T) { err = testKV.MultiSave(kvs) assert.Nil(t, err) - val, err := testKV.Load(path.Join(testMultiSaveRoot, "key_1")) - assert.Nil(t, err) - assert.Equal(t, "123", val) + for k, v := range kvs { + val, err := testKV.Load(k) + assert.Nil(t, err) + assert.Equal(t, v, val) + } + + bytesKvs := map[string][]byte{ + path.Join(testMultiSaveRoot, "key_1"): {0x12, 0x34}, + path.Join(testMultiSaveRoot, "key_2"): {0x56, 0x78}, + } + + err = testKV.MultiSaveBytes(bytesKvs) + assert.NoError(t, err) + + for k, v := range bytesKvs { + val, err := testKV.LoadBytes(k) + assert.Nil(t, err) + assert.Equal(t, v, val) + } }) t.Run("test Remove", func(t *testing.T) { @@ -245,6 +281,9 @@ func TestMinIOKV(t *testing.T) { err = testKV.Remove(k) assert.NoError(t, err) + exist := testKV.Exist(k) + assert.False(t, exist) + v, err = testKV.Load(k) require.Error(t, err) require.Empty(t, v)