clean up meta interface (#24729)

Signed-off-by: yiwangdr <yiwangdr@gmail.com>
This commit is contained in:
yiwangdr 2023-06-07 23:18:35 -07:00 committed by GitHub
parent 691809065f
commit 37c02c9927
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 3 additions and 747 deletions

View File

@ -426,15 +426,9 @@ type MetaKv interface {
TxnKV
GetPath(key string) string
LoadWithPrefix(key string) ([]string, []string, error)
LoadWithPrefix2(key string) ([]string, []string, []int64, error)
LoadWithRevision(key string) ([]string, []string, int64, error)
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
WatchWithRevision(key string, revision int64) clientv3.WatchChan
SaveWithLease(key, value string, id clientv3.LeaseID) error
Grant(ttl int64) (id clientv3.LeaseID, err error)
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error
}

View File

@ -67,18 +67,6 @@ func (mm *metaMemoryKV) GetPath(key string) string {
panic("implement me")
}
func (mm *metaMemoryKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
panic("implement me")
}
func (mm *metaMemoryKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
panic("implement me")
}
func (mm *metaMemoryKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
panic("implement me")
}
func (mm *metaMemoryKV) Watch(key string) clientv3.WatchChan {
panic("implement me")
}
@ -91,26 +79,6 @@ func (mm *metaMemoryKV) WatchWithRevision(key string, revision int64) clientv3.W
panic("implement me")
}
func (mm *metaMemoryKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
panic("implement me")
}
func (mm *metaMemoryKV) SaveWithIgnoreLease(key, value string) error {
panic("implement me")
}
func (mm *metaMemoryKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
panic("implement me")
}
func (mm *metaMemoryKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
panic("implement me")
}
func (mm *metaMemoryKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
panic("implement me")
}
func (mm *metaMemoryKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {
panic("implement me")
}

View File

@ -38,6 +38,7 @@ import (
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -105,7 +106,7 @@ type Server struct {
etcdCli *clientv3.Client
address string
kvClient *etcdkv.EtcdKV
kvClient kv.MetaKv
meta *meta
segmentManager Manager
allocator allocator

View File

@ -134,10 +134,6 @@ package indexnode
// loadWithPrefix2 func(key string) ([]string, []string, []int64, error)
// }
// func (mk *mockETCDKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
// return mk.loadWithPrefix2(key)
// }
// func TestIndexBuildTask_loadIndexMeta(t *testing.T) {
// t.Run("load empty meta", func(t *testing.T) {
// indexTask := &IndexBuildTask{

View File

@ -159,49 +159,6 @@ func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, erro
return keys, values, nil
}
// LoadWithPrefix2 returns all the keys and values with versions by the given key prefix
func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
return keys, values, versions, nil
}
func (kv *EmbedEtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
return keys, values, versions, resp.Header.Revision, nil
}
// LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix
func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
key = path.Join(kv.rootPath, key)
@ -328,26 +285,6 @@ func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
return result, nil
}
// LoadWithRevision returns keys, values and revision with given key prefix.
func (kv *EmbedEtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
}
return keys, values, resp.Header.Revision, nil
}
// LoadBytesWithRevision returns keys, values and revision with given key prefix.
func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
key = path.Join(kv.rootPath, key)
@ -386,26 +323,6 @@ func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error {
return err
}
// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EmbedEtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
log.Debug("Embedded Etcd saving with lease", zap.String("etcd_key", key))
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
return err
}
// SaveWithIgnoreLease updates the key without changing its current lease.
func (kv *EmbedEtcdKV) SaveWithIgnoreLease(key, value string) error {
log.Debug("Embedded Etcd saving with ignore lease", zap.String("etcd_key", key))
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
return err
}
// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
key = path.Join(kv.rootPath, key)
@ -580,56 +497,6 @@ func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte
return err
}
// Grant creates a new lease implemented in etcd grant interface.
func (kv *EmbedEtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
resp, err := kv.client.Grant(context.Background(), ttl)
return resp.ID, err
}
// KeepAlive keeps the lease alive forever with leaseID.
// Implemented in etcd interface.
func (kv *EmbedEtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
ch, err := kv.client.KeepAlive(context.Background(), id)
if err != nil {
return nil, err
}
return ch, nil
}
// CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Value(path.Join(kv.rootPath, key)),
"=",
value)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}
// CompareValueAndSwapBytes compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Value(path.Join(kv.rootPath, key)),
"=",
string(value))).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) {

View File

@ -20,12 +20,10 @@ import (
"fmt"
"sort"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -122,19 +120,6 @@ func TestEmbedEtcd(te *testing.T) {
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, err := metaKv.LoadWithPrefix2(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, revision, err := metaKv.LoadWithRevisionAndVersions(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.NotZero(t, revision)
assert.Equal(t, test.expectedError, err)
}
removeTests := []struct {
@ -272,51 +257,6 @@ func TestEmbedEtcd(te *testing.T) {
}
})
te.Run("EtcdKV LoadWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadWithRevision"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
prepareKV := []struct {
inKey string
inValue string
}{
{"a", "a_version1"},
{"b", "b_version2"},
{"a", "a_version3"},
{"c", "c_version4"},
{"a/suba", "a_version5"},
}
for _, test := range prepareKV {
err = metaKv.Save(test.inKey, test.inValue)
require.NoError(t, err)
}
loadWithRevisionTests := []struct {
inKey string
expectedKeyNo int
expectedValues []string
}{
{"a", 2, []string{"a_version3", "a_version5"}},
{"b", 1, []string{"b_version2"}},
{"c", 1, []string{"c_version4"}},
}
for _, test := range loadWithRevisionTests {
keys, values, revision, err := metaKv.LoadWithRevision(test.inKey)
assert.NoError(t, err)
assert.Equal(t, test.expectedKeyNo, len(keys))
assert.ElementsMatch(t, test.expectedValues, values)
assert.NotZero(t, revision)
}
})
te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadBytesWithRevision"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
@ -776,61 +716,6 @@ func TestEmbedEtcd(te *testing.T) {
assert.True(t, resp.Created)
})
te.Run("Etcd Revision", func(t *testing.T) {
rootPath := "/etcd/test/root/watch"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
revisionTests := []struct {
inKey string
fistValue string
secondValue string
}{
{"a", "v1", "v11"},
{"y", "v2", "v22"},
{"z", "v3", "v33"},
}
for _, test := range revisionTests {
err = metaKv.Save(test.inKey, test.fistValue)
require.NoError(t, err)
_, _, revision, _ := metaKv.LoadWithRevision(test.inKey)
ch := metaKv.WatchWithRevision(test.inKey, revision+1)
err = metaKv.Save(test.inKey, test.secondValue)
require.NoError(t, err)
resp := <-ch
assert.Equal(t, 1, len(resp.Events))
assert.Equal(t, test.secondValue, string(resp.Events[0].Kv.Value))
assert.Equal(t, revision+1, resp.Header.Revision)
}
success, err := metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.True(t, success)
value, err := metaKv.Load("a/b/c")
assert.NoError(t, err)
assert.Equal(t, value, "1")
success, err = metaKv.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.False(t, success)
success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
assert.True(t, success)
assert.NoError(t, err)
success, err = metaKv.CompareValueAndSwap("a/b/c", "1", "2")
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Revision Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/revision_bytes"
_metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
@ -877,86 +762,6 @@ func TestEmbedEtcd(te *testing.T) {
success, err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.False(t, success)
success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.True(t, success)
assert.NoError(t, err)
success, err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Lease", func(t *testing.T) {
rootPath := "/etcd/test/root/lease"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
leaseID, err := metaKv.Grant(10)
assert.NoError(t, err)
metaKv.KeepAlive(leaseID)
tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}
for k, v := range tests {
// SaveWithIgnoreLease must be used when the key already exists.
err = metaKv.SaveWithIgnoreLease(k, v)
assert.Error(t, err)
err = metaKv.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)
err = metaKv.SaveWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}
})
te.Run("Etcd Lease Ignore", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_ignore"
metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, &param.EtcdCfg)
assert.Nil(t, err)
defer metaKv.Close()
defer metaKv.RemoveWithPrefix("")
tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}
for k, v := range tests {
leaseID, err := metaKv.Grant(1)
assert.NoError(t, err)
err = metaKv.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)
err = metaKv.SaveWithIgnoreLease(k, "updated_"+v)
assert.NoError(t, err)
// Record should be updated correctly.
value, err := metaKv.Load(k)
assert.NoError(t, err)
assert.Equal(t, "updated_"+v, value)
// Let the lease expire. 3 seconds should be pretty safe.
time.Sleep(3 * time.Second)
// Updated record should still expire with lease.
_, err = metaKv.Load(k)
assert.Error(t, err)
}
})
te.Run("Etcd WalkWithPagination", func(t *testing.T) {

View File

@ -140,51 +140,6 @@ func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
return keys, values, nil
}
// LoadWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys))
return keys, values, versions, nil
}
func (kv *EtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2", zap.Strings("keys", keys))
return keys, values, versions, resp.Header.Revision, nil
}
// LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
start := time.Now()
@ -312,27 +267,6 @@ func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) {
return result, nil
}
// LoadWithRevision returns keys, values and revision with given key prefix.
func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
if err != nil {
return nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
}
CheckElapseAndWarn(start, "Slow etcd operation load with revision", zap.Strings("keys", keys))
return keys, values, resp.Header.Revision, nil
}
// LoadBytesWithRevision returns keys, values and revision with given key prefix.
func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) {
start := time.Now()
@ -378,32 +312,6 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error {
return err
}
// SaveWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
log.Debug("Etcd saving with lease", zap.String("etcd_key", key))
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
CheckValueSizeAndWarn(key, value)
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
return err
}
// SaveWithIgnoreLease updates the key without changing its current lease. Must be used when key already exists.
func (kv *EtcdKV) SaveWithIgnoreLease(key, value string) error {
log.Debug("Etcd saving with ignore lease", zap.String("etcd_key", key))
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
CheckValueSizeAndWarn(key, value)
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
return err
}
// SaveBytesWithLease is a function to put value in etcd with etcd lease options.
func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error {
start := time.Now()
@ -669,64 +577,6 @@ func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, rem
return err
}
// Grant creates a new lease implemented in etcd grant interface.
func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) {
start := time.Now()
resp, err := kv.client.Grant(context.Background(), ttl)
CheckElapseAndWarn(start, "Slow etcd operation grant")
return resp.ID, err
}
// KeepAlive keeps the lease alive forever with leaseID.
// Implemented in etcd interface.
func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
start := time.Now()
ch, err := kv.client.KeepAlive(context.Background(), id)
if err != nil {
return nil, err
}
CheckElapseAndWarn(start, "Slow etcd operation keepAlive")
return ch, nil
}
// CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Value(path.Join(kv.rootPath, key)),
"=",
value)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil {
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key))
return resp.Succeeded, nil
}
// CompareValueAndSwapBytes compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) (bool, error) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Txn(ctx).If(
clientv3.Compare(
clientv3.Value(path.Join(kv.rootPath, key)),
"=",
string(value))).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit()
if err != nil {
return false, err
}
CheckElapseAndWarn(start, "Slow etcd operation compare value and swap", zap.String("key", key))
return resp.Succeeded, nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) (bool, error) {

View File

@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -131,19 +130,6 @@ func TestEtcdKV_Load(te *testing.T) {
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, err := etcdKV.LoadWithPrefix2(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, revision, err := etcdKV.LoadWithRevisionAndVersions(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.NotZero(t, revision)
assert.Equal(t, test.expectedError, err)
}
removeTests := []struct {
@ -287,52 +273,8 @@ func TestEtcdKV_Load(te *testing.T) {
}
})
te.Run("EtcdKV LoadWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadWithRevision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
prepareKV := []struct {
inKey string
inValue string
}{
{"a", "a_version1"},
{"b", "b_version2"},
{"a", "a_version3"},
{"c", "c_version4"},
{"a/suba", "a_version5"},
}
for _, test := range prepareKV {
err = etcdKV.Save(test.inKey, test.inValue)
require.NoError(t, err)
}
loadWithRevisionTests := []struct {
inKey string
expectedKeyNo int
expectedValues []string
}{
{"a", 2, []string{"a_version3", "a_version5"}},
{"b", 1, []string{"b_version2"}},
{"c", 1, []string{"c_version4"}},
}
for _, test := range loadWithRevisionTests {
keys, values, revision, err := etcdKV.LoadWithRevision(test.inKey)
assert.NoError(t, err)
assert.Equal(t, test.expectedKeyNo, len(keys))
assert.ElementsMatch(t, test.expectedValues, values)
assert.NotZero(t, revision)
}
})
te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) {
rootPath := "/etcd/test/root/LoadWithRevision"
rootPath := "/etcd/test/root/LoadBytesWithRevision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
@ -711,59 +653,6 @@ func TestEtcdKV_Load(te *testing.T) {
assert.True(t, resp.Created)
})
te.Run("Etcd Revision", func(t *testing.T) {
rootPath := "/etcd/test/root/revision"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
revisionTests := []struct {
inKey string
fistValue string
secondValue string
}{
{"a", "v1", "v11"},
{"y", "v2", "v22"},
{"z", "v3", "v33"},
}
for _, test := range revisionTests {
err = etcdKV.Save(test.inKey, test.fistValue)
require.NoError(t, err)
_, _, revision, _ := etcdKV.LoadWithRevision(test.inKey)
ch := etcdKV.WatchWithRevision(test.inKey, revision+1)
err = etcdKV.Save(test.inKey, test.secondValue)
require.NoError(t, err)
resp := <-ch
assert.Equal(t, 1, len(resp.Events))
assert.Equal(t, test.secondValue, string(resp.Events[0].Kv.Value))
assert.Equal(t, revision+1, resp.Header.Revision)
}
success, err := etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.True(t, success)
value, err := etcdKV.Load("a/b/c")
assert.NoError(t, err)
assert.Equal(t, value, "1")
success, err = etcdKV.CompareVersionAndSwap("a/b/c", 0, "1")
assert.NoError(t, err)
assert.False(t, success)
success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
assert.True(t, success)
assert.NoError(t, err)
success, err = etcdKV.CompareValueAndSwap("a/b/c", "1", "2")
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Revision Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/revision_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
@ -807,109 +696,6 @@ func TestEtcdKV_Load(te *testing.T) {
success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1"))
assert.NoError(t, err)
assert.False(t, success)
success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.True(t, success)
assert.NoError(t, err)
success, err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2"))
assert.NoError(t, err)
assert.False(t, success)
})
te.Run("Etcd Lease", func(t *testing.T) {
rootPath := "/etcd/test/root/lease"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
leaseID, err := etcdKV.Grant(10)
assert.NoError(t, err)
etcdKV.KeepAlive(leaseID)
tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}
for k, v := range tests {
// SaveWithIgnoreLease must be used when the key already exists.
err = etcdKV.SaveWithIgnoreLease(k, v)
assert.Error(t, err)
err = etcdKV.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)
err = etcdKV.SaveWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}
})
te.Run("Etcd Lease Ignore", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_ignore"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
tests := map[string]string{
"a/b": "v1",
"a/b/c": "v2",
"x": "v3",
}
for k, v := range tests {
leaseID, err := etcdKV.Grant(1)
assert.NoError(t, err)
err = etcdKV.SaveWithLease(k, v, leaseID)
assert.NoError(t, err)
err = etcdKV.SaveWithIgnoreLease(k, "updated_"+v)
assert.NoError(t, err)
// Record should be updated correctly.
value, err := etcdKV.Load(k)
assert.NoError(t, err)
assert.Equal(t, "updated_"+v, value)
// Let the lease expire. 3 seconds should be pretty safe.
time.Sleep(3 * time.Second)
// Updated record should still expire with lease.
_, err = etcdKV.Load(k)
assert.Error(t, err)
}
})
te.Run("Etcd Lease Bytes", func(t *testing.T) {
rootPath := "/etcd/test/root/lease_bytes"
etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
leaseID, err := etcdKV.Grant(10)
assert.NoError(t, err)
etcdKV.KeepAlive(leaseID)
tests := map[string][]byte{
"a/b": []byte("v1"),
"a/b/c": []byte("v2"),
"x": []byte("v3"),
}
for k, v := range tests {
err = etcdKV.SaveBytesWithLease(k, v, leaseID)
assert.NoError(t, err)
err = etcdKV.SaveBytesWithLease(k, v, clientv3.LeaseID(999))
assert.Error(t, err)
}
})
}

View File

@ -67,17 +67,9 @@ type MetaKv interface {
TxnKV
GetPath(key string) string
LoadWithPrefix(key string) ([]string, []string, error)
LoadWithPrefix2(key string) ([]string, []string, []int64, error)
LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error)
LoadWithRevision(key string) ([]string, []string, int64, error)
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
WatchWithRevision(key string, revision int64) clientv3.WatchChan
SaveWithLease(key, value string, id clientv3.LeaseID) error
SaveWithIgnoreLease(key, value string) error
Grant(ttl int64) (id clientv3.LeaseID, err error)
KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error)
CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error)
WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error
}

View File

@ -45,7 +45,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/pipeline"
@ -118,7 +117,6 @@ type QueryNode struct {
cacheChunkManager storage.ChunkManager
vectorStorage storage.ChunkManager
etcdKV *etcdkv.EtcdKV
/*
// Pool for search/query
@ -274,7 +272,6 @@ func (node *QueryNode) Init() error {
return
}
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, paramtable.Get().EtcdCfg.MetaRootPath.GetValue())
log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))
node.scheduler = tasks.NewScheduler()