From 08d12e62f378d2b68e05f37acb57776baede83e2 Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Fri, 4 Mar 2022 15:14:01 +0800 Subject: [PATCH] Add ByteSlice Method for embed etcd (#15844) Signed-off-by: Letian Jiang --- internal/kv/etcd/embed_etcd_kv.go | 242 +++++++++++++- internal/kv/etcd/embed_etcd_kv_test.go | 438 +++++++++++++++++++++++++ 2 files changed, 677 insertions(+), 3 deletions(-) diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 2d2b714adb..7d3f332145 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -104,6 +104,26 @@ func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +// LoadBytesWithPrefix returns all the keys and values with the given key prefix +func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { + key = path.Join(kv.rootPath, key) + log.Debug("LoadBytesWithPrefix ", zap.String("prefix", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + } + return keys, values, nil +} + // LoadWithPrefix2 returns all the keys and values with versions by the given key prefix func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { key = path.Join(kv.rootPath, key) @@ -126,6 +146,29 @@ func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, return keys, values, versions, nil } +// LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix +func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) { + key = path.Join(kv.rootPath, key) + log.Debug("LoadBytesWithPrefix2 ", zap.String("prefix", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, nil, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + versions := make([]int64, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + versions = append(versions, kv.Version) + } + return keys, values, versions, nil +} + +// Load returns value of the given key func (kv *EmbedEtcdKV) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) @@ -141,6 +184,23 @@ func (kv *EmbedEtcdKV) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } +// LoadBytes returns value of the given key +func (kv *EmbedEtcdKV) LoadBytes(key string) ([]byte, error) { + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key) + if err != nil { + return nil, err + } + if resp.Count <= 0 { + return nil, fmt.Errorf("there is no value on key = %s", key) + } + + return resp.Kvs[0].Value, nil +} + +// MultiLoad returns values of a set of keys func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) { ops := make([]clientv3.Op, 0, len(keys)) for _, keyLoad := range keys { @@ -151,7 +211,7 @@ func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) { defer cancel() resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() if err != nil { - return []string{}, err + return nil, err } result := make([]string, 0, len(keys)) @@ -176,6 +236,43 @@ func (kv *EmbedEtcdKV) MultiLoad(keys []string) ([]string, error) { return result, nil } +// MultiLoadBytes returns values of a set of keys +func (kv *EmbedEtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { + ops := make([]clientv3.Op, 0, len(keys)) + for _, keyLoad := range keys { + ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad))) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + if err != nil { + return nil, err + } + + result := make([][]byte, 0, len(keys)) + invalid := make([]string, 0, len(keys)) + for index, rp := range resp.Responses { + if rp.GetResponseRange().Kvs == nil || len(rp.GetResponseRange().Kvs) == 0 { + invalid = append(invalid, keys[index]) + result = append(result, []byte{}) + } + for _, ev := range rp.GetResponseRange().Kvs { + log.Debug("MultiLoadBytes", zap.ByteString("key", ev.Key), + zap.ByteString("value", ev.Value)) + result = append(result, ev.Value) + } + } + if len(invalid) != 0 { + log.Debug("MultiLoadBytes: there are invalid keys", + zap.Strings("keys", invalid)) + err = fmt.Errorf("there are invalid keys: %s", invalid) + return result, err + } + return result, nil +} + +// LoadWithRevision returns keys, values and revision with given key prefix. func (kv *EmbedEtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) { key = path.Join(kv.rootPath, key) log.Debug("LoadWithPrefix ", zap.String("prefix", key)) @@ -195,6 +292,27 @@ func (kv *EmbedEtcdKV) LoadWithRevision(key string) ([]string, []string, int64, return keys, values, resp.Header.Revision, nil } +// LoadBytesWithRevision returns keys, values and revision with given key prefix. +func (kv *EmbedEtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) { + key = path.Join(kv.rootPath, key) + log.Debug("LoadBytesWithRevision ", zap.String("prefix", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, 0, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + } + return keys, values, resp.Header.Revision, nil +} + +// Save saves the key-value pair. func (kv *EmbedEtcdKV) Save(key, value string) error { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) @@ -203,6 +321,15 @@ func (kv *EmbedEtcdKV) Save(key, value string) error { return err } +// SaveBytes saves the key-value pair. +func (kv *EmbedEtcdKV) SaveBytes(key string, value []byte) error { + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + _, err := kv.client.Put(ctx, key, string(value)) + return err +} + // SaveWithLease is a function to put value in etcd with etcd lease options. func (kv *EmbedEtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { key = path.Join(kv.rootPath, key) @@ -212,6 +339,16 @@ func (kv *EmbedEtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) err return err } +// SaveBytesWithLease is a function to put value in etcd with etcd lease options. +func (kv *EmbedEtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error { + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + _, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id)) + return err +} + +// MultiSave saves the key-value pairs in a transaction. func (kv *EmbedEtcdKV) MultiSave(kvs map[string]string) error { ops := make([]clientv3.Op, 0, len(kvs)) for key, value := range kvs { @@ -225,6 +362,21 @@ func (kv *EmbedEtcdKV) MultiSave(kvs map[string]string) error { return err } +// MultiSaveBytes saves the key-value pairs in a transaction. +func (kv *EmbedEtcdKV) MultiSaveBytes(kvs map[string][]byte) error { + ops := make([]clientv3.Op, 0, len(kvs)) + for key, value := range kvs { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + return err +} + +// RemoveWithPrefix removes the keys with given prefix. func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error { key := path.Join(kv.rootPath, prefix) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) @@ -234,6 +386,7 @@ func (kv *EmbedEtcdKV) RemoveWithPrefix(prefix string) error { return err } +// Remove removes the key. func (kv *EmbedEtcdKV) Remove(key string) error { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) @@ -243,6 +396,7 @@ func (kv *EmbedEtcdKV) Remove(key string) error { return err } +// MultiRemove removes the keys in a transaction. func (kv *EmbedEtcdKV) MultiRemove(keys []string) error { ops := make([]clientv3.Op, 0, len(keys)) for _, key := range keys { @@ -256,6 +410,7 @@ func (kv *EmbedEtcdKV) MultiRemove(keys []string) error { return err } +// MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction. func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { ops := make([]clientv3.Op, 0, len(saves)+len(removals)) for key, value := range saves { @@ -274,6 +429,25 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []st return err } +// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. +func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error { + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + + log.Debug("MultiSaveBytesAndRemove") + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + return err +} + func (kv *EmbedEtcdKV) Watch(key string) clientv3.WatchChan { key = path.Join(kv.rootPath, key) rch := kv.client.Watch(context.Background(), key, clientv3.WithCreatedNotify()) @@ -306,8 +480,9 @@ func (kv *EmbedEtcdKV) MultiRemoveWithPrefix(keys []string) error { return err } +// MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { - ops := make([]clientv3.Op, 0, len(saves)) + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } @@ -324,6 +499,25 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, rem return err } +// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. +func (kv *EmbedEtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error { + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) + } + + log.Debug("MultiSaveBytesAndRemoveWithPrefix") + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + return err +} + // Grant creates a new lease implemented in etcd grant interface. func (kv *EmbedEtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { resp, err := kv.client.Grant(context.Background(), ttl) @@ -355,7 +549,28 @@ func (kv *EmbedEtcdKV) CompareValueAndSwap(key, value, target string, opts ...cl return err } if !resp.Succeeded { - return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + return fmt.Errorf("function CompareValueAndSwap error for compare is false for key: %s", key) + } + + return nil +} + +// CompareValueAndSwapBytes compares the existing value with compare, and if they are +// equal, the target is stored in etcd. +func (kv *EmbedEtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error { + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Value(path.Join(kv.rootPath, key)), + "=", + string(value))). + Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareValueAndSwapBytes error for compare is false for key: %s", key) } return nil @@ -382,6 +597,27 @@ func (kv *EmbedEtcdKV) CompareVersionAndSwap(key string, version int64, target s return nil } +// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if +// they are equal, the target is stored in etcd. +func (kv *EmbedEtcdKV) CompareVersionAndSwapBytes(key string, version int64, target []byte, opts ...clientv3.OpOption) error { + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Version(path.Join(kv.rootPath, key)), + "=", + version)). + Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareVersionAndSwapBytes error for compare is false for key: %s", key) + } + + return nil +} + func (kv *EmbedEtcdKV) GetConfig() embed.Config { return kv.etcd.Config() } diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 2aae7c399e..75e05ddf10 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -151,6 +151,117 @@ func TestEmbedEtcd(te *testing.T) { } }) + te.Run("EtcdKV SaveAndLoadBytes", func(t *testing.T) { + rootPath := "/etcd/test/root/saveandloadbytes" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + require.NoError(te, err) + assert.NotNil(te, metaKv) + require.NoError(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + saveAndLoadTests := []struct { + key string + value []byte + }{ + {"test1", []byte("value1")}, + {"test2", []byte("value2")}, + {"test1/a", []byte("value_a")}, + {"test1/b", []byte("value_b")}, + } + + for i, test := range saveAndLoadTests { + if i < 4 { + err = metaKv.SaveBytes(test.key, test.value) + assert.NoError(t, err) + } + + val, err := metaKv.LoadBytes(test.key) + assert.NoError(t, err) + assert.Equal(t, test.value, val) + } + + invalidLoadTests := []struct { + invalidKey string + }{ + {"t"}, + {"a"}, + {"test1a"}, + } + + for _, test := range invalidLoadTests { + val, err := metaKv.LoadBytes(test.invalidKey) + assert.Error(t, err) + assert.Zero(t, val) + } + + loadPrefixTests := []struct { + prefix string + + expectedKeys []string + expectedValues [][]byte + expectedError error + }{ + {"test", []string{ + metaKv.GetPath("test1"), + metaKv.GetPath("test2"), + metaKv.GetPath("test1/a"), + metaKv.GetPath("test1/b")}, [][]byte{[]byte("value1"), []byte("value2"), []byte("value_a"), []byte("value_b")}, nil}, + {"test1", []string{ + metaKv.GetPath("test1"), + metaKv.GetPath("test1/a"), + metaKv.GetPath("test1/b")}, [][]byte{[]byte("value1"), []byte("value_a"), []byte("value_b")}, nil}, + {"test2", []string{metaKv.GetPath("test2")}, [][]byte{[]byte("value2")}, nil}, + {"", []string{ + metaKv.GetPath("test1"), + metaKv.GetPath("test2"), + metaKv.GetPath("test1/a"), + metaKv.GetPath("test1/b")}, [][]byte{[]byte("value1"), []byte("value2"), []byte("value_a"), []byte("value_b")}, nil}, + {"test1/a", []string{metaKv.GetPath("test1/a")}, [][]byte{[]byte("value_a")}, nil}, + {"a", []string{}, [][]byte{}, nil}, + {"root", []string{}, [][]byte{}, nil}, + {"/etcd/test/root", []string{}, [][]byte{}, nil}, + } + + for _, test := range loadPrefixTests { + actualKeys, actualValues, err := metaKv.LoadBytesWithPrefix(test.prefix) + assert.ElementsMatch(t, test.expectedKeys, actualKeys) + assert.ElementsMatch(t, test.expectedValues, actualValues) + assert.Equal(t, test.expectedError, err) + + actualKeys, actualValues, versions, err := metaKv.LoadBytesWithPrefix2(test.prefix) + assert.ElementsMatch(t, test.expectedKeys, actualKeys) + assert.ElementsMatch(t, test.expectedValues, actualValues) + assert.NotZero(t, versions) + assert.Equal(t, test.expectedError, err) + } + + removeTests := []struct { + validKey string + invalidKey string + }{ + {"test1", "abc"}, + {"test1/a", "test1/lskfjal"}, + {"test1/b", "test1/b"}, + {"test2", "-"}, + } + + for _, test := range removeTests { + err = metaKv.Remove(test.validKey) + assert.NoError(t, err) + + _, err = metaKv.Load(test.validKey) + assert.Error(t, err) + + err = metaKv.Remove(test.validKey) + assert.NoError(t, err) + err = metaKv.Remove(test.invalidKey) + assert.NoError(t, err) + } + }) + te.Run("EtcdKV LoadWithRevision", func(t *testing.T) { rootPath := "/etcd/test/root/LoadWithRevision" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -196,6 +307,52 @@ func TestEmbedEtcd(te *testing.T) { }) + te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { + rootPath := "/etcd/test/root/LoadBytesWithRevision" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + assert.Nil(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + prepareKV := []struct { + inKey string + inValue []byte + }{ + {"a", []byte("a_version1")}, + {"b", []byte("b_version2")}, + {"a", []byte("a_version3")}, + {"c", []byte("c_version4")}, + {"a/suba", []byte("a_version5")}, + } + + for _, test := range prepareKV { + err = metaKv.SaveBytes(test.inKey, test.inValue) + require.NoError(t, err) + } + + loadWithRevisionTests := []struct { + inKey string + + expectedKeyNo int + expectedValues [][]byte + }{ + {"a", 2, [][]byte{[]byte("a_version3"), []byte("a_version5")}}, + {"b", 1, [][]byte{[]byte("b_version2")}}, + {"c", 1, [][]byte{[]byte("c_version4")}}, + } + + for _, test := range loadWithRevisionTests { + keys, values, revision, err := metaKv.LoadBytesWithRevision(test.inKey) + assert.NoError(t, err) + assert.Equal(t, test.expectedKeyNo, len(keys)) + assert.ElementsMatch(t, test.expectedValues, values) + assert.NotZero(t, revision) + } + + }) + te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) { rootPath := "/etcd/test/root/multi_save_and_multi_load" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -305,6 +462,116 @@ func TestEmbedEtcd(te *testing.T) { assert.Empty(t, vs) }) + te.Run("EtcdKV MultiSaveAndMultiLoadBytes", func(t *testing.T) { + rootPath := "/etcd/test/root/multi_save_and_multi_load" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + assert.Nil(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + multiSaveTests := map[string][]byte{ + "key_1": []byte("value_1"), + "key_2": []byte("value_2"), + "key_3/a": []byte("value_3a"), + "multikey_1": []byte("multivalue_1"), + "multikey_2": []byte("multivalue_2"), + "_": []byte("other"), + } + + err = metaKv.MultiSaveBytes(multiSaveTests) + assert.NoError(t, err) + for k, v := range multiSaveTests { + actualV, err := metaKv.LoadBytes(k) + assert.NoError(t, err) + assert.Equal(t, v, actualV) + } + + multiLoadTests := []struct { + inputKeys []string + expectedValues [][]byte + }{ + {[]string{"key_1"}, [][]byte{[]byte("value_1")}}, + {[]string{"key_1", "key_2", "key_3/a"}, [][]byte{[]byte("value_1"), []byte("value_2"), []byte("value_3a")}}, + {[]string{"multikey_1", "multikey_2"}, [][]byte{[]byte("multivalue_1"), []byte("multivalue_2")}}, + {[]string{"_"}, [][]byte{[]byte("other")}}, + } + + for _, test := range multiLoadTests { + vs, err := metaKv.MultiLoadBytes(test.inputKeys) + assert.NoError(t, err) + assert.Equal(t, test.expectedValues, vs) + } + + invalidMultiLoad := []struct { + invalidKeys []string + expectedValues [][]byte + }{ + {[]string{"a", "key_1"}, [][]byte{[]byte(""), []byte("value_1")}}, + {[]string{".....", "key_1"}, [][]byte{[]byte(""), []byte("value_1")}}, + {[]string{"*********"}, [][]byte{[]byte("")}}, + {[]string{"key_1", "1"}, [][]byte{[]byte("value_1"), []byte("")}}, + } + + for _, test := range invalidMultiLoad { + vs, err := metaKv.MultiLoadBytes(test.invalidKeys) + assert.Error(t, err) + assert.Equal(t, test.expectedValues, vs) + } + + removeWithPrefixTests := []string{ + "key_1", + "multi", + } + + for _, k := range removeWithPrefixTests { + err = metaKv.RemoveWithPrefix(k) + assert.NoError(t, err) + + ks, vs, err := metaKv.LoadBytesWithPrefix(k) + assert.Empty(t, ks) + assert.Empty(t, vs) + assert.NoError(t, err) + } + + multiRemoveTests := []string{ + "key_2", + "key_3/a", + "multikey_2", + "_", + } + + err = metaKv.MultiRemove(multiRemoveTests) + assert.NoError(t, err) + + ks, vs, err := metaKv.LoadBytesWithPrefix("") + assert.NoError(t, err) + assert.Empty(t, ks) + assert.Empty(t, vs) + + multiSaveAndRemoveTests := []struct { + multiSaves map[string][]byte + multiRemoves []string + }{ + {map[string][]byte{"key_1": []byte("value_1")}, []string{}}, + {map[string][]byte{"key_2": []byte("value_2")}, []string{"key_1"}}, + {map[string][]byte{"key_3/a": []byte("value_3a")}, []string{"key_2"}}, + {map[string][]byte{"multikey_1": []byte("multivalue_1")}, []string{}}, + {map[string][]byte{"multikey_2": []byte("multivalue_2")}, []string{"multikey_1", "key_3/a"}}, + {map[string][]byte{}, []string{"multikey_2"}}, + } + for _, test := range multiSaveAndRemoveTests { + err = metaKv.MultiSaveBytesAndRemove(test.multiSaves, test.multiRemoves) + assert.NoError(t, err) + } + + ks, vs, err = metaKv.LoadBytesWithPrefix("") + assert.NoError(t, err) + assert.Empty(t, ks) + assert.Empty(t, vs) + }) + te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) { rootPath := "/etcd/test/root/multi_remove_with_prefix" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -393,6 +660,95 @@ func TestEmbedEtcd(te *testing.T) { } }) + te.Run("EtcdKV MultiRemoveWithPrefixBytes", func(t *testing.T) { + rootPath := "/etcd/test/root/multi_remove_with_prefix_bytes" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + require.NoError(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + prepareTests := map[string][]byte{ + "x/abc/1": []byte("1"), + "x/abc/2": []byte("2"), + "x/def/1": []byte("10"), + "x/def/2": []byte("20"), + "x/den/1": []byte("100"), + "x/den/2": []byte("200"), + } + + err = metaKv.MultiSaveBytes(prepareTests) + require.NoError(t, err) + + multiRemoveWithPrefixTests := []struct { + prefix []string + + testKey string + expectedValue []byte + }{ + {[]string{"x/abc"}, "x/abc/1", nil}, + {[]string{}, "x/abc/2", nil}, + {[]string{}, "x/def/1", []byte("10")}, + {[]string{}, "x/def/2", []byte("20")}, + {[]string{}, "x/den/1", []byte("100")}, + {[]string{}, "x/den/2", []byte("200")}, + {[]string{}, "not-exist", nil}, + {[]string{"x/def", "x/den"}, "x/def/1", nil}, + {[]string{}, "x/def/1", nil}, + {[]string{}, "x/def/2", nil}, + {[]string{}, "x/den/1", nil}, + {[]string{}, "x/den/2", nil}, + {[]string{}, "not-exist", nil}, + } + + for _, test := range multiRemoveWithPrefixTests { + if len(test.prefix) > 0 { + err = metaKv.MultiRemoveWithPrefix(test.prefix) + assert.NoError(t, err) + } + + v, _ := metaKv.LoadBytes(test.testKey) + assert.Equal(t, test.expectedValue, v) + } + + k, v, err := metaKv.LoadBytesWithPrefix("/") + assert.NoError(t, err) + assert.Zero(t, len(k)) + assert.Zero(t, len(v)) + + // MultiSaveAndRemoveWithPrefix + err = metaKv.MultiSaveBytes(prepareTests) + require.NoError(t, err) + multiSaveAndRemoveWithPrefixTests := []struct { + multiSave map[string][]byte + prefix []string + + loadPrefix string + lengthBeforeRemove int + lengthAfterRemove int + }{ + {map[string][]byte{}, []string{"x/abc", "x/def", "x/den"}, "x", 6, 0}, + {map[string][]byte{"y/a": []byte("vvv"), "y/b": []byte("vvv")}, []string{}, "y", 0, 2}, + {map[string][]byte{"y/c": []byte("vvv")}, []string{}, "y", 2, 3}, + {map[string][]byte{"p/a": []byte("vvv")}, []string{"y/a", "y"}, "y", 3, 0}, + {map[string][]byte{}, []string{"p"}, "p", 1, 0}, + } + + for _, test := range multiSaveAndRemoveWithPrefixTests { + k, _, err = metaKv.LoadBytesWithPrefix(test.loadPrefix) + assert.NoError(t, err) + assert.Equal(t, test.lengthBeforeRemove, len(k)) + + err = metaKv.MultiSaveBytesAndRemoveWithPrefix(test.multiSave, test.prefix) + assert.NoError(t, err) + + k, _, err = metaKv.LoadBytesWithPrefix(test.loadPrefix) + assert.NoError(t, err) + assert.Equal(t, test.lengthAfterRemove, len(k)) + } + }) + te.Run("EtcdKV Watch", func(t *testing.T) { rootPath := "/etcd/test/root/watch" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -461,6 +817,58 @@ func TestEmbedEtcd(te *testing.T) { assert.Error(t, err) }) + te.Run("Etcd Revision Bytes", func(t *testing.T) { + rootPath := "/etcd/test/root/revision_bytes" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + assert.Nil(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + revisionTests := []struct { + inKey string + fistValue []byte + secondValue []byte + }{ + {"a", []byte("v1"), []byte("v11")}, + {"y", []byte("v2"), []byte("v22")}, + {"z", []byte("v3"), []byte("v33")}, + } + + for _, test := range revisionTests { + err = metaKv.SaveBytes(test.inKey, test.fistValue) + require.NoError(t, err) + + _, _, revision, _ := metaKv.LoadBytesWithRevision(test.inKey) + ch := metaKv.WatchWithRevision(test.inKey, revision+1) + + err = metaKv.SaveBytes(test.inKey, test.secondValue) + require.NoError(t, err) + + resp := <-ch + assert.Equal(t, 1, len(resp.Events)) + assert.Equal(t, test.secondValue, resp.Events[0].Kv.Value) + assert.Equal(t, revision+1, resp.Header.Revision) + } + + err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.NoError(t, err) + + value, err := metaKv.LoadBytes("a/b/c") + assert.NoError(t, err) + assert.Equal(t, value, []byte("1")) + + err = metaKv.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.Error(t, err) + + err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.NoError(t, err) + + err = metaKv.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.Error(t, err) + }) + te.Run("Etcd Lease", func(t *testing.T) { rootPath := "/etcd/test/root/lease" metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) @@ -489,4 +897,34 @@ func TestEmbedEtcd(te *testing.T) { } }) + + te.Run("Etcd Lease Bytes", func(t *testing.T) { + rootPath := "/etcd/test/root/lease_bytes" + _metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + metaKv := _metaKv.(*embed_etcd_kv.EmbedEtcdKV) + assert.Nil(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + leaseID, err := metaKv.Grant(10) + assert.NoError(t, err) + + metaKv.KeepAlive(leaseID) + + tests := map[string][]byte{ + "a/b": []byte("v1"), + "a/b/c": []byte("v2"), + "x": []byte("v3"), + } + + for k, v := range tests { + err = metaKv.SaveBytesWithLease(k, v, leaseID) + assert.NoError(t, err) + + err = metaKv.SaveBytesWithLease(k, v, clientv3.LeaseID(999)) + assert.Error(t, err) + } + + }) }