mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Add Has interface in kv (#25439)
Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
9e3a591bda
commit
f75201bbfd
@ -139,6 +139,18 @@ func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|||||||
return keys, values, nil
|
return keys, values, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kv *EmbedEtcdKV) Has(key string) (bool, error) {
|
||||||
|
key = path.Join(kv.rootPath, key)
|
||||||
|
log.Debug("Has", zap.String("key", key))
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
resp, err := kv.client.Get(ctx, key, clientv3.WithCountOnly())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return resp.Count != 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// LoadBytesWithPrefix returns all the keys and values with the given key prefix
|
// LoadBytesWithPrefix returns all the keys and values with the given key prefix
|
||||||
func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
func (kv *EmbedEtcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
||||||
key = path.Join(kv.rootPath, key)
|
key = path.Join(kv.rootPath, key)
|
||||||
|
|||||||
@ -838,4 +838,31 @@ func TestEmbedEtcd(te *testing.T) {
|
|||||||
testFn(100)
|
testFn(100)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
te.Run("test has", func(t *testing.T) {
|
||||||
|
rootPath := "/etcd/test/root/has"
|
||||||
|
kv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
defer kv.Close()
|
||||||
|
defer kv.RemoveWithPrefix("")
|
||||||
|
|
||||||
|
has, err := kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
|
||||||
|
err = kv.Save("key1", "value1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, has)
|
||||||
|
|
||||||
|
err = kv.Remove("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -120,6 +120,21 @@ func (kv *etcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
|
|||||||
return keys, values, nil
|
return keys, values, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kv *etcdKV) Has(key string) (bool, error) {
|
||||||
|
start := time.Now()
|
||||||
|
key = path.Join(kv.rootPath, key)
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, err := kv.getEtcdMeta(ctx, key, clientv3.WithCountOnly())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckElapseAndWarn(start, "Slow etcd operation has", zap.String("key", key))
|
||||||
|
return resp.Count != 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// LoadBytesWithPrefix returns all the keys and values with the given key prefix.
|
// LoadBytesWithPrefix returns all the keys and values with the given key prefix.
|
||||||
func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
func (kv *etcdKV) LoadBytesWithPrefix(key string) ([]string, [][]byte, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|||||||
@ -824,3 +824,41 @@ func TestCheckTnxStringValueSizeAndWarn(t *testing.T) {
|
|||||||
ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
||||||
assert.True(t, ret)
|
assert.True(t, ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHas(t *testing.T) {
|
||||||
|
etcdCli, err := etcd.GetEtcdClient(
|
||||||
|
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
|
||||||
|
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
|
||||||
|
Params.EtcdCfg.Endpoints.GetAsStrings(),
|
||||||
|
Params.EtcdCfg.EtcdTLSCert.GetValue(),
|
||||||
|
Params.EtcdCfg.EtcdTLSKey.GetValue(),
|
||||||
|
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
|
||||||
|
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
||||||
|
defer etcdCli.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
rootPath := "/etcd/test/root/has"
|
||||||
|
kv := etcdkv.NewEtcdKV(etcdCli, rootPath)
|
||||||
|
err = kv.RemoveWithPrefix("")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer kv.Close()
|
||||||
|
defer kv.RemoveWithPrefix("")
|
||||||
|
|
||||||
|
has, err := kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
|
||||||
|
err = kv.Save("key1", "value1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, has)
|
||||||
|
|
||||||
|
err = kv.Remove("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
}
|
||||||
|
|||||||
@ -47,6 +47,7 @@ type BaseKV interface {
|
|||||||
Remove(key string) error
|
Remove(key string) error
|
||||||
MultiRemove(keys []string) error
|
MultiRemove(keys []string) error
|
||||||
RemoveWithPrefix(key string) error
|
RemoveWithPrefix(key string) error
|
||||||
|
Has(key string) (bool, error)
|
||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -353,3 +353,9 @@ func (kv *MemoryKV) RemoveWithPrefix(key string) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kv *MemoryKV) Has(key string) (bool, error) {
|
||||||
|
kv.Lock()
|
||||||
|
defer kv.Unlock()
|
||||||
|
return kv.tree.Has(memoryKVItem{key: key}), nil
|
||||||
|
}
|
||||||
|
|||||||
@ -198,3 +198,25 @@ func TestMemoryKV_MultiSaveBytesAndRemoveWithPrefix(t *testing.T) {
|
|||||||
assert.ElementsMatch(t, values, _values)
|
assert.ElementsMatch(t, values, _values)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHas(t *testing.T) {
|
||||||
|
kv := NewMemoryKV()
|
||||||
|
|
||||||
|
has, err := kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
|
||||||
|
err = kv.Save("key1", "value1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, has)
|
||||||
|
|
||||||
|
err = kv.Remove("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
has, err = kv.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
}
|
||||||
|
|||||||
@ -145,6 +145,58 @@ func (_c *MetaKv_GetPath_Call) RunAndReturn(run func(string) string) *MetaKv_Get
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Has provides a mock function with given fields: key
|
||||||
|
func (_m *MetaKv) Has(key string) (bool, error) {
|
||||||
|
ret := _m.Called(key)
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(string) (bool, error)); ok {
|
||||||
|
return rf(key)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(string) bool); ok {
|
||||||
|
r0 = rf(key)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||||
|
r1 = rf(key)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetaKv_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has'
|
||||||
|
type MetaKv_Has_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
func (_e *MetaKv_Expecter) Has(key interface{}) *MetaKv_Has_Call {
|
||||||
|
return &MetaKv_Has_Call{Call: _e.mock.On("Has", key)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MetaKv_Has_Call) Run(run func(key string)) *MetaKv_Has_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MetaKv_Has_Call) Return(_a0 bool, _a1 error) *MetaKv_Has_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *MetaKv_Has_Call) RunAndReturn(run func(string) (bool, error)) *MetaKv_Has_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Load provides a mock function with given fields: key
|
// Load provides a mock function with given fields: key
|
||||||
func (_m *MetaKv) Load(key string) (string, error) {
|
func (_m *MetaKv) Load(key string) (string, error) {
|
||||||
ret := _m.Called(key)
|
ret := _m.Called(key)
|
||||||
|
|||||||
@ -49,6 +49,58 @@ func (_c *TxnKV_Close_Call) RunAndReturn(run func()) *TxnKV_Close_Call {
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Has provides a mock function with given fields: key
|
||||||
|
func (_m *TxnKV) Has(key string) (bool, error) {
|
||||||
|
ret := _m.Called(key)
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(string) (bool, error)); ok {
|
||||||
|
return rf(key)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(string) bool); ok {
|
||||||
|
r0 = rf(key)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||||
|
r1 = rf(key)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnKV_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has'
|
||||||
|
type TxnKV_Has_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
func (_e *TxnKV_Expecter) Has(key interface{}) *TxnKV_Has_Call {
|
||||||
|
return &TxnKV_Has_Call{Call: _e.mock.On("Has", key)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *TxnKV_Has_Call) Run(run func(key string)) *TxnKV_Has_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *TxnKV_Has_Call) Return(_a0 bool, _a1 error) *TxnKV_Has_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *TxnKV_Has_Call) RunAndReturn(run func(string) (bool, error)) *TxnKV_Has_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Load provides a mock function with given fields: key
|
// Load provides a mock function with given fields: key
|
||||||
func (_m *TxnKV) Load(key string) (string, error) {
|
func (_m *TxnKV) Load(key string) (string, error) {
|
||||||
ret := _m.Called(key)
|
ret := _m.Called(key)
|
||||||
|
|||||||
@ -149,6 +149,58 @@ func (_c *WatchKV_GetPath_Call) RunAndReturn(run func(string) string) *WatchKV_G
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Has provides a mock function with given fields: key
|
||||||
|
func (_m *WatchKV) Has(key string) (bool, error) {
|
||||||
|
ret := _m.Called(key)
|
||||||
|
|
||||||
|
var r0 bool
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(0).(func(string) (bool, error)); ok {
|
||||||
|
return rf(key)
|
||||||
|
}
|
||||||
|
if rf, ok := ret.Get(0).(func(string) bool); ok {
|
||||||
|
r0 = rf(key)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Get(0).(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rf, ok := ret.Get(1).(func(string) error); ok {
|
||||||
|
r1 = rf(key)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
|
// WatchKV_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has'
|
||||||
|
type WatchKV_Has_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has is a helper method to define mock.On call
|
||||||
|
// - key string
|
||||||
|
func (_e *WatchKV_Expecter) Has(key interface{}) *WatchKV_Has_Call {
|
||||||
|
return &WatchKV_Has_Call{Call: _e.mock.On("Has", key)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *WatchKV_Has_Call) Run(run func(key string)) *WatchKV_Has_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(string))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *WatchKV_Has_Call) Return(_a0 bool, _a1 error) *WatchKV_Has_Call {
|
||||||
|
_c.Call.Return(_a0, _a1)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *WatchKV_Has_Call) RunAndReturn(run func(string) (bool, error)) *WatchKV_Has_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// Load provides a mock function with given fields: key
|
// Load provides a mock function with given fields: key
|
||||||
func (_m *WatchKV) Load(key string) (string, error) {
|
func (_m *WatchKV) Load(key string) (string, error) {
|
||||||
ret := _m.Called(key)
|
ret := _m.Called(key)
|
||||||
|
|||||||
@ -162,6 +162,22 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
|||||||
return keys, values, nil
|
return keys, values, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kv *RocksdbKV) Has(key string) (bool, error) {
|
||||||
|
if kv.DB == nil {
|
||||||
|
return false, fmt.Errorf("rocksdb instance is nil when check if has %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
option := gorocksdb.NewDefaultReadOptions()
|
||||||
|
defer option.Destroy()
|
||||||
|
|
||||||
|
value, err := kv.DB.Get(option, []byte(key))
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return value.Size() != 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (kv *RocksdbKV) LoadBytesWithPrefix(prefix string) ([]string, [][]byte, error) {
|
func (kv *RocksdbKV) LoadBytesWithPrefix(prefix string) ([]string, [][]byte, error) {
|
||||||
if kv.DB == nil {
|
if kv.DB == nil {
|
||||||
return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix)
|
return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix)
|
||||||
|
|||||||
@ -341,3 +341,25 @@ func TestRocksdbKV_CornerCase(t *testing.T) {
|
|||||||
err = rocksdbkv.DeleteRange("a", "a")
|
err = rocksdbkv.DeleteRange("a", "a")
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHas(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
db, err := rocksdbkv.NewRocksdbKV(dir)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
defer db.RemoveWithPrefix("")
|
||||||
|
|
||||||
|
has, err := db.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
err = db.Save("key1", "value1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
has, err = db.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.True(t, has)
|
||||||
|
err = db.Remove("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
has, err = db.Has("key1")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.False(t, has)
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user