diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 205b0123d1..1803be5883 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -139,6 +139,18 @@ func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +func (kv *EmbedEtcdKV) Has(key string) (bool, error) { + key = path.Join(kv.rootPath, key) + log.Debug("Has", zap.String("key", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithCountOnly()) + if err != nil { + return false, err + } + return resp.Count != 0, nil +} + // LoadBytesWithPrefix returns all the keys and values with the given key prefix func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { key = path.Join(kv.rootPath, key) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 395c681055..17711c625a 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -838,4 +838,31 @@ func TestEmbedEtcd(te *testing.T) { testFn(100) }) }) + + te.Run("test has", func(t *testing.T) { + rootPath := "/etcd/test/root/has" + kv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + assert.NoError(t, err) + + defer kv.Close() + defer kv.RemoveWithPrefix("") + + has, err := kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) + + err = kv.Save("key1", "value1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.True(t, has) + + err = kv.Remove("key1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) + }) } diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 506d72c9a0..37f90b8be6 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -120,6 +120,21 @@ func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +func (kv *etcdKV) Has(key string) (bool, error) { + start := time.Now() + key = path.Join(kv.rootPath, key) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithCountOnly()) + if err != nil { + return false, err + } + + CheckElapseAndWarn(start, "Slow etcd operation has", zap.String("key", key)) + return resp.Count != 0, nil +} + // LoadBytesWithPrefix returns all the keys and values with the given key prefix. func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) { start := time.Now() diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index e3af5b57cc..d7f37b7ed7 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -824,3 +824,41 @@ func TestCheckTnxStringValueSizeAndWarn(t *testing.T) { ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs) assert.True(t, ret) } + +func TestHas(t *testing.T) { + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + defer etcdCli.Close() + assert.NoError(t, err) + rootPath := "/etcd/test/root/has" + kv := etcdkv.NewEtcdKV(etcdCli, rootPath) + err = kv.RemoveWithPrefix("") + require.NoError(t, err) + + defer kv.Close() + defer kv.RemoveWithPrefix("") + + has, err := kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) + + err = kv.Save("key1", "value1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.True(t, has) + + err = kv.Remove("key1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) +} diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 90ee781f56..8cdd0de1ce 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -47,6 +47,7 @@ type BaseKV interface { Remove(key string) error MultiRemove(keys []string) error RemoveWithPrefix(key string) error + Has(key string) (bool, error) Close() } diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index cec49dae80..3710c32036 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -353,3 +353,9 @@ func (kv *MemoryKV) RemoveWithPrefix(key string) error { } return nil } + +func (kv *MemoryKV) Has(key string) (bool, error) { + kv.Lock() + defer kv.Unlock() + return kv.tree.Has(memoryKVItem{key: key}), nil +} diff --git a/internal/kv/mem/mem_kv_test.go b/internal/kv/mem/mem_kv_test.go index 6e7140abc2..d0a7343f2b 100644 --- a/internal/kv/mem/mem_kv_test.go +++ b/internal/kv/mem/mem_kv_test.go @@ -198,3 +198,25 @@ func TestMemoryKV_MultiSaveBytesAndRemoveWithPrefix(t *testing.T) { assert.ElementsMatch(t, values, _values) assert.NoError(t, err) } + +func TestHas(t *testing.T) { + kv := NewMemoryKV() + + has, err := kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) + + err = kv.Save("key1", "value1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.True(t, has) + + err = kv.Remove("key1") + assert.NoError(t, err) + + has, err = kv.Has("key1") + assert.NoError(t, err) + assert.False(t, has) +} diff --git a/internal/kv/mocks/MetaKv.go b/internal/kv/mocks/MetaKv.go index abfd363ddb..863b76f151 100644 --- a/internal/kv/mocks/MetaKv.go +++ b/internal/kv/mocks/MetaKv.go @@ -145,6 +145,58 @@ func (_c *MetaKv_GetPath_Call) RunAndReturn(run func(string) string) *MetaKv_Get return _c } +// Has provides a mock function with given fields: key +func (_m *MetaKv) Has(key string) (bool, error) { + ret := _m.Called(key) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string) (bool, error)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has' +type MetaKv_Has_Call struct { + *mock.Call +} + +// Has is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) Has(key interface{}) *MetaKv_Has_Call { + return &MetaKv_Has_Call{Call: _e.mock.On("Has", key)} +} + +func (_c *MetaKv_Has_Call) Run(run func(key string)) *MetaKv_Has_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_Has_Call) Return(_a0 bool, _a1 error) *MetaKv_Has_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MetaKv_Has_Call) RunAndReturn(run func(string) (bool, error)) *MetaKv_Has_Call { + _c.Call.Return(run) + return _c +} + // Load provides a mock function with given fields: key func (_m *MetaKv) Load(key string) (string, error) { ret := _m.Called(key) diff --git a/internal/kv/mocks/TxnKV.go b/internal/kv/mocks/TxnKV.go index 0da4bcdd13..9fd7224b1e 100644 --- a/internal/kv/mocks/TxnKV.go +++ b/internal/kv/mocks/TxnKV.go @@ -49,6 +49,58 @@ func (_c *TxnKV_Close_Call) RunAndReturn(run func()) *TxnKV_Close_Call { return _c } +// Has provides a mock function with given fields: key +func (_m *TxnKV) Has(key string) (bool, error) { + ret := _m.Called(key) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string) (bool, error)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// TxnKV_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has' +type TxnKV_Has_Call struct { + *mock.Call +} + +// Has is a helper method to define mock.On call +// - key string +func (_e *TxnKV_Expecter) Has(key interface{}) *TxnKV_Has_Call { + return &TxnKV_Has_Call{Call: _e.mock.On("Has", key)} +} + +func (_c *TxnKV_Has_Call) Run(run func(key string)) *TxnKV_Has_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *TxnKV_Has_Call) Return(_a0 bool, _a1 error) *TxnKV_Has_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *TxnKV_Has_Call) RunAndReturn(run func(string) (bool, error)) *TxnKV_Has_Call { + _c.Call.Return(run) + return _c +} + // Load provides a mock function with given fields: key func (_m *TxnKV) Load(key string) (string, error) { ret := _m.Called(key) diff --git a/internal/kv/mocks/WatchKV.go b/internal/kv/mocks/WatchKV.go index 317d60b29e..af2dfc9768 100644 --- a/internal/kv/mocks/WatchKV.go +++ b/internal/kv/mocks/WatchKV.go @@ -149,6 +149,58 @@ func (_c *WatchKV_GetPath_Call) RunAndReturn(run func(string) string) *WatchKV_G return _c } +// Has provides a mock function with given fields: key +func (_m *WatchKV) Has(key string) (bool, error) { + ret := _m.Called(key) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(string) (bool, error)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func(string) bool); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WatchKV_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has' +type WatchKV_Has_Call struct { + *mock.Call +} + +// Has is a helper method to define mock.On call +// - key string +func (_e *WatchKV_Expecter) Has(key interface{}) *WatchKV_Has_Call { + return &WatchKV_Has_Call{Call: _e.mock.On("Has", key)} +} + +func (_c *WatchKV_Has_Call) Run(run func(key string)) *WatchKV_Has_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *WatchKV_Has_Call) Return(_a0 bool, _a1 error) *WatchKV_Has_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *WatchKV_Has_Call) RunAndReturn(run func(string) (bool, error)) *WatchKV_Has_Call { + _c.Call.Return(run) + return _c +} + // Load provides a mock function with given fields: key func (_m *WatchKV) Load(key string) (string, error) { ret := _m.Called(key) diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index d1ef2a7a0b..9ac091aed0 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -162,6 +162,22 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) { return keys, values, nil } +func (kv *RocksdbKV) Has(key string) (bool, error) { + if kv.DB == nil { + return false, fmt.Errorf("rocksdb instance is nil when check if has %s", key) + } + + option := gorocksdb.NewDefaultReadOptions() + defer option.Destroy() + + value, err := kv.DB.Get(option, []byte(key)) + if err != nil { + return false, err + } + + return value.Size() != 0, nil +} + func (kv *RocksdbKV) LoadBytesWithPrefix(prefix string) ([]string, [][]byte, error) { if kv.DB == nil { return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix) diff --git a/internal/kv/rocksdb/rocksdb_kv_test.go b/internal/kv/rocksdb/rocksdb_kv_test.go index 57b59c3b5c..be8becd24d 100644 --- a/internal/kv/rocksdb/rocksdb_kv_test.go +++ b/internal/kv/rocksdb/rocksdb_kv_test.go @@ -341,3 +341,25 @@ func TestRocksdbKV_CornerCase(t *testing.T) { err = rocksdbkv.DeleteRange("a", "a") assert.Error(t, err) } + +func TestHas(t *testing.T) { + dir := t.TempDir() + db, err := rocksdbkv.NewRocksdbKV(dir) + assert.NoError(t, err) + defer db.Close() + defer db.RemoveWithPrefix("") + + has, err := db.Has("key1") + assert.NoError(t, err) + assert.False(t, has) + err = db.Save("key1", "value1") + assert.NoError(t, err) + has, err = db.Has("key1") + assert.NoError(t, err) + assert.True(t, has) + err = db.Remove("key1") + assert.NoError(t, err) + has, err = db.Has("key1") + assert.NoError(t, err) + assert.False(t, has) +}