make etcdKV private (#24778)

Signed-off-by: yiwangdr <yiwangdr@gmail.com>
This commit is contained in:
yiwangdr 2023-06-12 19:52:38 -07:00 committed by GitHub
parent f12574aaf3
commit 4387f36897
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 138 additions and 123 deletions

View File

@ -17,6 +17,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
@ -51,7 +52,7 @@ type mck struct {
taskIDToInvalidPath map[int64][]string taskIDToInvalidPath map[int64][]string
segmentIDMap map[int64]*datapb.SegmentInfo segmentIDMap map[int64]*datapb.SegmentInfo
partitionIDMap map[int64]struct{} partitionIDMap map[int64]struct{}
etcdKV *etcdkv.EtcdKV metaKV kv.MetaKv
minioChunkManager storage.ChunkManager minioChunkManager storage.ChunkManager
etcdIP string etcdIP string
@ -106,7 +107,7 @@ func (c *mck) execute(args []string, flags *flag.FlagSet) {
func (c *mck) run() { func (c *mck) run() {
c.connectMinio() c.connectMinio()
_, values, err := c.etcdKV.LoadWithPrefix(segmentPrefix) _, values, err := c.metaKV.LoadWithPrefix(segmentPrefix)
if err != nil { if err != nil {
log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err)) log.Fatal("failed to list the segment info", zap.String("key", segmentPrefix), zap.Error(err))
} }
@ -120,7 +121,7 @@ func (c *mck) run() {
c.segmentIDMap[info.ID] = info c.segmentIDMap[info.ID] = info
} }
_, values, err = c.etcdKV.LoadWithPrefix(collectionPrefix) _, values, err = c.metaKV.LoadWithPrefix(collectionPrefix)
if err != nil { if err != nil {
log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err)) log.Fatal("failed to list the collection info", zap.String("key", collectionPrefix), zap.Error(err))
} }
@ -149,13 +150,13 @@ func (c *mck) run() {
} }
log.Info("partition ids", zap.Int64s("ids", ids)) log.Info("partition ids", zap.Int64s("ids", ids))
keys, values, err := c.etcdKV.LoadWithPrefix(triggerTaskPrefix) keys, values, err := c.metaKV.LoadWithPrefix(triggerTaskPrefix)
if err != nil { if err != nil {
log.Fatal("failed to list the trigger task info", zap.Error(err)) log.Fatal("failed to list the trigger task info", zap.Error(err))
} }
c.extractTask(triggerTaskPrefix, keys, values) c.extractTask(triggerTaskPrefix, keys, values)
keys, values, err = c.etcdKV.LoadWithPrefix(activeTaskPrefix) keys, values, err = c.metaKV.LoadWithPrefix(activeTaskPrefix)
if err != nil { if err != nil {
log.Fatal("failed to list the active task info", zap.Error(err)) log.Fatal("failed to list the active task info", zap.Error(err))
} }
@ -229,7 +230,7 @@ func (c *mck) connectEctd() {
} }
rootPath := getConfigValue(c.ectdRootPath, c.params.EtcdCfg.MetaRootPath.GetValue(), "ectd_root_path") rootPath := getConfigValue(c.ectdRootPath, c.params.EtcdCfg.MetaRootPath.GetValue(), "ectd_root_path")
c.etcdKV = etcdkv.NewEtcdKV(etcdCli, rootPath) c.metaKV = etcdkv.NewEtcdKV(etcdCli, rootPath)
log.Info("Etcd root path", zap.String("root_path", rootPath)) log.Info("Etcd root path", zap.String("root_path", rootPath))
} }
@ -255,7 +256,7 @@ func getConfigValue(a string, b string, name string) string {
} }
func (c *mck) cleanTrash() { func (c *mck) cleanTrash() {
keys, _, err := c.etcdKV.LoadWithPrefix(MckTrash) keys, _, err := c.metaKV.LoadWithPrefix(MckTrash)
if err != nil { if err != nil {
log.Error("failed to load backup info", zap.Error(err)) log.Error("failed to load backup info", zap.Error(err))
return return
@ -269,7 +270,7 @@ func (c *mck) cleanTrash() {
deleteAll := "" deleteAll := ""
fmt.Scanln(&deleteAll) fmt.Scanln(&deleteAll)
if deleteAll == "Y" { if deleteAll == "Y" {
err = c.etcdKV.RemoveWithPrefix(MckTrash) err = c.metaKV.RemoveWithPrefix(MckTrash)
if err != nil { if err != nil {
log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err)) log.Error("failed to remove backup infos", zap.String("key", MckTrash), zap.Error(err))
return return
@ -392,31 +393,31 @@ func (c *mck) extractTask(prefix string, keys []string, values []string) {
func (c *mck) removeTask(invalidTask int64) bool { func (c *mck) removeTask(invalidTask int64) bool {
taskType := c.taskNameMap[invalidTask] taskType := c.taskNameMap[invalidTask]
key := c.taskKeyMap[invalidTask] key := c.taskKeyMap[invalidTask]
err := c.etcdKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key]) err := c.metaKV.Save(getTrashKey(taskType, key), c.allTaskInfo[key])
if err != nil { if err != nil {
log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err)) log.Warn("failed to backup task", zap.String("key", getTrashKey(taskType, key)), zap.Int64("task_id", invalidTask), zap.Error(err))
return false return false
} }
fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key)) fmt.Printf("Back up task successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.etcdKV.Remove(key) err = c.metaKV.Remove(key)
if err != nil { if err != nil {
log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err)) log.Warn("failed to remove task", zap.Int64("task_id", invalidTask), zap.Error(err))
return false return false
} }
key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask) key = fmt.Sprintf("%s/%d", taskInfoPrefix, invalidTask)
taskInfo, err := c.etcdKV.Load(key) taskInfo, err := c.metaKV.Load(key)
if err != nil { if err != nil {
log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err)) log.Warn("failed to load task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false return false
} }
err = c.etcdKV.Save(getTrashKey(taskType, key), taskInfo) err = c.metaKV.Save(getTrashKey(taskType, key), taskInfo)
if err != nil { if err != nil {
log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err)) log.Warn("failed to backup task info", zap.Int64("task_id", invalidTask), zap.Error(err))
return false return false
} }
fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key)) fmt.Printf("Back up task info successfully, back path: %s\n", getTrashKey(taskType, key))
err = c.etcdKV.Remove(key) err = c.metaKV.Remove(key)
if err != nil { if err != nil {
log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err)) log.Warn("failed to remove task info", zap.Int64("task_id", invalidTask), zap.Error(err))
} }

View File

@ -449,31 +449,31 @@ type SnapShotKV interface {
###### A.7.5 Etcd KV ###### A.7.5 Etcd KV
```go ```go
type EtcdKV struct { type etcdKV struct {
client *clientv3.Client client *clientv3.Client
rootPath string rootPath string
} }
func (kv *EtcdKV) Close() func (kv *etcdKV) Close()
func (kv *EtcdKV) GetPath(key string) string func (kv *etcdKV) GetPath(key string) string
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error)
func (kv *EtcdKV) Load(key string) (string, error) func (kv *etcdKV) Load(key string) (string, error)
func (kv *EtcdKV) GetCount(key string) (int64, error) func (kv *etcdKV) GetCount(key string) (int64, error)
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) func (kv *etcdKV) MultiLoad(keys []string) ([]string, error)
func (kv *EtcdKV) Save(key, value string) error func (kv *etcdKV) Save(key, value string) error
func (kv *EtcdKV) MultiSave(kvs map[string]string) error func (kv *etcdKV) MultiSave(kvs map[string]string) error
func (kv *EtcdKV) RemoveWithPrefix(prefix string) error func (kv *etcdKV) RemoveWithPrefix(prefix string) error
func (kv *EtcdKV) Remove(key string) error func (kv *etcdKV) Remove(key string) error
func (kv *EtcdKV) MultiRemove(keys []string) error func (kv *etcdKV) MultiRemove(keys []string) error
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan func (kv *etcdKV) Watch(key string) clientv3.WatchChan
func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan func (kv *etcdKV) WatchWithPrefix(key string) clientv3.WatchChan
func (kv *EtcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan
func NewEtcdKV(etcdAddr string, rootPath string) *EtcdKV func NewEtcdKV(etcdAddr string, rootPath string) *etcdKV
``` ```
EtcdKV implements all _TxnKV_ interfaces. etcdKV implements all _TxnKV_ interfaces.
###### A.7.6 Memory KV ###### A.7.6 Memory KV

View File

@ -444,7 +444,7 @@ type Core struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
etcdCli *clientv3.Client etcdCli *clientv3.Client
kvBase *etcdkv.EtcdKV kvBase *etcdkv.etcdKV
//setMsgStreams, send time tick into dd channel and time tick channel //setMsgStreams, send time tick into dd channel and time tick channel
SendTimeTick func(t typeutil.Timestamp) error SendTimeTick func(t typeutil.Timestamp) error
@ -607,7 +607,7 @@ message SegmentIndexInfo {
} }
``` ```
###### 6.6.2 KV pairs in EtcdKV ###### 6.6.2 KV pairs in etcdKV
```go ```go
"proxy/$proxyId" string -> proxyMetaBlob string "proxy/$proxyId" string -> proxyMetaBlob string

View File

@ -42,7 +42,7 @@ func TestEmbedEtcd(te *testing.T) {
param.Init() param.Init()
te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { te.Run("etcdKV SaveAndLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/saveandload" rootPath := "/etcd/test/root/saveandload"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
require.NoError(te, err) require.NoError(te, err)
@ -146,7 +146,7 @@ func TestEmbedEtcd(te *testing.T) {
} }
}) })
te.Run("EtcdKV SaveAndLoadBytes", func(t *testing.T) { te.Run("etcdKV SaveAndLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/saveandloadbytes" rootPath := "/etcd/test/root/saveandloadbytes"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV)
@ -257,7 +257,7 @@ func TestEmbedEtcd(te *testing.T) {
} }
}) })
te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { te.Run("etcdKV LoadBytesWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadBytesWithRevision" rootPath := "/etcd/test/root/LoadBytesWithRevision"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV)
@ -303,7 +303,7 @@ func TestEmbedEtcd(te *testing.T) {
}) })
te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) { te.Run("etcdKV MultiSaveAndMultiLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_and_multi_load" rootPath := "/etcd/test/root/multi_save_and_multi_load"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.NoError(t, err) assert.NoError(t, err)
@ -412,7 +412,7 @@ func TestEmbedEtcd(te *testing.T) {
assert.Empty(t, vs) assert.Empty(t, vs)
}) })
te.Run("EtcdKV MultiSaveAndMultiLoadBytes", func(t *testing.T) { te.Run("etcdKV MultiSaveAndMultiLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_and_multi_load" rootPath := "/etcd/test/root/multi_save_and_multi_load"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV)
@ -522,7 +522,7 @@ func TestEmbedEtcd(te *testing.T) {
assert.Empty(t, vs) assert.Empty(t, vs)
}) })
te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) { te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_remove_with_prefix" rootPath := "/etcd/test/root/multi_remove_with_prefix"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
require.NoError(t, err) require.NoError(t, err)
@ -610,7 +610,7 @@ func TestEmbedEtcd(te *testing.T) {
} }
}) })
te.Run("EtcdKV MultiRemoveWithPrefixBytes", func(t *testing.T) { te.Run("etcdKV MultiRemoveWithPrefixBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_remove_with_prefix_bytes" rootPath := "/etcd/test/root/multi_remove_with_prefix_bytes"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV)
@ -699,7 +699,7 @@ func TestEmbedEtcd(te *testing.T) {
} }
}) })
te.Run("EtcdKV Watch", func(t *testing.T) { te.Run("etcdKV Watch", func(t *testing.T) {
rootPath := "/etcd/test/root/watch" rootPath := "/etcd/test/root/watch"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -41,7 +41,7 @@ func TestEtcdRestartLoad(te *testing.T) {
err := os.RemoveAll(etcdDataDir) err := os.RemoveAll(etcdDataDir)
assert.NoError(te, err) assert.NoError(te, err)
}() }()
te.Run("EtcdKV SaveRestartAndLoad", func(t *testing.T) { te.Run("etcdKV SaveRestartAndLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/saveRestartAndLoad" rootPath := "/etcd/test/root/saveRestartAndLoad"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg) metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
require.NoError(te, err) require.NoError(te, err)

View File

@ -36,15 +36,15 @@ const (
RequestTimeout = 10 * time.Second RequestTimeout = 10 * time.Second
) )
// EtcdKV implements TxnKV interface, it supports to process multiple kvs in a transaction. // etcdKV implements TxnKV interface, it supports to process multiple kvs in a transaction.
type EtcdKV struct { type etcdKV struct {
client *clientv3.Client client *clientv3.Client
rootPath string rootPath string
} }
// NewEtcdKV creates a new etcd kv. // NewEtcdKV creates a new etcd kv.
func NewEtcdKV(client *clientv3.Client, rootPath string) *EtcdKV { func NewEtcdKV(client *clientv3.Client, rootPath string) *etcdKV {
kv := &EtcdKV{ kv := &etcdKV{
client: client, client: client,
rootPath: rootPath, rootPath: rootPath,
} }
@ -52,16 +52,16 @@ func NewEtcdKV(client *clientv3.Client, rootPath string) *EtcdKV {
} }
// Close closes the connection to etcd. // Close closes the connection to etcd.
func (kv *EtcdKV) Close() { func (kv *etcdKV) Close() {
log.Debug("etcd kv closed", zap.String("path", kv.rootPath)) log.Debug("etcd kv closed", zap.String("path", kv.rootPath))
} }
// GetPath returns the path of the key. // GetPath returns the path of the key.
func (kv *EtcdKV) GetPath(key string) string { func (kv *etcdKV) GetPath(key string) string {
return path.Join(kv.rootPath, key) return path.Join(kv.rootPath, key)
} }
func (kv *EtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error { func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error {
start := time.Now() start := time.Now()
prefix = path.Join(kv.rootPath, prefix) prefix = path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -99,7 +99,7 @@ func (kv *EtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by
} }
// LoadWithPrefix returns all the keys and values with the given key prefix. // LoadWithPrefix returns all the keys and values with the given key prefix.
func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -120,7 +120,7 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
} }
// LoadBytesWithPrefix returns all the keys and values with the given key prefix. // LoadBytesWithPrefix returns all the keys and values with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -141,7 +141,7 @@ func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
} }
// LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix. // LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) { func (kv *etcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -164,7 +164,7 @@ func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64,
} }
// Load returns value of the key. // Load returns value of the key.
func (kv *EtcdKV) Load(key string) (string, error) { func (kv *etcdKV) Load(key string) (string, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -181,7 +181,7 @@ func (kv *EtcdKV) Load(key string) (string, error) {
} }
// LoadBytes returns value of the key. // LoadBytes returns value of the key.
func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) { func (kv *etcdKV) LoadBytes(key string) ([]byte, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -198,7 +198,7 @@ func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) {
} }
// MultiLoad gets the values of the keys in a transaction. // MultiLoad gets the values of the keys in a transaction.
func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { func (kv *etcdKV) MultiLoad(keys []string) ([]string, error) {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(keys)) ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys { for _, keyLoad := range keys {
@ -233,7 +233,7 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
} }
// MultiLoadBytes gets the values of the keys in a transaction. // MultiLoadBytes gets the values of the keys in a transaction.
func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { func (kv *etcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(keys)) ops := make([]clientv3.Op, 0, len(keys))
for _, keyLoad := range keys { for _, keyLoad := range keys {
@ -268,7 +268,7 @@ func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
} }
// LoadBytesWithRevision returns keys, values and revision with given key prefix. // LoadBytesWithRevision returns keys, values and revision with given key prefix.
func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) { func (kv *etcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -289,7 +289,7 @@ func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64,
} }
// Save saves the key-value pair. // Save saves the key-value pair.
func (kv *EtcdKV) Save(key, value string) error { func (kv *etcdKV) Save(key, value string) error {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -301,7 +301,7 @@ func (kv *EtcdKV) Save(key, value string) error {
} }
// SaveBytes saves the key-value pair. // SaveBytes saves the key-value pair.
func (kv *EtcdKV) SaveBytes(key string, value []byte) error { func (kv *etcdKV) SaveBytes(key string, value []byte) error {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -313,7 +313,7 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error {
} }
// SaveBytesWithLease is a function to put value in etcd with etcd lease options. // SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error { func (kv *etcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -325,7 +325,7 @@ func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.Lease
} }
// MultiSave saves the key-value pairs in a transaction. // MultiSave saves the key-value pairs in a transaction.
func (kv *EtcdKV) MultiSave(kvs map[string]string) error { func (kv *etcdKV) MultiSave(kvs map[string]string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(kvs)) ops := make([]clientv3.Op, 0, len(kvs))
var keys []string var keys []string
@ -347,7 +347,7 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
} }
// MultiSaveBytes saves the key-value pairs in a transaction. // MultiSaveBytes saves the key-value pairs in a transaction.
func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error { func (kv *etcdKV) MultiSaveBytes(kvs map[string][]byte) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(kvs)) ops := make([]clientv3.Op, 0, len(kvs))
var keys []string var keys []string
@ -369,7 +369,7 @@ func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
} }
// RemoveWithPrefix removes the keys with given prefix. // RemoveWithPrefix removes the keys with given prefix.
func (kv *EtcdKV) RemoveWithPrefix(prefix string) error { func (kv *etcdKV) RemoveWithPrefix(prefix string) error {
start := time.Now() start := time.Now()
key := path.Join(kv.rootPath, prefix) key := path.Join(kv.rootPath, prefix)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -381,7 +381,7 @@ func (kv *EtcdKV) RemoveWithPrefix(prefix string) error {
} }
// Remove removes the key. // Remove removes the key.
func (kv *EtcdKV) Remove(key string) error { func (kv *etcdKV) Remove(key string) error {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
@ -393,7 +393,7 @@ func (kv *EtcdKV) Remove(key string) error {
} }
// MultiRemove removes the keys in a transaction. // MultiRemove removes the keys in a transaction.
func (kv *EtcdKV) MultiRemove(keys []string) error { func (kv *etcdKV) MultiRemove(keys []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(keys)) ops := make([]clientv3.Op, 0, len(keys))
for _, key := range keys { for _, key := range keys {
@ -412,7 +412,7 @@ func (kv *EtcdKV) MultiRemove(keys []string) error {
} }
// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction. // MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)+len(removals)) ops := make([]clientv3.Op, 0, len(saves)+len(removals))
var keys []string var keys []string
@ -442,7 +442,7 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
} }
// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. // MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction.
func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error { func (kv *etcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)+len(removals)) ops := make([]clientv3.Op, 0, len(saves)+len(removals))
var keys []string var keys []string
@ -472,7 +472,7 @@ func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []st
} }
// Watch starts watching a key, returns a watch channel. // Watch starts watching a key, returns a watch channel.
func (kv *EtcdKV) Watch(key string) clientv3.WatchChan { func (kv *etcdKV) Watch(key string) clientv3.WatchChan {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithCreatedNotify()) rch := kv.client.Watch(context.Background(), key, clientv3.WithCreatedNotify())
@ -481,7 +481,7 @@ func (kv *EtcdKV) Watch(key string) clientv3.WatchChan {
} }
// WatchWithPrefix starts watching a key with prefix, returns a watch channel. // WatchWithPrefix starts watching a key with prefix, returns a watch channel.
func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan { func (kv *etcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithCreatedNotify()) rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithCreatedNotify())
@ -490,7 +490,7 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan {
} }
// WatchWithRevision starts watching a key with revision, returns a watch channel. // WatchWithRevision starts watching a key with revision, returns a watch channel.
func (kv *EtcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan { func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
start := time.Now() start := time.Now()
key = path.Join(kv.rootPath, key) key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
@ -499,7 +499,7 @@ func (kv *EtcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh
} }
// MultiRemoveWithPrefix removes the keys with given prefix. // MultiRemoveWithPrefix removes the keys with given prefix.
func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error { func (kv *etcdKV) MultiRemoveWithPrefix(keys []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(keys)) ops := make([]clientv3.Op, 0, len(keys))
for _, k := range keys { for _, k := range keys {
@ -518,7 +518,7 @@ func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error {
} }
// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)) ops := make([]clientv3.Op, 0, len(saves))
var keys []string var keys []string
@ -548,7 +548,7 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
} }
// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. // MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals.
func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error { func (kv *etcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error {
start := time.Now() start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)) ops := make([]clientv3.Op, 0, len(saves))
var keys []string var keys []string
@ -579,7 +579,7 @@ func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
// CompareVersionAndSwap compares the existing key-value's version with version, and if // CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd. // they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) { func (kv *etcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) {
start := time.Now() start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel() defer cancel()
@ -598,7 +598,7 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string,
// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if // CompareVersionAndSwapBytes compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd. // they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) (bool, error) { func (kv *etcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) (bool, error) {
start := time.Now() start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel() defer cancel()

View File

@ -53,7 +53,7 @@ func TestEtcdKV_Load(te *testing.T) {
Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
defer etcdCli.Close() defer etcdCli.Close()
assert.NoError(te, err) assert.NoError(te, err)
te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { te.Run("etcdKV SaveAndLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/saveandload" rootPath := "/etcd/test/root/saveandload"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
err = etcdKV.RemoveWithPrefix("") err = etcdKV.RemoveWithPrefix("")
@ -156,7 +156,7 @@ func TestEtcdKV_Load(te *testing.T) {
} }
}) })
te.Run("EtcdKV SaveAndLoadBytes", func(t *testing.T) { te.Run("etcdKV SaveAndLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/saveandloadbytes" rootPath := "/etcd/test/root/saveandloadbytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
err = etcdKV.RemoveWithPrefix("") err = etcdKV.RemoveWithPrefix("")
@ -273,7 +273,7 @@ func TestEtcdKV_Load(te *testing.T) {
} }
}) })
te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { te.Run("etcdKV LoadBytesWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadBytesWithRevision" rootPath := "/etcd/test/root/LoadBytesWithRevision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -321,7 +321,7 @@ func TestEtcdKV_Load(te *testing.T) {
}) })
te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) { te.Run("etcdKV MultiSaveAndMultiLoad", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_and_multi_load" rootPath := "/etcd/test/root/multi_save_and_multi_load"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -429,7 +429,7 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Empty(t, vs) assert.Empty(t, vs)
}) })
te.Run("EtcdKV MultiSaveBytesAndMultiLoadBytes", func(t *testing.T) { te.Run("etcdKV MultiSaveBytesAndMultiLoadBytes", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_save_bytes_and_multi_load_bytes" rootPath := "/etcd/test/root/multi_save_bytes_and_multi_load_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -551,7 +551,7 @@ func TestEtcdKV_Load(te *testing.T) {
assert.Empty(t, vs) assert.Empty(t, vs)
}) })
te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) { te.Run("etcdKV MultiRemoveWithPrefix", func(t *testing.T) {
rootPath := "/etcd/test/root/multi_remove_with_prefix" rootPath := "/etcd/test/root/multi_remove_with_prefix"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close() defer etcdKV.Close()
@ -637,7 +637,7 @@ func TestEtcdKV_Load(te *testing.T) {
} }
}) })
te.Run("EtcdKV Watch", func(t *testing.T) { te.Run("etcdKV Watch", func(t *testing.T) {
rootPath := "/etcd/test/root/watch" rootPath := "/etcd/test/root/watch"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)

View File

@ -187,7 +187,7 @@ func Test_binarySearchRecords(t *testing.T) {
func Test_ComposeIsTsKey(t *testing.T) { func Test_ComposeIsTsKey(t *testing.T) {
sep := "_ts" sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) ss, err := NewSuffixSnapshot(etcdkv.NewEtcdKV(nil, ""), sep, "", snapshotPrefix)
require.Nil(t, err) require.Nil(t, err)
defer ss.Close() defer ss.Close()
@ -227,7 +227,7 @@ func Test_ComposeIsTsKey(t *testing.T) {
func Test_SuffixSnaphotIsTSOfKey(t *testing.T) { func Test_SuffixSnaphotIsTSOfKey(t *testing.T) {
sep := "_ts" sep := "_ts"
ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) ss, err := NewSuffixSnapshot(etcdkv.NewEtcdKV(nil, ""), sep, "", snapshotPrefix)
require.Nil(t, err) require.Nil(t, err)
defer ss.Close() defer ss.Close()

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
@ -42,7 +43,7 @@ import (
type fixture struct { type fixture struct {
t *testing.T t *testing.T
etcdKV *etcdkv.EtcdKV kv kv.MetaKv
} }
type parameters struct { type parameters struct {
@ -61,8 +62,8 @@ func (f *fixture) setup() []parameters {
if err != nil { if err != nil {
log.Fatalf("New clientv3 error = %v", err) log.Fatalf("New clientv3 error = %v", err)
} }
f.etcdKV = etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root") f.kv = etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
idAllocator := allocator.NewGlobalIDAllocator("dummy", f.etcdKV) idAllocator := allocator.NewGlobalIDAllocator("dummy", f.kv)
_ = idAllocator.Initialize() _ = idAllocator.Initialize()
err = server.InitRmq(rocksdbName, idAllocator) err = server.InitRmq(rocksdbName, idAllocator)
if err != nil { if err != nil {
@ -81,7 +82,7 @@ func (f *fixture) teardown() {
rocksdbName := "/tmp/rocksmq_unittest_" + f.t.Name() rocksdbName := "/tmp/rocksmq_unittest_" + f.t.Name()
server.CloseRocksMQ() server.CloseRocksMQ()
f.etcdKV.Close() f.kv.Close()
_ = os.RemoveAll(rocksdbName) _ = os.RemoveAll(rocksdbName)
_ = os.RemoveAll(rocksdbName + "_meta_kv") _ = os.RemoveAll(rocksdbName + "_meta_kv")
} }
@ -359,7 +360,7 @@ func generateBaseMsg() msgstream.BaseMsg {
/****************************************Rmq test******************************************/ /****************************************Rmq test******************************************/
func initRmq(name string) *etcdkv.EtcdKV { func initRmq(name string) kv.MetaKv {
endpoints := os.Getenv("ETCD_ENDPOINTS") endpoints := os.Getenv("ETCD_ENDPOINTS")
if endpoints == "" { if endpoints == "" {
endpoints = "localhost:2379" endpoints = "localhost:2379"
@ -369,8 +370,8 @@ func initRmq(name string) *etcdkv.EtcdKV {
if err != nil { if err != nil {
log.Fatalf("New clientv3 error = %v", err) log.Fatalf("New clientv3 error = %v", err)
} }
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root") kv := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) idAllocator := allocator.NewGlobalIDAllocator("dummy", kv)
_ = idAllocator.Initialize() _ = idAllocator.Initialize()
err = server.InitRmq(name, idAllocator) err = server.InitRmq(name, idAllocator)
@ -378,14 +379,14 @@ func initRmq(name string) *etcdkv.EtcdKV {
if err != nil { if err != nil {
log.Fatalf("InitRmq error = %v", err) log.Fatalf("InitRmq error = %v", err)
} }
return etcdKV return kv
} }
func Close(rocksdbName string, intputStream, outputStream msgstream.MsgStream, etcdKV *etcdkv.EtcdKV) { func Close(rocksdbName string, intputStream, outputStream msgstream.MsgStream, kv kv.MetaKv) {
server.CloseRocksMQ() server.CloseRocksMQ()
intputStream.Close() intputStream.Close()
outputStream.Close() outputStream.Close()
etcdKV.Close() kv.Close()
err := os.RemoveAll(rocksdbName) err := os.RemoveAll(rocksdbName)
_ = os.RemoveAll(rocksdbName + "_meta_kv") _ = os.RemoveAll(rocksdbName + "_meta_kv")
log.Println(err) log.Println(err)
@ -449,14 +450,14 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3))
rocksdbName := "/tmp/rocksmq_insert" rocksdbName := "/tmp/rocksmq_insert"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
ctx := context.Background() ctx := context.Background()
inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerGroupName) inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerGroupName)
err := inputStream.Produce(&msgPack) err := inputStream.Produce(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
receiveMsg(ctx, outputStream, len(msgPack.Msgs)) receiveMsg(ctx, outputStream, len(msgPack.Msgs))
Close(rocksdbName, inputStream, outputStream, etcdKV) Close(rocksdbName, inputStream, outputStream, kv)
} }
func TestStream_RmqTtMsgStream_Insert(t *testing.T) { func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
@ -475,7 +476,7 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5))
rocksdbName := "/tmp/rocksmq_insert_tt" rocksdbName := "/tmp/rocksmq_insert_tt"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
ctx := context.Background() ctx := context.Background()
inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName)
@ -489,12 +490,12 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) {
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))
receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) receiveMsg(ctx, outputStream, len(msgPack1.Msgs))
Close(rocksdbName, inputStream, outputStream, etcdKV) Close(rocksdbName, inputStream, outputStream, kv)
} }
func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) { func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
rocksdbName := "/tmp/rocksmq_tt_msg_seek" rocksdbName := "/tmp/rocksmq_tt_msg_seek"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
c1 := funcutil.RandomString(8) c1 := funcutil.RandomString(8)
producerChannels := []string{c1} producerChannels := []string{c1}
@ -549,12 +550,12 @@ func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) {
assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[1].Type()) assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[1].Type())
assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[2].Type()) assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[2].Type())
Close(rocksdbName, inputStream, outputStream, etcdKV) Close(rocksdbName, inputStream, outputStream, kv)
} }
func TestStream_RmqTtMsgStream_Seek(t *testing.T) { func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
rocksdbName := "/tmp/rocksmq_tt_msg_seek" rocksdbName := "/tmp/rocksmq_tt_msg_seek"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
c1 := funcutil.RandomString(8) c1 := funcutil.RandomString(8)
producerChannels := []string{c1} producerChannels := []string{c1}
@ -661,12 +662,12 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) {
assert.Equal(t, msg.BeginTs(), uint64(19)) assert.Equal(t, msg.BeginTs(), uint64(19))
} }
Close(rocksdbName, inputStream, outputStream, etcdKV) Close(rocksdbName, inputStream, outputStream, kv)
} }
func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) { func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) {
rocksdbName := "/tmp/rocksmq_tt_msg_seekInvalid" rocksdbName := "/tmp/rocksmq_tt_msg_seekInvalid"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
c := funcutil.RandomString(8) c := funcutil.RandomString(8)
producerChannels := []string{c} producerChannels := []string{c}
consumerChannels := []string{c} consumerChannels := []string{c}
@ -720,7 +721,7 @@ func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) {
result := consumer(ctx, outputStream2) result := consumer(ctx, outputStream2)
assert.Equal(t, result.Msgs[0].ID(), int64(1)) assert.Equal(t, result.Msgs[0].ID(), int64(1))
Close(rocksdbName, inputStream, outputStream2, etcdKV) Close(rocksdbName, inputStream, outputStream2, kv)
} }
func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) { func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
@ -729,7 +730,7 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
consumerSubName := "subInsert" consumerSubName := "subInsert"
rocksdbName := "/tmp/rocksmq_asconsumer_withpos" rocksdbName := "/tmp/rocksmq_asconsumer_withpos"
etcdKV := initRmq(rocksdbName) kv := initRmq(rocksdbName)
factory := msgstream.ProtoUDFactory{} factory := msgstream.ProtoUDFactory{}
rmqClient, _ := NewClientWithDefaultOptions() rmqClient, _ := NewClientWithDefaultOptions()
@ -755,7 +756,7 @@ func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) {
assert.Equal(t, 1, len(pack.Msgs)) assert.Equal(t, 1, len(pack.Msgs))
assert.EqualValues(t, 1000, pack.Msgs[0].BeginTs()) assert.EqualValues(t, 1000, pack.Msgs[0].BeginTs())
Close(rocksdbName, inputStream, outputStream, etcdKV) Close(rocksdbName, inputStream, outputStream, kv)
} }
func getTimeTickMsgPack(reqID msgstream.UniqueID) *msgstream.MsgPack { func getTimeTickMsgPack(reqID msgstream.UniqueID) *msgstream.MsgPack {

View File

@ -22,6 +22,7 @@ import (
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
@ -36,7 +37,7 @@ import (
type RowCountBasedBalancerTestSuite struct { type RowCountBasedBalancerTestSuite struct {
suite.Suite suite.Suite
balancer *RowCountBasedBalancer balancer *RowCountBasedBalancer
kv *etcdkv.EtcdKV kv kv.MetaKv
broker *meta.MockBroker broker *meta.MockBroker
mockScheduler *task.MockScheduler mockScheduler *task.MockScheduler
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
@ -35,7 +36,7 @@ import (
type ScoreBasedBalancerTestSuite struct { type ScoreBasedBalancerTestSuite struct {
suite.Suite suite.Suite
balancer *ScoreBasedBalancer balancer *ScoreBasedBalancer
kv *etcdkv.EtcdKV kv kv.MetaKv
broker *meta.MockBroker broker *meta.MockBroker
mockScheduler *task.MockScheduler mockScheduler *task.MockScheduler
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"testing" "testing"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -37,7 +38,7 @@ import (
type BalanceCheckerTestSuite struct { type BalanceCheckerTestSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
checker *BalanceChecker checker *BalanceChecker
balancer *balance.MockBalancer balancer *balance.MockBalancer
meta *meta.Meta meta *meta.Meta

View File

@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -36,7 +37,7 @@ import (
type ChannelCheckerTestSuite struct { type ChannelCheckerTestSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
checker *ChannelChecker checker *ChannelChecker
meta *meta.Meta meta *meta.Meta
broker *meta.MockBroker broker *meta.MockBroker

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -38,7 +39,7 @@ import (
type SegmentCheckerTestSuite struct { type SegmentCheckerTestSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
checker *SegmentChecker checker *SegmentChecker
meta *meta.Meta meta *meta.Meta
broker *meta.MockBroker broker *meta.MockBroker

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -42,7 +43,7 @@ type DistControllerTestSuite struct {
mockCluster *session.MockCluster mockCluster *session.MockCluster
mockScheduler *task.MockScheduler mockScheduler *task.MockScheduler
kv *etcdkv.EtcdKV kv kv.MetaKv
meta *meta.Meta meta *meta.Meta
broker *meta.MockBroker broker *meta.MockBroker
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params" . "github.com/milvus-io/milvus/internal/querycoordv2/params"
@ -33,7 +34,7 @@ import (
type ResourceManagerSuite struct { type ResourceManagerSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
manager *ResourceManager manager *ResourceManager
} }

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
@ -47,7 +48,7 @@ type TargetManagerSuite struct {
allChannels []string allChannels []string
allSegments []int64 allSegments []int64
kv *etcdkv.EtcdKV kv kv.MetaKv
meta *Meta meta *Meta
broker *MockBroker broker *MockBroker
// Test object // Test object

View File

@ -27,6 +27,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
@ -40,7 +41,7 @@ import (
type LeaderObserverTestSuite struct { type LeaderObserverTestSuite struct {
suite.Suite suite.Suite
observer *LeaderObserver observer *LeaderObserver
kv *etcdkv.EtcdKV kv kv.MetaKv
mockCluster *session.MockCluster mockCluster *session.MockCluster
meta *meta.Meta meta *meta.Meta

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -36,7 +37,7 @@ import (
type ReplicaObserverSuite struct { type ReplicaObserverSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
//dependency //dependency
meta *meta.Meta meta *meta.Meta
distMgr *meta.DistributionManager distMgr *meta.DistributionManager

View File

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdKV "github.com/milvus-io/milvus/internal/kv/etcd" etcdKV "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -38,7 +39,7 @@ import (
type ResourceObserverSuite struct { type ResourceObserverSuite struct {
suite.Suite suite.Suite
kv *etcdKV.EtcdKV kv kv.MetaKv
//dependency //dependency
store *meta.MockStore store *meta.MockStore
meta *meta.Meta meta *meta.Meta

View File

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
@ -38,7 +39,7 @@ import (
type TargetObserverSuite struct { type TargetObserverSuite struct {
suite.Suite suite.Suite
kv *etcdkv.EtcdKV kv kv.MetaKv
//dependency //dependency
meta *meta.Meta meta *meta.Meta
targetMgr *meta.TargetManager targetMgr *meta.TargetManager

View File

@ -21,10 +21,11 @@ import (
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
) )
// NewTSOKVBase returns a etcdkv.EtcdKV object // NewTSOKVBase returns a kv.TxnKV object
func NewTSOKVBase(client *clientv3.Client, tsoRoot, subPath string) *etcdkv.EtcdKV { func NewTSOKVBase(client *clientv3.Client, tsoRoot, subPath string) kv.TxnKV {
return etcdkv.NewEtcdKV(client, path.Join(tsoRoot, subPath)) return etcdkv.NewEtcdKV(client, path.Join(tsoRoot, subPath))
} }