From 41bd866ff6305a59bdea33474ec1867648ef9dbc Mon Sep 17 00:00:00 2001 From: dragondriver Date: Sat, 9 Oct 2021 19:33:03 +0800 Subject: [PATCH] Support LoadPartial interface for DataKV (#9554) Signed-off-by: dragondriver --- internal/kv/kv.go | 6 ++++ internal/kv/mem/mem_kv.go | 16 +++++++++ internal/kv/mem/mem_kv_test.go | 58 ++++++++++++++++++++++++++++++ internal/kv/minio/minio_kv.go | 26 ++++++++++++++ internal/kv/minio/minio_kv_test.go | 55 ++++++++++++++++++++++++++++ 5 files changed, 161 insertions(+) create mode 100644 internal/kv/mem/mem_kv_test.go diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 7a815939fa..5fa1729638 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -30,6 +30,12 @@ type BaseKV interface { Close() } +// DataKV persists the data. +type DataKV interface { + BaseKV + LoadPartial(key string, start, end int64) ([]byte, error) +} + // TxnKV contains extra txn operations of kv. The extra operations is transactional. type TxnKV interface { BaseKV diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 4ce274b1af..85314e9daf 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -12,6 +12,7 @@ package memkv import ( + "fmt" "strings" "sync" @@ -196,3 +197,18 @@ func (kv *MemoryKV) RemoveWithPrefix(key string) error { } return nil } + +// item already in memory, just slice the value. +func (kv *MemoryKV) LoadPartial(key string, start, end int64) ([]byte, error) { + value, err := kv.Load(key) + if err != nil { + return nil, err + } + switch { + case 0 <= start && start < end && end <= int64(len(value)): + return []byte(value[start:end]), nil + default: + return nil, fmt.Errorf("invalid range specified: start=%d end=%d", + start, end) + } +} diff --git a/internal/kv/mem/mem_kv_test.go b/internal/kv/mem/mem_kv_test.go new file mode 100644 index 0000000000..b824d52ea7 --- /dev/null +++ b/internal/kv/mem/mem_kv_test.go @@ -0,0 +1,58 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package memkv + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemoryKV_LoadPartial(t *testing.T) { + memKV := NewMemoryKV() + + key := "TestMemoryKV_LoadPartial_key" + value := "TestMemoryKV_LoadPartial_value" + + err := memKV.Save(key, value) + assert.NoError(t, err) + + var start, end int64 + var partial []byte + + // case 0 <= start && start = end && end <= int64(len(value)) + + start, end = 1, 2 + partial, err = memKV.LoadPartial(key, start, end) + assert.NoError(t, err) + assert.ElementsMatch(t, partial, []byte(value[start:end])) + + start, end = int64(len(value)-2), int64(len(value)-1) + partial, err = memKV.LoadPartial(key, start, end) + assert.NoError(t, err) + assert.ElementsMatch(t, partial, []byte(value[start:end])) + + // error case + start, end = 5, 3 + _, err = memKV.LoadPartial(key, start, end) + assert.Error(t, err) + + start, end = 1, 1 + _, err = memKV.LoadPartial(key, start, end) + assert.Error(t, err) + + err = memKV.Remove(key) + assert.NoError(t, err) + start, end = 1, 2 + _, err = memKV.LoadPartial(key, start, end) + assert.Error(t, err) +} diff --git a/internal/kv/minio/minio_kv.go b/internal/kv/minio/minio_kv.go index a0ffd62ea2..a8aef4e6d7 100644 --- a/internal/kv/minio/minio_kv.go +++ b/internal/kv/minio/minio_kv.go @@ -14,6 +14,7 @@ package miniokv import ( "context" "fmt" + "io/ioutil" "sync" "io" @@ -245,6 +246,31 @@ func (kv *MinIOKV) MultiRemove(keys []string) error { return resultErr } +func (kv *MinIOKV) LoadPartial(key string, start, end int64) ([]byte, error) { + switch { + case start < 0 || end < 0: + return nil, fmt.Errorf("invalid range specified: start=%d end=%d", + start, end) + case start >= end: + return nil, fmt.Errorf("invalid range specified: start=%d end=%d", + start, end) + } + + opts := minio.GetObjectOptions{} + err := opts.SetRange(start, end-1) + if err != nil { + return nil, err + } + + object, err := kv.minioClient.GetObject(kv.ctx, kv.bucketName, key, opts) + if err != nil { + return nil, err + } + defer object.Close() + + return ioutil.ReadAll(object) +} + func (kv *MinIOKV) Close() { } diff --git a/internal/kv/minio/minio_kv_test.go b/internal/kv/minio/minio_kv_test.go index 4cb93d976b..f4df16e2a4 100644 --- a/internal/kv/minio/minio_kv_test.go +++ b/internal/kv/minio/minio_kv_test.go @@ -189,6 +189,61 @@ func TestMinIOKV_Remove(t *testing.T) { assert.Empty(t, val) } +func TestMinIOKV_LoadPartial(t *testing.T) { + Params.Init() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bucketName := "fantastic-tech-test" + minIOKV, err := newMinIOKVClient(ctx, bucketName) + assert.Nil(t, err) + + defer minIOKV.RemoveWithPrefix("") + + key := "TestMinIOKV_LoadPartial_key" + value := "TestMinIOKV_LoadPartial_value" + + err = minIOKV.Save(key, value) + assert.NoError(t, err) + + var start, end int64 + var partial []byte + + start, end = 1, 2 + partial, err = minIOKV.LoadPartial(key, start, end) + assert.NoError(t, err) + assert.ElementsMatch(t, partial, []byte(value[start:end])) + + start, end = 0, int64(len(value)) + partial, err = minIOKV.LoadPartial(key, start, end) + assert.NoError(t, err) + assert.ElementsMatch(t, partial, []byte(value[start:end])) + + // error case + start, end = 5, 3 + _, err = minIOKV.LoadPartial(key, start, end) + assert.Error(t, err) + + start, end = 1, 1 + _, err = minIOKV.LoadPartial(key, start, end) + assert.Error(t, err) + + start, end = -1, 1 + _, err = minIOKV.LoadPartial(key, start, end) + assert.Error(t, err) + + start, end = 1, -1 + _, err = minIOKV.LoadPartial(key, start, end) + assert.Error(t, err) + + err = minIOKV.Remove(key) + assert.NoError(t, err) + start, end = 1, 2 + _, err = minIOKV.LoadPartial(key, start, end) + assert.Error(t, err) +} + func TestMinIOKV_FGetObject(t *testing.T) { Params.Init() path := "/tmp/milvus/data"