diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 09a486ab65..a6395d98b8 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -139,6 +139,25 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { return result, nil } +func (kv *EtcdKV) LoadWithVersion(key string) ([]string, []string, int64, error) { + key = path.Join(kv.rootPath, key) + log.Debug("LoadWithPrefix ", zap.String("prefix", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, 0, err + } + keys := make([]string, 0, resp.Count) + values := make([]string, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, string(kv.Value)) + } + return keys, values, resp.Header.Revision, nil +} + func (kv *EtcdKV) Save(key, value string) error { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) @@ -230,6 +249,12 @@ func (kv *EtcdKV) WatchWithPrefix(key string) clientv3.WatchChan { return rch } +func (kv *EtcdKV) WatchWithVersion(key string, revision int64) clientv3.WatchChan { + key = path.Join(kv.rootPath, key) + rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) + return rch +} + func (kv *EtcdKV) MultiRemoveWithPrefix(keys []string) error { ops := make([]clientv3.Op, 0, len(keys)) for _, k := range keys { diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 6159eb4f7b..1d8ed36afe 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -308,6 +308,52 @@ func TestEtcdKV_WatchPrefix(t *testing.T) { assert.True(t, resp.Created) } +func TestEtcdKV_LoadWithVersion(t *testing.T) { + cli, err := newEtcdClient() + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + key := "test-load-version" + + err = etcdKV.Save(key, "v1") + assert.Nil(t, err) + + _, values, _, err := etcdKV.LoadWithVersion(key) + assert.Nil(t, err) + assert.Equal(t, "v1", values[0]) +} + +func TestEtcdKV_WatchWithVersion(t *testing.T) { + cli, err := newEtcdClient() + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + key := "test-version" + + err = etcdKV.Save(key, "v") + assert.Nil(t, err) + + _, _, revision, _ := etcdKV.LoadWithVersion(key) + + ch := etcdKV.WatchWithVersion(key, revision+1) + err = etcdKV.Save(key, "v2") + assert.Nil(t, err) + + resp2 := <-ch + assert.Equal(t, 1, len(resp2.Events)) + assert.Equal(t, "v2", string(resp2.Events[0].Kv.Value)) + assert.Equal(t, revision+1, resp2.Header.Revision) + +} + func TestEtcdKV_CompareAndSwap(t *testing.T) { cli, err := newEtcdClient() assert.Nil(t, err)