From b56ec7ea97425bc9ce8201687f8a004a6dbffdcb Mon Sep 17 00:00:00 2001 From: Letian Jiang Date: Fri, 25 Feb 2022 11:25:53 +0800 Subject: [PATCH] Add ByteSlice Method for Etcd-kv (#15738) Signed-off-by: Letian Jiang --- internal/kv/etcd/etcd_kv.go | 240 ++++++++++++++++++++ internal/kv/etcd/etcd_kv_test.go | 366 ++++++++++++++++++++++++++++++- 2 files changed, 605 insertions(+), 1 deletion(-) diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 4d4f204f82..f7583fc168 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -79,6 +79,27 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +// LoadBytesWithPrefix returns all the keys and values with the given key prefix. +func (kv *EtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + } + CheckElapseAndWarn(start, "Slow etcd operation load with prefix") + return keys, values, nil +} + // LoadWithPrefix2 returns all the the keys,values and key versions with the given key prefix. func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { start := time.Now() @@ -102,6 +123,29 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro return keys, values, versions, nil } +// LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix. +func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, nil, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + versions := make([]int64, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + versions = append(versions, kv.Version) + } + CheckElapseAndWarn(start, "Slow etcd operation load with prefix2") + return keys, values, versions, nil +} + // Load returns value of the key. func (kv *EtcdKV) Load(key string) (string, error) { start := time.Now() @@ -119,6 +163,23 @@ func (kv *EtcdKV) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } +// LoadBytes returns value of the key. +func (kv *EtcdKV) LoadBytes(key string) ([]byte, error) { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key) + if err != nil { + return []byte{}, err + } + if resp.Count <= 0 { + return []byte{}, fmt.Errorf("there is no value on key = %s", key) + } + CheckElapseAndWarn(start, "Slow etcd operation load") + return resp.Kvs[0].Value, nil +} + // MultiLoad gets the values of the keys in a transaction. func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { start := time.Now() @@ -154,6 +215,41 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { return result, nil } +// MultiLoadBytes gets the values of the keys in a transaction. +func (kv *EtcdKV) MultiLoadBytes(keys []string) ([][]byte, error) { + start := time.Now() + ops := make([]clientv3.Op, 0, len(keys)) + for _, keyLoad := range keys { + ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad))) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + if err != nil { + return [][]byte{}, err + } + + result := make([][]byte, 0, len(keys)) + invalid := make([]string, 0, len(keys)) + for index, rp := range resp.Responses { + if rp.GetResponseRange().Kvs == nil || len(rp.GetResponseRange().Kvs) == 0 { + invalid = append(invalid, keys[index]) + result = append(result, []byte{}) + } + for _, ev := range rp.GetResponseRange().Kvs { + result = append(result, ev.Value) + } + } + if len(invalid) != 0 { + log.Warn("MultiLoad: there are invalid keys", zap.Strings("keys", invalid)) + err = fmt.Errorf("there are invalid keys: %s", invalid) + return result, err + } + CheckElapseAndWarn(start, "Slow etcd operation multi load") + return result, nil +} + // LoadWithRevision returns keys, values and revision with given key prefix. func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error) { start := time.Now() @@ -175,6 +271,27 @@ func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error return keys, values, resp.Header.Revision, nil } +// LoadBytesWithRevision returns keys, values and revision with given key prefix. +func (kv *EtcdKV) LoadBytesWithRevision(key string) ([]string, [][]byte, int64, error) { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, 0, err + } + keys := make([]string, 0, resp.Count) + values := make([][]byte, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, kv.Value) + } + CheckElapseAndWarn(start, "Slow etcd operation load with revision") + return keys, values, resp.Header.Revision, nil +} + // Save saves the key-value pair. func (kv *EtcdKV) Save(key, value string) error { start := time.Now() @@ -186,6 +303,17 @@ func (kv *EtcdKV) Save(key, value string) error { return err } +// SaveBytes saves the key-value pair. +func (kv *EtcdKV) SaveBytes(key string, value []byte) error { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + _, err := kv.client.Put(ctx, key, string(value)) + CheckElapseAndWarn(start, "Slow etcd operation save") + return err +} + // SaveWithLease is a function to put value in etcd with etcd lease options. func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { start := time.Now() @@ -197,6 +325,17 @@ func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error { return err } +// SaveBytesWithLease is a function to put value in etcd with etcd lease options. +func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.LeaseID) error { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + _, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id)) + CheckElapseAndWarn(start, "Slow etcd operation save with lease") + return err +} + // MultiSave saves the key-value pairs in a transaction. func (kv *EtcdKV) MultiSave(kvs map[string]string) error { start := time.Now() @@ -213,6 +352,22 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error { return err } +// MultiSaveBytes saves the key-value pairs in a transaction. +func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error { + start := time.Now() + ops := make([]clientv3.Op, 0, len(kvs)) + for key, value := range kvs { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + CheckElapseAndWarn(start, "Slow etcd operation multi save") + return err +} + // RemoveWithPrefix removes the keys with given prefix. func (kv *EtcdKV) RemoveWithPrefix(prefix string) error { start := time.Now() @@ -273,6 +428,26 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) return err } +// MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. +func (kv *EtcdKV) MultiSaveBytesAndRemove(saves map[string][]byte, removals []string) error { + start := time.Now() + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + CheckElapseAndWarn(start, "Slow etcd operation multi save and remove") + return err +} + // Watch starts watching a key, returns a watch channel. func (kv *EtcdKV) Watch(key string) clientv3.WatchChan { start := time.Now() @@ -336,6 +511,26 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals return err } +// MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. +func (kv *EtcdKV) MultiSaveBytesAndRemoveWithPrefix(saves map[string][]byte, removals []string) error { + start := time.Now() + ops := make([]clientv3.Op, 0, len(saves)) + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) + } + + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() + CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix") + return err +} + // Grant creates a new lease implemented in etcd grant interface. func (kv *EtcdKV) Grant(ttl int64) (id clientv3.LeaseID, err error) { start := time.Now() @@ -378,6 +573,28 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv return nil } +// CompareValueAndSwapBytes compares the existing value with compare, and if they are +// equal, the target is stored in etcd. +func (kv *EtcdKV) CompareValueAndSwapBytes(key string, value, target []byte, opts ...clientv3.OpOption) error { + start := time.Now() + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Value(path.Join(kv.rootPath, key)), + "=", + string(value))). + Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + } + CheckElapseAndWarn(start, "Slow etcd operation compare value and swap") + return nil +} + // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, opts ...clientv3.OpOption) error { @@ -401,6 +618,29 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, source int64, target string, return nil } +// CompareVersionAndSwapBytes compares the existing key-value's version with version, and if +// they are equal, the target is stored in etcd. +func (kv *EtcdKV) CompareVersionAndSwapBytes(key string, source int64, target []byte, opts ...clientv3.OpOption) error { + start := time.Now() + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Txn(ctx).If( + clientv3.Compare( + clientv3.Version(path.Join(kv.rootPath, key)), + "=", + source)). + Then(clientv3.OpPut(path.Join(kv.rootPath, key), string(target), opts...)).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s,"+ + " source version: %d, target version: %s", key, source, target) + } + CheckElapseAndWarn(start, "Slow etcd operation compare version and swap") + return nil +} + // CheckElapseAndWarn checks the elapsed time and warns if it is too long. func CheckElapseAndWarn(start time.Time, message string) bool { elapsed := time.Since(start) diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 1f55581eac..7db3a6d78e 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -150,6 +150,123 @@ func TestEtcdKV_Load(te *testing.T) { } }) + te.Run("EtcdKV SaveAndLoadBytes", func(t *testing.T) { + rootPath := "/etcd/test/root/saveandloadbytes" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + err = etcdKV.RemoveWithPrefix("") + require.NoError(t, err) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + saveAndLoadTests := []struct { + key string + value string + }{ + {"test1", "value1"}, + {"test2", "value2"}, + {"test1/a", "value_a"}, + {"test1/b", "value_b"}, + } + + for i, test := range saveAndLoadTests { + if i < 4 { + err = etcdKV.SaveBytes(test.key, []byte(test.value)) + assert.NoError(t, err) + } + + val, err := etcdKV.LoadBytes(test.key) + assert.NoError(t, err) + assert.Equal(t, test.value, string(val)) + } + + invalidLoadTests := []struct { + invalidKey string + }{ + {"t"}, + {"a"}, + {"test1a"}, + } + + for _, test := range invalidLoadTests { + val, err := etcdKV.LoadBytes(test.invalidKey) + assert.Error(t, err) + assert.Zero(t, string(val)) + } + + loadPrefixTests := []struct { + prefix string + + expectedKeys []string + expectedValues []string + expectedError error + }{ + {"test", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b")}, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b")}, []string{"value1", "value_a", "value_b"}, nil}, + {"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil}, + {"", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b")}, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil}, + {"a", []string{}, []string{}, nil}, + {"root", []string{}, []string{}, nil}, + {"/etcd/test/root", []string{}, []string{}, nil}, + } + + for _, test := range loadPrefixTests { + actualKeys, actualValues, err := etcdKV.LoadBytesWithPrefix(test.prefix) + actualStringValues := make([]string, len(actualValues)) + for i := range actualValues { + actualStringValues[i] = string(actualValues[i]) + } + assert.ElementsMatch(t, test.expectedKeys, actualKeys) + assert.ElementsMatch(t, test.expectedValues, actualStringValues) + assert.Equal(t, test.expectedError, err) + + actualKeys, actualValues, versions, err := etcdKV.LoadBytesWithPrefix2(test.prefix) + actualStringValues = make([]string, len(actualValues)) + for i := range actualValues { + actualStringValues[i] = string(actualValues[i]) + } + assert.ElementsMatch(t, test.expectedKeys, actualKeys) + assert.ElementsMatch(t, test.expectedValues, actualStringValues) + assert.NotZero(t, versions) + assert.Equal(t, test.expectedError, err) + } + + removeTests := []struct { + validKey string + invalidKey string + }{ + {"test1", "abc"}, + {"test1/a", "test1/lskfjal"}, + {"test1/b", "test1/b"}, + {"test2", "-"}, + } + + for _, test := range removeTests { + err = etcdKV.Remove(test.validKey) + assert.NoError(t, err) + + _, err = etcdKV.Load(test.validKey) + assert.Error(t, err) + + err = etcdKV.Remove(test.validKey) + assert.NoError(t, err) + err = etcdKV.Remove(test.invalidKey) + assert.NoError(t, err) + } + }) + te.Run("EtcdKV LoadWithRevision", func(t *testing.T) { rootPath := "/etcd/test/root/LoadWithRevision" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -194,6 +311,54 @@ func TestEtcdKV_Load(te *testing.T) { }) + te.Run("EtcdKV LoadBytesWithRevision", func(t *testing.T) { + rootPath := "/etcd/test/root/LoadWithRevision" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + prepareKV := []struct { + inKey string + inValue string + }{ + {"a", "a_version1"}, + {"b", "b_version2"}, + {"a", "a_version3"}, + {"c", "c_version4"}, + {"a/suba", "a_version5"}, + } + + for _, test := range prepareKV { + err = etcdKV.SaveBytes(test.inKey, []byte(test.inValue)) + require.NoError(t, err) + } + + loadWithRevisionTests := []struct { + inKey string + + expectedKeyNo int + expectedValues []string + }{ + {"a", 2, []string{"a_version3", "a_version5"}}, + {"b", 1, []string{"b_version2"}}, + {"c", 1, []string{"c_version4"}}, + } + + for _, test := range loadWithRevisionTests { + keys, values, revision, err := etcdKV.LoadBytesWithRevision(test.inKey) + assert.NoError(t, err) + assert.Equal(t, test.expectedKeyNo, len(keys)) + stringValues := make([]string, len(values)) + for i := range values { + stringValues[i] = string(values[i]) + } + assert.ElementsMatch(t, test.expectedValues, stringValues) + assert.NotZero(t, revision) + } + + }) + te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) { rootPath := "/etcd/test/root/multi_save_and_multi_load" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -302,6 +467,128 @@ func TestEtcdKV_Load(te *testing.T) { assert.Empty(t, vs) }) + te.Run("EtcdKV MultiSaveBytesAndMultiLoadBytes", func(t *testing.T) { + rootPath := "/etcd/test/root/multi_save_bytes_and_multi_load_bytes" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + multiSaveTests := map[string]string{ + "key_1": "value_1", + "key_2": "value_2", + "key_3/a": "value_3a", + "multikey_1": "multivalue_1", + "multikey_2": "multivalue_2", + "_": "other", + } + + multiSaveBytesTests := make(map[string][]byte) + for k, v := range multiSaveTests { + multiSaveBytesTests[k] = []byte(v) + } + + err = etcdKV.MultiSaveBytes(multiSaveBytesTests) + assert.NoError(t, err) + for k, v := range multiSaveTests { + actualV, err := etcdKV.LoadBytes(k) + assert.NoError(t, err) + assert.Equal(t, v, string(actualV)) + } + + multiLoadTests := []struct { + inputKeys []string + expectedValues []string + }{ + {[]string{"key_1"}, []string{"value_1"}}, + {[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}}, + {[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}}, + {[]string{"_"}, []string{"other"}}, + } + + for _, test := range multiLoadTests { + vs, err := etcdKV.MultiLoadBytes(test.inputKeys) + stringVs := make([]string, len(vs)) + for i := range vs { + stringVs[i] = string(vs[i]) + } + assert.NoError(t, err) + assert.Equal(t, test.expectedValues, stringVs) + } + + invalidMultiLoad := []struct { + invalidKeys []string + expectedValues []string + }{ + {[]string{"a", "key_1"}, []string{"", "value_1"}}, + {[]string{".....", "key_1"}, []string{"", "value_1"}}, + {[]string{"*********"}, []string{""}}, + {[]string{"key_1", "1"}, []string{"value_1", ""}}, + } + + for _, test := range invalidMultiLoad { + vs, err := etcdKV.MultiLoadBytes(test.invalidKeys) + stringVs := make([]string, len(vs)) + for i := range vs { + stringVs[i] = string(vs[i]) + } + assert.Error(t, err) + assert.Equal(t, test.expectedValues, stringVs) + } + + removeWithPrefixTests := []string{ + "key_1", + "multi", + } + + for _, k := range removeWithPrefixTests { + err = etcdKV.RemoveWithPrefix(k) + assert.NoError(t, err) + + ks, vs, err := etcdKV.LoadBytesWithPrefix(k) + assert.Empty(t, ks) + assert.Empty(t, vs) + assert.NoError(t, err) + } + + multiRemoveTests := []string{ + "key_2", + "key_3/a", + "multikey_2", + "_", + } + + err = etcdKV.MultiRemove(multiRemoveTests) + assert.NoError(t, err) + + ks, vs, err := etcdKV.LoadBytesWithPrefix("") + assert.NoError(t, err) + assert.Empty(t, ks) + assert.Empty(t, vs) + + multiSaveAndRemoveTests := []struct { + multiSaves map[string][]byte + multiRemoves []string + }{ + {map[string][]byte{"key_1": []byte("value_1")}, []string{}}, + {map[string][]byte{"key_2": []byte("value_2")}, []string{"key_1"}}, + {map[string][]byte{"key_3/a": []byte("value_3a")}, []string{"key_2"}}, + {map[string][]byte{"multikey_1": []byte("multivalue_1")}, []string{}}, + {map[string][]byte{"multikey_2": []byte("multivalue_2")}, []string{"multikey_1", "key_3/a"}}, + {make(map[string][]byte), []string{"multikey_2"}}, + } + + for _, test := range multiSaveAndRemoveTests { + err = etcdKV.MultiSaveBytesAndRemove(test.multiSaves, test.multiRemoves) + assert.NoError(t, err) + } + + ks, vs, err = etcdKV.LoadBytesWithPrefix("") + assert.NoError(t, err) + assert.Empty(t, ks) + assert.Empty(t, vs) + }) + te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) { rootPath := "/etcd/test/root/multi_remove_with_prefix" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -405,7 +692,7 @@ func TestEtcdKV_Load(te *testing.T) { }) te.Run("Etcd Revision", func(t *testing.T) { - rootPath := "/etcd/test/root/watch" + rootPath := "/etcd/test/root/revision" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") @@ -453,6 +740,55 @@ func TestEtcdKV_Load(te *testing.T) { assert.Error(t, err) }) + te.Run("Etcd Revision Bytes", func(t *testing.T) { + rootPath := "/etcd/test/root/revision_bytes" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + revisionTests := []struct { + inKey string + fistValue []byte + secondValue []byte + }{ + {"a", []byte("v1"), []byte("v11")}, + {"y", []byte("v2"), []byte("v22")}, + {"z", []byte("v3"), []byte("v33")}, + } + + for _, test := range revisionTests { + err = etcdKV.SaveBytes(test.inKey, test.fistValue) + require.NoError(t, err) + + _, _, revision, _ := etcdKV.LoadBytesWithRevision(test.inKey) + ch := etcdKV.WatchWithRevision(test.inKey, revision+1) + + err = etcdKV.SaveBytes(test.inKey, test.secondValue) + require.NoError(t, err) + + resp := <-ch + assert.Equal(t, 1, len(resp.Events)) + assert.Equal(t, string(test.secondValue), string(resp.Events[0].Kv.Value)) + assert.Equal(t, revision+1, resp.Header.Revision) + } + + err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.NoError(t, err) + + value, err := etcdKV.LoadBytes("a/b/c") + assert.NoError(t, err) + assert.Equal(t, string(value), "1") + + err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + assert.Error(t, err) + + err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.NoError(t, err) + + err = etcdKV.CompareValueAndSwapBytes("a/b/c", []byte("1"), []byte("2")) + assert.Error(t, err) + }) + te.Run("Etcd Lease", func(t *testing.T) { rootPath := "/etcd/test/root/lease" etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -480,6 +816,34 @@ func TestEtcdKV_Load(te *testing.T) { } }) + + te.Run("Etcd Lease Bytes", func(t *testing.T) { + rootPath := "/etcd/test/root/lease_bytes" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + leaseID, err := etcdKV.Grant(10) + assert.NoError(t, err) + + etcdKV.KeepAlive(leaseID) + + tests := map[string][]byte{ + "a/b": []byte("v1"), + "a/b/c": []byte("v2"), + "x": []byte("v3"), + } + + for k, v := range tests { + err = etcdKV.SaveBytesWithLease(k, v, leaseID) + assert.NoError(t, err) + + err = etcdKV.SaveBytesWithLease(k, v, clientv3.LeaseID(999)) + assert.Error(t, err) + } + + }) } func TestElapse(t *testing.T) {