mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 23:45:28 +08:00
Add methods to load/save bytes for MinIOKV (#15732)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
cdcb3627b4
commit
1a9933a7c3
@ -17,6 +17,7 @@
|
||||
package miniokv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@ -25,6 +26,7 @@ import (
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/minio/minio-go/v7"
|
||||
@ -32,6 +34,8 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ kv.DataKV = (*MinIOKV)(nil)
|
||||
|
||||
// MinIOKV implements DataKV interface and relies on underling MinIO service.
|
||||
// MinIOKV object contains a client which can be used to access the MinIO service.
|
||||
type MinIOKV struct {
|
||||
@ -119,28 +123,75 @@ func (kv *MinIOKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
||||
return objectsKeys, objectsValues, nil
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
||||
objects := kv.minioClient.ListObjects(kv.ctx, kv.bucketName, minio.ListObjectsOptions{Prefix: key})
|
||||
|
||||
var (
|
||||
objectsKeys = make([]string, 0, len(objects))
|
||||
objectsValues [][]byte
|
||||
)
|
||||
|
||||
for object := range objects {
|
||||
objectsKeys = append(objectsKeys, object.Key)
|
||||
}
|
||||
objectsValues, err := kv.MultiLoadBytes(objectsKeys)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("MinIO load with prefix error. path = %s", key), zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return objectsKeys, objectsValues, nil
|
||||
}
|
||||
|
||||
// Load loads an object with @key.
|
||||
func (kv *MinIOKV) Load(key string) (string, error) {
|
||||
object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{})
|
||||
if object != nil {
|
||||
defer object.Close()
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if object != nil {
|
||||
defer object.Close()
|
||||
}
|
||||
|
||||
info, err := object.Stat()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
buf := new(strings.Builder)
|
||||
buf.Grow(int(info.Size))
|
||||
_, err = io.Copy(buf, object)
|
||||
if err != nil && err != io.EOF {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// Load loads an object with @key.
|
||||
func (kv *MinIOKV) LoadBytes(key string) ([]byte, error) {
|
||||
object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if object != nil {
|
||||
defer object.Close()
|
||||
}
|
||||
|
||||
info, err := object.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(make([]byte, 0, info.Size))
|
||||
_, err = io.Copy(buf, object)
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// FGetObject downloads file from minio to local storage system.
|
||||
func (kv *MinIOKV) FGetObject(key, localPath string) error {
|
||||
return kv.minioClient.FGetObject(kv.ctx, kv.bucketName, key, localPath+key, minio.GetObjectOptions{})
|
||||
@ -172,19 +223,30 @@ func (kv *MinIOKV) FGetObjects(keys []string, localPath string) error {
|
||||
|
||||
// MultiLoad loads objects with multi @keys.
|
||||
func (kv *MinIOKV) MultiLoad(keys []string) ([]string, error) {
|
||||
var resultErr error
|
||||
var objectsValues []string
|
||||
for _, key := range keys {
|
||||
objectValue, err := kv.Load(key)
|
||||
if err != nil {
|
||||
if resultErr == nil {
|
||||
resultErr = err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
objectsValues = append(objectsValues, objectValue)
|
||||
}
|
||||
|
||||
return objectsValues, resultErr
|
||||
return objectsValues, nil
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) MultiLoadBytes(keys []string) ([][]byte, error) {
|
||||
objectsValues := make([][]byte, 0, len(keys))
|
||||
|
||||
for _, key := range keys {
|
||||
objectValue, err := kv.LoadBytes(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
objectsValues = append(objectsValues, objectValue)
|
||||
}
|
||||
|
||||
return objectsValues, nil
|
||||
}
|
||||
|
||||
// Save object with @key to Minio. Object value is @value.
|
||||
@ -192,9 +254,12 @@ func (kv *MinIOKV) Save(key, value string) error {
|
||||
reader := strings.NewReader(value)
|
||||
_, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) SaveBytes(key string, value []byte) error {
|
||||
reader := bytes.NewReader(value)
|
||||
_, err := kv.minioClient.PutObject(kv.ctx, kv.bucketName, key, reader, int64(len(value)), minio.PutObjectOptions{})
|
||||
|
||||
return err
|
||||
}
|
||||
@ -202,16 +267,25 @@ func (kv *MinIOKV) Save(key, value string) error {
|
||||
// MultiSave saves multiple objects, the path is the key of @kvs.
|
||||
// The object value is the value of @kvs.
|
||||
func (kv *MinIOKV) MultiSave(kvs map[string]string) error {
|
||||
var resultErr error
|
||||
for key, value := range kvs {
|
||||
err := kv.Save(key, value)
|
||||
if err != nil {
|
||||
if resultErr == nil {
|
||||
resultErr = err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return resultErr
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kv *MinIOKV) MultiSaveBytes(kvs map[string][]byte) error {
|
||||
for key, value := range kvs {
|
||||
err := kv.SaveBytes(key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveWithPrefix removes all objects with the same prefix @prefix from minio.
|
||||
@ -236,22 +310,19 @@ func (kv *MinIOKV) RemoveWithPrefix(prefix string) error {
|
||||
|
||||
// Remove deletes an object with @key.
|
||||
func (kv *MinIOKV) Remove(key string) error {
|
||||
err := kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, string(key), minio.RemoveObjectOptions{})
|
||||
return err
|
||||
return kv.minioClient.RemoveObject(kv.ctx, kv.bucketName, key, minio.RemoveObjectOptions{})
|
||||
}
|
||||
|
||||
// MultiRemove deletes an objects with @keys.
|
||||
func (kv *MinIOKV) MultiRemove(keys []string) error {
|
||||
var resultErr error
|
||||
for _, key := range keys {
|
||||
err := kv.Remove(key)
|
||||
if err != nil {
|
||||
if resultErr == nil {
|
||||
resultErr = err
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return resultErr
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadPartial loads partial data ranged in [start, end) with @key.
|
||||
|
||||
@ -118,6 +118,10 @@ func TestMinIOKV(t *testing.T) {
|
||||
got, err := testKV.Load(path.Join(testLoadRoot, test.loadKey))
|
||||
assert.Error(t, err)
|
||||
assert.Empty(t, got)
|
||||
|
||||
value, err := testKV.LoadBytes(path.Join(testLoadRoot, test.loadKey))
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, value)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -141,6 +145,17 @@ func TestMinIOKV(t *testing.T) {
|
||||
assert.Equal(t, len(test.expectedValue), len(gotk))
|
||||
assert.Equal(t, len(test.expectedValue), len(gotv))
|
||||
assert.ElementsMatch(t, test.expectedValue, gotv)
|
||||
|
||||
keys, values, err := testKV.LoadBytesWithPrefix(path.Join(testLoadRoot, test.prefix))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, len(test.expectedValue), len(keys))
|
||||
assert.Equal(t, len(test.expectedValue), len(values))
|
||||
expectedValuesBytes := make([][]byte, 0)
|
||||
for _, value := range test.expectedValue {
|
||||
expectedValuesBytes = append(expectedValuesBytes, []byte(value))
|
||||
}
|
||||
assert.ElementsMatch(t, expectedValuesBytes, values)
|
||||
})
|
||||
}
|
||||
|
||||
@ -151,7 +166,7 @@ func TestMinIOKV(t *testing.T) {
|
||||
expectedValue []string
|
||||
description string
|
||||
}{
|
||||
{false, []string{"key_1", "key_not_exist"}, []string{"111", ""}, "multiload 1 exist 1 not"},
|
||||
{false, []string{"key_1", "key_not_exist"}, nil, "multiload 1 exist 1 not"},
|
||||
{true, []string{"abc", "key_3"}, []string{"123", "333"}, "multiload 2 exist"},
|
||||
}
|
||||
|
||||
@ -168,9 +183,14 @@ func TestMinIOKV(t *testing.T) {
|
||||
got, err := testKV.MultiLoad(test.multiKeys)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, test.expectedValue, got)
|
||||
|
||||
value, err := testKV.MultiLoadBytes(test.multiKeys)
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, value)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test MultiSave", func(t *testing.T) {
|
||||
@ -194,9 +214,25 @@ func TestMinIOKV(t *testing.T) {
|
||||
err = testKV.MultiSave(kvs)
|
||||
assert.Nil(t, err)
|
||||
|
||||
val, err := testKV.Load(path.Join(testMultiSaveRoot, "key_1"))
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "123", val)
|
||||
for k, v := range kvs {
|
||||
val, err := testKV.Load(k)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, v, val)
|
||||
}
|
||||
|
||||
bytesKvs := map[string][]byte{
|
||||
path.Join(testMultiSaveRoot, "key_1"): {0x12, 0x34},
|
||||
path.Join(testMultiSaveRoot, "key_2"): {0x56, 0x78},
|
||||
}
|
||||
|
||||
err = testKV.MultiSaveBytes(bytesKvs)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for k, v := range bytesKvs {
|
||||
val, err := testKV.LoadBytes(k)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, v, val)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test Remove", func(t *testing.T) {
|
||||
@ -245,6 +281,9 @@ func TestMinIOKV(t *testing.T) {
|
||||
err = testKV.Remove(k)
|
||||
assert.NoError(t, err)
|
||||
|
||||
exist := testKV.Exist(k)
|
||||
assert.False(t, exist)
|
||||
|
||||
v, err = testKV.Load(k)
|
||||
require.Error(t, err)
|
||||
require.Empty(t, v)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user