diff --git a/docs/design_docs/milvus_meta_snapshot_en.md b/docs/design_docs/milvus_meta_snapshot_en.md deleted file mode 100644 index b4b1c8da5f..0000000000 --- a/docs/design_docs/milvus_meta_snapshot_en.md +++ /dev/null @@ -1,109 +0,0 @@ -# metaSnapShot - -`metaSnapShot` enables `RootCoord` to query historical meta based on timestamp, it provides `Key-Vaule` interface. Take an example to illustrate what `metaSnapShot` is. The following figure shows a series of operations happened on the timeline. - -![snap_shot](./graphs/snapshot_1.png) - -| Timestamp | Operation | -|-----------|-----------| -| 100 | Set A=1 | -| 200 | Set B=2 | -| 300 | Set C=3 | -| 400 | Set A=10 | -| 500 | Delete B | -| 600 | Delete C | - -Now assuming the Wall-Clock is `Timestamp=700`, so `B` should have been deleted from the system. But I want to know the value of `B` at `Timesamp=450`, how to do it? `metaSnapShot` is invented to solve this problem. - -We need to briefly introduce `etcd`'s `MVCC` before `metaSnapShot`. Here is the test program: - -```go -package etcdkv - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/clientv3" -) - -func TestMVCC(t *testing.T) { - addr := []string{"127.0.0.1:2379"} - cli, err := clientv3.New(clientv3.Config{Endpoints: addr}) - assert.Nil(t, err) - assert.NotNil(t, cli) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - testKey := "test-key" - - rsp0, err := cli.Delete(ctx, testKey) - assert.Nil(t, err) - t.Logf("revision:%d", rsp0.Header.Revision) - - rsp1, err := cli.Put(ctx, testKey, "value1") - assert.Nil(t, err) - t.Logf("revision:%d,value1", rsp1.Header.Revision) - - rsp2, err := cli.Put(ctx, testKey, "value2") - assert.Nil(t, err) - t.Logf("revision:%d,value2", rsp2.Header.Revision) - - rsp3, err := cli.Get(ctx, testKey, clientv3.WithRev(rsp1.Header.Revision)) - assert.Nil(t, err) - t.Logf("get at revision:%d, value=%s", rsp1.Header.Revision, string(rsp3.Kvs[0].Value)) - -} -``` - -The output of above test program should look like this: -```text -=== RUN TestMVCC - etcd_mvcc_test.go:23: revision:401 - etcd_mvcc_test.go:27: revision:402,value1 - etcd_mvcc_test.go:31: revision:403,value2 - etcd_mvcc_test.go:35: get at revision:402, value=value1 ---- PASS: TestMVCC (0.01s) -``` - -In `etcd`, each writes operation would add `1` to `Revision`. So if we specify the `Revision` value at query, we can get the historical value under that `Revision`. - -`metaSnapShot` is based on this feature of `etcd`. We will write an extra `Timestamp` on each write operation. `etcd`'s `Txn` makes sure that the `Timestamp` would have the same `Revision` with user data. - -When querying, `metaSnapShot` will find an appropriate `Revision` based on the input `Timestamp`, and then query on `etcd` with this `Revision`. - -In order to speed up getting `Revision` by `Timestamp`, `metaSnapShot` would maintain an array mapping the `Timestamp` to `Revision`. The default length of this array is `1024`, which is a type of circular array. - -![snap_shot](./graphs/snapshot_2.png) - -- `maxPos` points to the position where `Timestamp` and `Revision` are maximum. -- `minPos` points to the position where `Timestamp` and `Revision` are minimum. -- For each update operation, we first add `1` to `maxPos`. So the new `maxPos` would cover the old `minPos` position, and then add `1` to the old `minPos` -- From `0` to `maxPos` and from `minPos` to `1023`, which are two incremental arrays. We can use binary search to quickly get the `Revision` by the input `Timestamp` -- If the input `Timestamp` is greater than the `Timestamp` where the `maxPos` is located, then the `Revision` at the position of the `maxPos` will be returned -- If the input `Timestamp` is less than the `Timestamp` where `minPos` is located, `metaSnapshot` will load the historical `Timestamp` and `Revision` from `etcd` to find an appropriate `Revision` value. - -The interface of `metaSnapShot` is defined as follows: -```go -type SnapShotKV interface { - Load(key string, ts typeutil.Timestamp) (string, error) - LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) - - Save(key, value string) (typeutil.Timestamp, error) - MultiSave(kvs map[string]string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) - MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, additions ...func(ts typeutil.Timestamp) (string, string, error)) (typeutil.Timestamp, error) -} -``` - -For the `Read` operations (`Load` and `LoadWithPrefix`), the input parameter `typeutil.Timestamp` is used to tell `metaSnapShot` to load the value based on that `Timestamp`. - -For the `Write` operations (`Save`, `MultiSave`, `MultiSaveAndRemoveWithPrefix`), return values including `typeutil.Timestamp`, which is used to tell the caller when these write operations happened. - -You might be curious about the parameter `additions` of `MultiSave` and `MultiSaveAndRemoveWithPrefix`: What does `additions` do, and why? - -`additions` is an array of `func(ts typeutil.Timestamp) (string, string, error)`. So it's a function, receiving `typeutil.Timestamp` as an input, and returns two `string` which is `key-value` pair. If `error` is `nil` in the return value, `metaSnapShot` would write this `key-value` pair into `etcd`. - -Referring to the document of `CreateCollection`, a timestamp is created for `Collection`, which is the timestamp when the `Collection`'s meta have been written into `etcd`, not the timestamp when `RootCoord` receives the request. So before writing the `Collection`'s meta into `etcd`, `metaSnapshot` would allocate a timestamp, and call all the `additions`. This would make sure the timestamp created for the `Collection` is correct. - -![create_collection](./graphs/dml_create_collection.png) diff --git a/internal/core/src/query/deprecated/ValidationUtil.cpp b/internal/core/src/query/deprecated/ValidationUtil.cpp index a700d47db7..629ae43032 100644 --- a/internal/core/src/query/deprecated/ValidationUtil.cpp +++ b/internal/core/src/query/deprecated/ValidationUtil.cpp @@ -11,8 +11,6 @@ #include "ValidationUtil.h" #include "config/ServerConfig.h" -//#include "db/Constants.h" -//#include "db/Utils.h" #include "knowhere/index/vector_index/ConfAdapter.h" #include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "utils/Log.h" diff --git a/internal/indexcoord/index_coord_mock.go b/internal/indexcoord/index_coord_mock.go index 070ae2c5f3..6f8cb6b64d 100644 --- a/internal/indexcoord/index_coord_mock.go +++ b/internal/indexcoord/index_coord_mock.go @@ -520,6 +520,7 @@ type mockETCDKV struct { loadWithPrefix func(key string) ([]string, []string, error) loadWithRevision func(key string) ([]string, []string, int64, error) removeWithPrefix func(key string) error + walkWithPrefix func(prefix string, paginationSize int, fn func([]byte, []byte) error) error } func NewMockEtcdKV() *mockETCDKV { @@ -551,6 +552,9 @@ func NewMockEtcdKV() *mockETCDKV { removeWithPrefix: func(key string) error { return nil }, + walkWithPrefix: func(prefix string, paginationSize int, fn func([]byte, []byte) error) error { + return nil + }, } } @@ -589,6 +593,10 @@ func NewMockEtcdKVWithReal(real kv.MetaKv) *mockETCDKV { } } +func (mk *mockETCDKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error { + return mk.walkWithPrefix(prefix, paginationSize, fn) +} + func (mk *mockETCDKV) Save(key string, value string) error { return mk.save(key, value) } diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 85c9539a14..8d598989ea 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -24,13 +24,12 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/common" - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" ) @@ -86,6 +85,40 @@ func (kv *EmbedEtcdKV) GetPath(key string) string { return path.Join(kv.rootPath, key) } +func (kv *EmbedEtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error { + prefix = path.Join(kv.rootPath, prefix) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + batch := int64(paginationSize) + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(batch), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), + } + + key := prefix + for { + resp, err := kv.client.Get(ctx, key, opts...) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + if err = fn(kv.Key, kv.Value); err != nil { + return err + } + } + + if !resp.More { + break + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + return nil +} + // LoadWithPrefix returns all the keys and values with the given key prefix func (kv *EmbedEtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { key = path.Join(kv.rootPath, key) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 2797716265..c206bccdfe 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -17,16 +17,20 @@ package etcdkv_test import ( + "errors" + "fmt" + "sort" "testing" "time" - "github.com/milvus-io/milvus/internal/util/metricsinfo" - - embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/exp/maps" + + embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/paramtable" ) func TestEmbedEtcd(te *testing.T) { @@ -954,4 +958,79 @@ func TestEmbedEtcd(te *testing.T) { assert.Error(t, err) } }) + + te.Run("Etcd WalkWithPagination", func(t *testing.T) { + rootPath := "/etcd/test/root/walkWithPagination" + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) + assert.Nil(t, err) + + defer metaKv.Close() + defer metaKv.RemoveWithPrefix("") + + kvs := map[string]string{ + "A/100": "v1", + "AA/100": "v2", + "AB/100": "v3", + "AB/2/100": "v4", + "B/100": "v5", + } + + err = metaKv.MultiSave(kvs) + assert.NoError(t, err) + for k, v := range kvs { + actualV, err := metaKv.Load(k) + assert.NoError(t, err) + assert.Equal(t, v, actualV) + } + + t.Run("apply function error ", func(t *testing.T) { + err = metaKv.WalkWithPrefix("A", 5, func(key []byte, value []byte) error { + return errors.New("error") + }) + assert.Error(t, err) + }) + + t.Run("get with non-exist prefix ", func(t *testing.T) { + err = metaKv.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error { + return nil + }) + assert.NoError(t, err) + }) + + t.Run("with different pagination", func(t *testing.T) { + testFn := func(pagination int) { + expected := map[string]string{ + "A/100": "v1", + "AA/100": "v2", + "AB/100": "v3", + "AB/2/100": "v4", + } + + expectedSortedKey := maps.Keys(expected) + sort.Strings(expectedSortedKey) + + ret := make(map[string]string) + actualSortedKey := make([]string, 0) + + err = metaKv.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error { + k := string(key) + k = k[len(rootPath)+1:] + ret[k] = string(value) + actualSortedKey = append(actualSortedKey, k) + return nil + }) + + assert.NoError(t, err) + assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination)) + assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination)) + } + + testFn(-100) + testFn(-1) + testFn(0) + testFn(1) + testFn(5) + testFn(100) + }) + }) } diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 23c9e2d3e2..c5aff77585 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -64,6 +64,43 @@ func (kv *EtcdKV) GetPath(key string) string { return path.Join(kv.rootPath, key) } +func (kv *EtcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error { + start := time.Now() + prefix = path.Join(kv.rootPath, prefix) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + batch := int64(paginationSize) + opts := []clientv3.OpOption{ + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithLimit(batch), + clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)), + } + + key := prefix + for { + resp, err := kv.client.Get(ctx, key, opts...) + if err != nil { + return err + } + + for _, kv := range resp.Kvs { + if err = fn(kv.Key, kv.Value); err != nil { + return err + } + } + + if !resp.More { + break + } + // move to next key + key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + } + + CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix)) + return nil +} + // LoadWithPrefix returns all the keys and values with the given key prefix. func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { start := time.Now() diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index e5e00c5f2c..94921d2381 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -17,18 +17,22 @@ package etcdkv_test import ( + "errors" + "fmt" "os" + "sort" "testing" "time" - "github.com/milvus-io/milvus/internal/util/funcutil" - - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/exp/maps" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" ) var Params paramtable.ComponentParam @@ -909,6 +913,91 @@ func TestEtcdKV_Load(te *testing.T) { }) } +func Test_WalkWithPagination(t *testing.T) { + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + defer etcdCli.Close() + assert.NoError(t, err) + + rootPath := "/etcd/test/root/pagination" + etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + kvs := map[string]string{ + "A/100": "v1", + "AA/100": "v2", + "AB/100": "v3", + "AB/2/100": "v4", + "B/100": "v5", + } + + err = etcdKV.MultiSave(kvs) + assert.NoError(t, err) + for k, v := range kvs { + actualV, err := etcdKV.Load(k) + assert.NoError(t, err) + assert.Equal(t, v, actualV) + } + + t.Run("apply function error ", func(t *testing.T) { + err = etcdKV.WalkWithPrefix("A", 5, func(key []byte, value []byte) error { + return errors.New("error") + }) + assert.Error(t, err) + }) + + t.Run("get with non-exist prefix ", func(t *testing.T) { + err = etcdKV.WalkWithPrefix("non-exist-prefix", 5, func(key []byte, value []byte) error { + return nil + }) + assert.NoError(t, err) + }) + + t.Run("with different pagination", func(t *testing.T) { + testFn := func(pagination int) { + expected := map[string]string{ + "A/100": "v1", + "AA/100": "v2", + "AB/100": "v3", + "AB/2/100": "v4", + } + + expectedSortedKey := maps.Keys(expected) + sort.Strings(expectedSortedKey) + + ret := make(map[string]string) + actualSortedKey := make([]string, 0) + + err = etcdKV.WalkWithPrefix("A", pagination, func(key []byte, value []byte) error { + k := string(key) + k = k[len(rootPath)+1:] + ret[k] = string(value) + actualSortedKey = append(actualSortedKey, k) + return nil + }) + + assert.NoError(t, err) + assert.Equal(t, expected, ret, fmt.Errorf("pagination: %d", pagination)) + assert.Equal(t, expectedSortedKey, actualSortedKey, fmt.Errorf("pagination: %d", pagination)) + } + + testFn(-100) + testFn(-1) + testFn(0) + testFn(1) + testFn(5) + testFn(100) + }) +} + func TestElapse(t *testing.T) { start := time.Now() isElapse := etcdkv.CheckElapseAndWarn(start, "err message") diff --git a/internal/kv/kv.go b/internal/kv/kv.go index 2d5cd7874f..c29e2f4518 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -17,8 +17,9 @@ package kv import ( - "github.com/milvus-io/milvus/internal/util/typeutil" clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/milvus/internal/util/typeutil" ) // CompareFailedError is a helper type for checking MetaKv CompareAndSwap series func error type @@ -46,7 +47,6 @@ type BaseKV interface { Remove(key string) error MultiRemove(keys []string) error RemoveWithPrefix(key string) error - Close() } @@ -59,6 +59,7 @@ type TxnKV interface { MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error } +//go:generate mockery --name=MetaKv --with-expecter // MetaKv is TxnKV for metadata. It should save data with lease. type MetaKv interface { TxnKV @@ -76,6 +77,7 @@ type MetaKv interface { KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) (bool, error) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) + WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error } //go:generate mockery --name=SnapShotKV --with-expecter diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 4953de984b..91c094cc8d 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -20,9 +20,9 @@ import ( "strings" "sync" - "github.com/milvus-io/milvus/internal/common" - "github.com/google/btree" + + "github.com/milvus-io/milvus/internal/common" ) // MemoryKV implements BaseKv interface and relies on underling btree.BTree. diff --git a/internal/kv/mocks/MetaKv.go b/internal/kv/mocks/MetaKv.go new file mode 100644 index 0000000000..7e81d7c02b --- /dev/null +++ b/internal/kv/mocks/MetaKv.go @@ -0,0 +1,1188 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package mocks + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + + mock "github.com/stretchr/testify/mock" +) + +// MetaKv is an autogenerated mock type for the MetaKv type +type MetaKv struct { + mock.Mock +} + +type MetaKv_Expecter struct { + mock *mock.Mock +} + +func (_m *MetaKv) EXPECT() *MetaKv_Expecter { + return &MetaKv_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with given fields: +func (_m *MetaKv) Close() { + _m.Called() +} + +// MetaKv_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MetaKv_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MetaKv_Expecter) Close() *MetaKv_Close_Call { + return &MetaKv_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MetaKv_Close_Call) Run(run func()) *MetaKv_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MetaKv_Close_Call) Return() *MetaKv_Close_Call { + _c.Call.Return() + return _c +} + +// CompareValueAndSwap provides a mock function with given fields: key, value, target, opts +func (_m *MetaKv) CompareValueAndSwap(key string, value string, target string, opts ...clientv3.OpOption) (bool, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, key, value, target) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, string, string, ...clientv3.OpOption) bool); ok { + r0 = rf(key, value, target, opts...) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string, string, ...clientv3.OpOption) error); ok { + r1 = rf(key, value, target, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_CompareValueAndSwap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompareValueAndSwap' +type MetaKv_CompareValueAndSwap_Call struct { + *mock.Call +} + +// CompareValueAndSwap is a helper method to define mock.On call +// - key string +// - value string +// - target string +// - opts ...clientv3.OpOption +func (_e *MetaKv_Expecter) CompareValueAndSwap(key interface{}, value interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareValueAndSwap_Call { + return &MetaKv_CompareValueAndSwap_Call{Call: _e.mock.On("CompareValueAndSwap", + append([]interface{}{key, value, target}, opts...)...)} +} + +func (_c *MetaKv_CompareValueAndSwap_Call) Run(run func(key string, value string, target string, opts ...clientv3.OpOption)) *MetaKv_CompareValueAndSwap_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]clientv3.OpOption, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(clientv3.OpOption) + } + } + run(args[0].(string), args[1].(string), args[2].(string), variadicArgs...) + }) + return _c +} + +func (_c *MetaKv_CompareValueAndSwap_Call) Return(_a0 bool, _a1 error) *MetaKv_CompareValueAndSwap_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// CompareVersionAndSwap provides a mock function with given fields: key, version, target, opts +func (_m *MetaKv) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) (bool, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, key, version, target) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, int64, string, ...clientv3.OpOption) bool); ok { + r0 = rf(key, version, target, opts...) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, int64, string, ...clientv3.OpOption) error); ok { + r1 = rf(key, version, target, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_CompareVersionAndSwap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompareVersionAndSwap' +type MetaKv_CompareVersionAndSwap_Call struct { + *mock.Call +} + +// CompareVersionAndSwap is a helper method to define mock.On call +// - key string +// - version int64 +// - target string +// - opts ...clientv3.OpOption +func (_e *MetaKv_Expecter) CompareVersionAndSwap(key interface{}, version interface{}, target interface{}, opts ...interface{}) *MetaKv_CompareVersionAndSwap_Call { + return &MetaKv_CompareVersionAndSwap_Call{Call: _e.mock.On("CompareVersionAndSwap", + append([]interface{}{key, version, target}, opts...)...)} +} + +func (_c *MetaKv_CompareVersionAndSwap_Call) Run(run func(key string, version int64, target string, opts ...clientv3.OpOption)) *MetaKv_CompareVersionAndSwap_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]clientv3.OpOption, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(clientv3.OpOption) + } + } + run(args[0].(string), args[1].(int64), args[2].(string), variadicArgs...) + }) + return _c +} + +func (_c *MetaKv_CompareVersionAndSwap_Call) Return(_a0 bool, _a1 error) *MetaKv_CompareVersionAndSwap_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetPath provides a mock function with given fields: key +func (_m *MetaKv) GetPath(key string) string { + ret := _m.Called(key) + + var r0 string + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MetaKv_GetPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPath' +type MetaKv_GetPath_Call struct { + *mock.Call +} + +// GetPath is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) GetPath(key interface{}) *MetaKv_GetPath_Call { + return &MetaKv_GetPath_Call{Call: _e.mock.On("GetPath", key)} +} + +func (_c *MetaKv_GetPath_Call) Run(run func(key string)) *MetaKv_GetPath_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_GetPath_Call) Return(_a0 string) *MetaKv_GetPath_Call { + _c.Call.Return(_a0) + return _c +} + +// Grant provides a mock function with given fields: ttl +func (_m *MetaKv) Grant(ttl int64) (clientv3.LeaseID, error) { + ret := _m.Called(ttl) + + var r0 clientv3.LeaseID + if rf, ok := ret.Get(0).(func(int64) clientv3.LeaseID); ok { + r0 = rf(ttl) + } else { + r0 = ret.Get(0).(clientv3.LeaseID) + } + + var r1 error + if rf, ok := ret.Get(1).(func(int64) error); ok { + r1 = rf(ttl) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_Grant_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Grant' +type MetaKv_Grant_Call struct { + *mock.Call +} + +// Grant is a helper method to define mock.On call +// - ttl int64 +func (_e *MetaKv_Expecter) Grant(ttl interface{}) *MetaKv_Grant_Call { + return &MetaKv_Grant_Call{Call: _e.mock.On("Grant", ttl)} +} + +func (_c *MetaKv_Grant_Call) Run(run func(ttl int64)) *MetaKv_Grant_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MetaKv_Grant_Call) Return(id clientv3.LeaseID, err error) *MetaKv_Grant_Call { + _c.Call.Return(id, err) + return _c +} + +// KeepAlive provides a mock function with given fields: id +func (_m *MetaKv) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) { + ret := _m.Called(id) + + var r0 <-chan *clientv3.LeaseKeepAliveResponse + if rf, ok := ret.Get(0).(func(clientv3.LeaseID) <-chan *clientv3.LeaseKeepAliveResponse); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *clientv3.LeaseKeepAliveResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(clientv3.LeaseID) error); ok { + r1 = rf(id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_KeepAlive_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'KeepAlive' +type MetaKv_KeepAlive_Call struct { + *mock.Call +} + +// KeepAlive is a helper method to define mock.On call +// - id clientv3.LeaseID +func (_e *MetaKv_Expecter) KeepAlive(id interface{}) *MetaKv_KeepAlive_Call { + return &MetaKv_KeepAlive_Call{Call: _e.mock.On("KeepAlive", id)} +} + +func (_c *MetaKv_KeepAlive_Call) Run(run func(id clientv3.LeaseID)) *MetaKv_KeepAlive_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(clientv3.LeaseID)) + }) + return _c +} + +func (_c *MetaKv_KeepAlive_Call) Return(_a0 <-chan *clientv3.LeaseKeepAliveResponse, _a1 error) *MetaKv_KeepAlive_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Load provides a mock function with given fields: key +func (_m *MetaKv) Load(key string) (string, error) { + ret := _m.Called(key) + + var r0 string + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(string) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_Load_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Load' +type MetaKv_Load_Call struct { + *mock.Call +} + +// Load is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) Load(key interface{}) *MetaKv_Load_Call { + return &MetaKv_Load_Call{Call: _e.mock.On("Load", key)} +} + +func (_c *MetaKv_Load_Call) Run(run func(key string)) *MetaKv_Load_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_Load_Call) Return(_a0 string, _a1 error) *MetaKv_Load_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// LoadWithPrefix provides a mock function with given fields: key +func (_m *MetaKv) LoadWithPrefix(key string) ([]string, []string, error) { + ret := _m.Called(key) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 []string + if rf, ok := ret.Get(1).(func(string) []string); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]string) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string) error); ok { + r2 = rf(key) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MetaKv_LoadWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithPrefix' +type MetaKv_LoadWithPrefix_Call struct { + *mock.Call +} + +// LoadWithPrefix is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) LoadWithPrefix(key interface{}) *MetaKv_LoadWithPrefix_Call { + return &MetaKv_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)} +} + +func (_c *MetaKv_LoadWithPrefix_Call) Run(run func(key string)) *MetaKv_LoadWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_LoadWithPrefix_Call) Return(_a0 []string, _a1 []string, _a2 error) *MetaKv_LoadWithPrefix_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +// LoadWithPrefix2 provides a mock function with given fields: key +func (_m *MetaKv) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { + ret := _m.Called(key) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 []string + if rf, ok := ret.Get(1).(func(string) []string); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]string) + } + } + + var r2 []int64 + if rf, ok := ret.Get(2).(func(string) []int64); ok { + r2 = rf(key) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).([]int64) + } + } + + var r3 error + if rf, ok := ret.Get(3).(func(string) error); ok { + r3 = rf(key) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 +} + +// MetaKv_LoadWithPrefix2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithPrefix2' +type MetaKv_LoadWithPrefix2_Call struct { + *mock.Call +} + +// LoadWithPrefix2 is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) LoadWithPrefix2(key interface{}) *MetaKv_LoadWithPrefix2_Call { + return &MetaKv_LoadWithPrefix2_Call{Call: _e.mock.On("LoadWithPrefix2", key)} +} + +func (_c *MetaKv_LoadWithPrefix2_Call) Run(run func(key string)) *MetaKv_LoadWithPrefix2_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_LoadWithPrefix2_Call) Return(_a0 []string, _a1 []string, _a2 []int64, _a3 error) *MetaKv_LoadWithPrefix2_Call { + _c.Call.Return(_a0, _a1, _a2, _a3) + return _c +} + +// LoadWithRevision provides a mock function with given fields: key +func (_m *MetaKv) LoadWithRevision(key string) ([]string, []string, int64, error) { + ret := _m.Called(key) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 []string + if rf, ok := ret.Get(1).(func(string) []string); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]string) + } + } + + var r2 int64 + if rf, ok := ret.Get(2).(func(string) int64); ok { + r2 = rf(key) + } else { + r2 = ret.Get(2).(int64) + } + + var r3 error + if rf, ok := ret.Get(3).(func(string) error); ok { + r3 = rf(key) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 +} + +// MetaKv_LoadWithRevision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithRevision' +type MetaKv_LoadWithRevision_Call struct { + *mock.Call +} + +// LoadWithRevision is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) LoadWithRevision(key interface{}) *MetaKv_LoadWithRevision_Call { + return &MetaKv_LoadWithRevision_Call{Call: _e.mock.On("LoadWithRevision", key)} +} + +func (_c *MetaKv_LoadWithRevision_Call) Run(run func(key string)) *MetaKv_LoadWithRevision_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_LoadWithRevision_Call) Return(_a0 []string, _a1 []string, _a2 int64, _a3 error) *MetaKv_LoadWithRevision_Call { + _c.Call.Return(_a0, _a1, _a2, _a3) + return _c +} + +// LoadWithRevisionAndVersions provides a mock function with given fields: key +func (_m *MetaKv) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) { + ret := _m.Called(key) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 []string + if rf, ok := ret.Get(1).(func(string) []string); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]string) + } + } + + var r2 []int64 + if rf, ok := ret.Get(2).(func(string) []int64); ok { + r2 = rf(key) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).([]int64) + } + } + + var r3 int64 + if rf, ok := ret.Get(3).(func(string) int64); ok { + r3 = rf(key) + } else { + r3 = ret.Get(3).(int64) + } + + var r4 error + if rf, ok := ret.Get(4).(func(string) error); ok { + r4 = rf(key) + } else { + r4 = ret.Error(4) + } + + return r0, r1, r2, r3, r4 +} + +// MetaKv_LoadWithRevisionAndVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithRevisionAndVersions' +type MetaKv_LoadWithRevisionAndVersions_Call struct { + *mock.Call +} + +// LoadWithRevisionAndVersions is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) LoadWithRevisionAndVersions(key interface{}) *MetaKv_LoadWithRevisionAndVersions_Call { + return &MetaKv_LoadWithRevisionAndVersions_Call{Call: _e.mock.On("LoadWithRevisionAndVersions", key)} +} + +func (_c *MetaKv_LoadWithRevisionAndVersions_Call) Run(run func(key string)) *MetaKv_LoadWithRevisionAndVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_LoadWithRevisionAndVersions_Call) Return(_a0 []string, _a1 []string, _a2 []int64, _a3 int64, _a4 error) *MetaKv_LoadWithRevisionAndVersions_Call { + _c.Call.Return(_a0, _a1, _a2, _a3, _a4) + return _c +} + +// MultiLoad provides a mock function with given fields: keys +func (_m *MetaKv) MultiLoad(keys []string) ([]string, error) { + ret := _m.Called(keys) + + var r0 []string + if rf, ok := ret.Get(0).(func([]string) []string); ok { + r0 = rf(keys) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func([]string) error); ok { + r1 = rf(keys) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MetaKv_MultiLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiLoad' +type MetaKv_MultiLoad_Call struct { + *mock.Call +} + +// MultiLoad is a helper method to define mock.On call +// - keys []string +func (_e *MetaKv_Expecter) MultiLoad(keys interface{}) *MetaKv_MultiLoad_Call { + return &MetaKv_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)} +} + +func (_c *MetaKv_MultiLoad_Call) Run(run func(keys []string)) *MetaKv_MultiLoad_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *MetaKv_MultiLoad_Call) Return(_a0 []string, _a1 error) *MetaKv_MultiLoad_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// MultiRemove provides a mock function with given fields: keys +func (_m *MetaKv) MultiRemove(keys []string) error { + ret := _m.Called(keys) + + var r0 error + if rf, ok := ret.Get(0).(func([]string) error); ok { + r0 = rf(keys) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_MultiRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiRemove' +type MetaKv_MultiRemove_Call struct { + *mock.Call +} + +// MultiRemove is a helper method to define mock.On call +// - keys []string +func (_e *MetaKv_Expecter) MultiRemove(keys interface{}) *MetaKv_MultiRemove_Call { + return &MetaKv_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)} +} + +func (_c *MetaKv_MultiRemove_Call) Run(run func(keys []string)) *MetaKv_MultiRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *MetaKv_MultiRemove_Call) Return(_a0 error) *MetaKv_MultiRemove_Call { + _c.Call.Return(_a0) + return _c +} + +// MultiRemoveWithPrefix provides a mock function with given fields: keys +func (_m *MetaKv) MultiRemoveWithPrefix(keys []string) error { + ret := _m.Called(keys) + + var r0 error + if rf, ok := ret.Get(0).(func([]string) error); ok { + r0 = rf(keys) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_MultiRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiRemoveWithPrefix' +type MetaKv_MultiRemoveWithPrefix_Call struct { + *mock.Call +} + +// MultiRemoveWithPrefix is a helper method to define mock.On call +// - keys []string +func (_e *MetaKv_Expecter) MultiRemoveWithPrefix(keys interface{}) *MetaKv_MultiRemoveWithPrefix_Call { + return &MetaKv_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)} +} + +func (_c *MetaKv_MultiRemoveWithPrefix_Call) Run(run func(keys []string)) *MetaKv_MultiRemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]string)) + }) + return _c +} + +func (_c *MetaKv_MultiRemoveWithPrefix_Call) Return(_a0 error) *MetaKv_MultiRemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +// MultiSave provides a mock function with given fields: kvs +func (_m *MetaKv) MultiSave(kvs map[string]string) error { + ret := _m.Called(kvs) + + var r0 error + if rf, ok := ret.Get(0).(func(map[string]string) error); ok { + r0 = rf(kvs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_MultiSave_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSave' +type MetaKv_MultiSave_Call struct { + *mock.Call +} + +// MultiSave is a helper method to define mock.On call +// - kvs map[string]string +func (_e *MetaKv_Expecter) MultiSave(kvs interface{}) *MetaKv_MultiSave_Call { + return &MetaKv_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)} +} + +func (_c *MetaKv_MultiSave_Call) Run(run func(kvs map[string]string)) *MetaKv_MultiSave_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string)) + }) + return _c +} + +func (_c *MetaKv_MultiSave_Call) Return(_a0 error) *MetaKv_MultiSave_Call { + _c.Call.Return(_a0) + return _c +} + +// MultiSaveAndRemove provides a mock function with given fields: saves, removals +func (_m *MetaKv) MultiSaveAndRemove(saves map[string]string, removals []string) error { + ret := _m.Called(saves, removals) + + var r0 error + if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { + r0 = rf(saves, removals) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_MultiSaveAndRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemove' +type MetaKv_MultiSaveAndRemove_Call struct { + *mock.Call +} + +// MultiSaveAndRemove is a helper method to define mock.On call +// - saves map[string]string +// - removals []string +func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemove_Call { + return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} +} + +func (_c *MetaKv_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string), args[1].([]string)) + }) + return _c +} + +func (_c *MetaKv_MultiSaveAndRemove_Call) Return(_a0 error) *MetaKv_MultiSaveAndRemove_Call { + _c.Call.Return(_a0) + return _c +} + +// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals +func (_m *MetaKv) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { + ret := _m.Called(saves, removals) + + var r0 error + if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { + r0 = rf(saves, removals) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_MultiSaveAndRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemoveWithPrefix' +type MetaKv_MultiSaveAndRemoveWithPrefix_Call struct { + *mock.Call +} + +// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call +// - saves map[string]string +// - removals []string +func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { + return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} +} + +func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[string]string), args[1].([]string)) + }) + return _c +} + +func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +// Remove provides a mock function with given fields: key +func (_m *MetaKv) Remove(key string) error { + ret := _m.Called(key) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove' +type MetaKv_Remove_Call struct { + *mock.Call +} + +// Remove is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) Remove(key interface{}) *MetaKv_Remove_Call { + return &MetaKv_Remove_Call{Call: _e.mock.On("Remove", key)} +} + +func (_c *MetaKv_Remove_Call) Run(run func(key string)) *MetaKv_Remove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_Remove_Call) Return(_a0 error) *MetaKv_Remove_Call { + _c.Call.Return(_a0) + return _c +} + +// RemoveWithPrefix provides a mock function with given fields: key +func (_m *MetaKv) RemoveWithPrefix(key string) error { + ret := _m.Called(key) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_RemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveWithPrefix' +type MetaKv_RemoveWithPrefix_Call struct { + *mock.Call +} + +// RemoveWithPrefix is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) RemoveWithPrefix(key interface{}) *MetaKv_RemoveWithPrefix_Call { + return &MetaKv_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)} +} + +func (_c *MetaKv_RemoveWithPrefix_Call) Run(run func(key string)) *MetaKv_RemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_RemoveWithPrefix_Call) Return(_a0 error) *MetaKv_RemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +// Save provides a mock function with given fields: key, value +func (_m *MetaKv) Save(key string, value string) error { + ret := _m.Called(key, value) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(key, value) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_Save_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Save' +type MetaKv_Save_Call struct { + *mock.Call +} + +// Save is a helper method to define mock.On call +// - key string +// - value string +func (_e *MetaKv_Expecter) Save(key interface{}, value interface{}) *MetaKv_Save_Call { + return &MetaKv_Save_Call{Call: _e.mock.On("Save", key, value)} +} + +func (_c *MetaKv_Save_Call) Run(run func(key string, value string)) *MetaKv_Save_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MetaKv_Save_Call) Return(_a0 error) *MetaKv_Save_Call { + _c.Call.Return(_a0) + return _c +} + +// SaveWithIgnoreLease provides a mock function with given fields: key, value +func (_m *MetaKv) SaveWithIgnoreLease(key string, value string) error { + ret := _m.Called(key, value) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(key, value) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_SaveWithIgnoreLease_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveWithIgnoreLease' +type MetaKv_SaveWithIgnoreLease_Call struct { + *mock.Call +} + +// SaveWithIgnoreLease is a helper method to define mock.On call +// - key string +// - value string +func (_e *MetaKv_Expecter) SaveWithIgnoreLease(key interface{}, value interface{}) *MetaKv_SaveWithIgnoreLease_Call { + return &MetaKv_SaveWithIgnoreLease_Call{Call: _e.mock.On("SaveWithIgnoreLease", key, value)} +} + +func (_c *MetaKv_SaveWithIgnoreLease_Call) Run(run func(key string, value string)) *MetaKv_SaveWithIgnoreLease_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MetaKv_SaveWithIgnoreLease_Call) Return(_a0 error) *MetaKv_SaveWithIgnoreLease_Call { + _c.Call.Return(_a0) + return _c +} + +// SaveWithLease provides a mock function with given fields: key, value, id +func (_m *MetaKv) SaveWithLease(key string, value string, id clientv3.LeaseID) error { + ret := _m.Called(key, value, id) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, clientv3.LeaseID) error); ok { + r0 = rf(key, value, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_SaveWithLease_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveWithLease' +type MetaKv_SaveWithLease_Call struct { + *mock.Call +} + +// SaveWithLease is a helper method to define mock.On call +// - key string +// - value string +// - id clientv3.LeaseID +func (_e *MetaKv_Expecter) SaveWithLease(key interface{}, value interface{}, id interface{}) *MetaKv_SaveWithLease_Call { + return &MetaKv_SaveWithLease_Call{Call: _e.mock.On("SaveWithLease", key, value, id)} +} + +func (_c *MetaKv_SaveWithLease_Call) Run(run func(key string, value string, id clientv3.LeaseID)) *MetaKv_SaveWithLease_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].(clientv3.LeaseID)) + }) + return _c +} + +func (_c *MetaKv_SaveWithLease_Call) Return(_a0 error) *MetaKv_SaveWithLease_Call { + _c.Call.Return(_a0) + return _c +} + +// WalkWithPrefix provides a mock function with given fields: prefix, paginationSize, fn +func (_m *MetaKv) WalkWithPrefix(prefix string, paginationSize int, fn func([]byte, []byte) error) error { + ret := _m.Called(prefix, paginationSize, fn) + + var r0 error + if rf, ok := ret.Get(0).(func(string, int, func([]byte, []byte) error) error); ok { + r0 = rf(prefix, paginationSize, fn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MetaKv_WalkWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WalkWithPrefix' +type MetaKv_WalkWithPrefix_Call struct { + *mock.Call +} + +// WalkWithPrefix is a helper method to define mock.On call +// - prefix string +// - paginationSize int +// - fn func([]byte , []byte) error +func (_e *MetaKv_Expecter) WalkWithPrefix(prefix interface{}, paginationSize interface{}, fn interface{}) *MetaKv_WalkWithPrefix_Call { + return &MetaKv_WalkWithPrefix_Call{Call: _e.mock.On("WalkWithPrefix", prefix, paginationSize, fn)} +} + +func (_c *MetaKv_WalkWithPrefix_Call) Run(run func(prefix string, paginationSize int, fn func([]byte, []byte) error)) *MetaKv_WalkWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int), args[2].(func([]byte, []byte) error)) + }) + return _c +} + +func (_c *MetaKv_WalkWithPrefix_Call) Return(_a0 error) *MetaKv_WalkWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +// Watch provides a mock function with given fields: key +func (_m *MetaKv) Watch(key string) clientv3.WatchChan { + ret := _m.Called(key) + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(string) clientv3.WatchChan); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MetaKv_Watch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Watch' +type MetaKv_Watch_Call struct { + *mock.Call +} + +// Watch is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) Watch(key interface{}) *MetaKv_Watch_Call { + return &MetaKv_Watch_Call{Call: _e.mock.On("Watch", key)} +} + +func (_c *MetaKv_Watch_Call) Run(run func(key string)) *MetaKv_Watch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_Watch_Call) Return(_a0 clientv3.WatchChan) *MetaKv_Watch_Call { + _c.Call.Return(_a0) + return _c +} + +// WatchWithPrefix provides a mock function with given fields: key +func (_m *MetaKv) WatchWithPrefix(key string) clientv3.WatchChan { + ret := _m.Called(key) + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(string) clientv3.WatchChan); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MetaKv_WatchWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchWithPrefix' +type MetaKv_WatchWithPrefix_Call struct { + *mock.Call +} + +// WatchWithPrefix is a helper method to define mock.On call +// - key string +func (_e *MetaKv_Expecter) WatchWithPrefix(key interface{}) *MetaKv_WatchWithPrefix_Call { + return &MetaKv_WatchWithPrefix_Call{Call: _e.mock.On("WatchWithPrefix", key)} +} + +func (_c *MetaKv_WatchWithPrefix_Call) Run(run func(key string)) *MetaKv_WatchWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MetaKv_WatchWithPrefix_Call) Return(_a0 clientv3.WatchChan) *MetaKv_WatchWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +// WatchWithRevision provides a mock function with given fields: key, revision +func (_m *MetaKv) WatchWithRevision(key string, revision int64) clientv3.WatchChan { + ret := _m.Called(key, revision) + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(string, int64) clientv3.WatchChan); ok { + r0 = rf(key, revision) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MetaKv_WatchWithRevision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchWithRevision' +type MetaKv_WatchWithRevision_Call struct { + *mock.Call +} + +// WatchWithRevision is a helper method to define mock.On call +// - key string +// - revision int64 +func (_e *MetaKv_Expecter) WatchWithRevision(key interface{}, revision interface{}) *MetaKv_WatchWithRevision_Call { + return &MetaKv_WatchWithRevision_Call{Call: _e.mock.On("WatchWithRevision", key, revision)} +} + +func (_c *MetaKv_WatchWithRevision_Call) Run(run func(key string, revision int64)) *MetaKv_WatchWithRevision_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int64)) + }) + return _c +} + +func (_c *MetaKv_WatchWithRevision_Call) Return(_a0 clientv3.WatchChan) *MetaKv_WatchWithRevision_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMetaKv interface { + mock.TestingT + Cleanup(func()) +} + +// NewMetaKv creates a new instance of MetaKv. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMetaKv(t mockConstructorTestingTNewMetaKv) *MetaKv { + mock := &MetaKv{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/kv/mocks/SnapShotKV.go b/internal/kv/mocks/SnapShotKV.go index 2c48590f54..43fb291ac8 100644 --- a/internal/kv/mocks/SnapShotKV.go +++ b/internal/kv/mocks/SnapShotKV.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package mocks @@ -44,8 +44,8 @@ type SnapShotKV_Load_Call struct { } // Load is a helper method to define mock.On call -// - key string -// - ts uint64 +// - key string +// - ts uint64 func (_e *SnapShotKV_Expecter) Load(key interface{}, ts interface{}) *SnapShotKV_Load_Call { return &SnapShotKV_Load_Call{Call: _e.mock.On("Load", key, ts)} } @@ -100,8 +100,8 @@ type SnapShotKV_LoadWithPrefix_Call struct { } // LoadWithPrefix is a helper method to define mock.On call -// - key string -// - ts uint64 +// - key string +// - ts uint64 func (_e *SnapShotKV_Expecter) LoadWithPrefix(key interface{}, ts interface{}) *SnapShotKV_LoadWithPrefix_Call { return &SnapShotKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key, ts)} } @@ -138,8 +138,8 @@ type SnapShotKV_MultiSave_Call struct { } // MultiSave is a helper method to define mock.On call -// - kvs map[string]string -// - ts uint64 +// - kvs map[string]string +// - ts uint64 func (_e *SnapShotKV_Expecter) MultiSave(kvs interface{}, ts interface{}) *SnapShotKV_MultiSave_Call { return &SnapShotKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs, ts)} } @@ -176,9 +176,9 @@ type SnapShotKV_MultiSaveAndRemoveWithPrefix_Call struct { } // MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call -// - saves map[string]string -// - removals []string -// - ts uint64 +// - saves map[string]string +// - removals []string +// - ts uint64 func (_e *SnapShotKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemoveWithPrefix_Call { return &SnapShotKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals, ts)} } @@ -215,9 +215,9 @@ type SnapShotKV_Save_Call struct { } // Save is a helper method to define mock.On call -// - key string -// - value string -// - ts uint64 +// - key string +// - value string +// - ts uint64 func (_e *SnapShotKV_Expecter) Save(key interface{}, value interface{}, ts interface{}) *SnapShotKV_Save_Call { return &SnapShotKV_Save_Call{Call: _e.mock.On("Save", key, value, ts)} } diff --git a/internal/kv/mocks/TxnKV.go b/internal/kv/mocks/TxnKV.go index f9a958f55b..0e87bde3b1 100644 --- a/internal/kv/mocks/TxnKV.go +++ b/internal/kv/mocks/TxnKV.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package mocks @@ -71,7 +71,7 @@ type TxnKV_Load_Call struct { } // Load is a helper method to define mock.On call -// - key string +// - key string func (_e *TxnKV_Expecter) Load(key interface{}) *TxnKV_Load_Call { return &TxnKV_Load_Call{Call: _e.mock.On("Load", key)} } @@ -126,7 +126,7 @@ type TxnKV_LoadWithPrefix_Call struct { } // LoadWithPrefix is a helper method to define mock.On call -// - key string +// - key string func (_e *TxnKV_Expecter) LoadWithPrefix(key interface{}) *TxnKV_LoadWithPrefix_Call { return &TxnKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", key)} } @@ -172,7 +172,7 @@ type TxnKV_MultiLoad_Call struct { } // MultiLoad is a helper method to define mock.On call -// - keys []string +// - keys []string func (_e *TxnKV_Expecter) MultiLoad(keys interface{}) *TxnKV_MultiLoad_Call { return &TxnKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", keys)} } @@ -209,7 +209,7 @@ type TxnKV_MultiRemove_Call struct { } // MultiRemove is a helper method to define mock.On call -// - keys []string +// - keys []string func (_e *TxnKV_Expecter) MultiRemove(keys interface{}) *TxnKV_MultiRemove_Call { return &TxnKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", keys)} } @@ -246,7 +246,7 @@ type TxnKV_MultiRemoveWithPrefix_Call struct { } // MultiRemoveWithPrefix is a helper method to define mock.On call -// - keys []string +// - keys []string func (_e *TxnKV_Expecter) MultiRemoveWithPrefix(keys interface{}) *TxnKV_MultiRemoveWithPrefix_Call { return &TxnKV_MultiRemoveWithPrefix_Call{Call: _e.mock.On("MultiRemoveWithPrefix", keys)} } @@ -283,7 +283,7 @@ type TxnKV_MultiSave_Call struct { } // MultiSave is a helper method to define mock.On call -// - kvs map[string]string +// - kvs map[string]string func (_e *TxnKV_Expecter) MultiSave(kvs interface{}) *TxnKV_MultiSave_Call { return &TxnKV_MultiSave_Call{Call: _e.mock.On("MultiSave", kvs)} } @@ -320,8 +320,8 @@ type TxnKV_MultiSaveAndRemove_Call struct { } // MultiSaveAndRemove is a helper method to define mock.On call -// - saves map[string]string -// - removals []string +// - saves map[string]string +// - removals []string func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call { return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} } @@ -358,8 +358,8 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct { } // MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call -// - saves map[string]string -// - removals []string +// - saves map[string]string +// - removals []string func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} } @@ -396,7 +396,7 @@ type TxnKV_Remove_Call struct { } // Remove is a helper method to define mock.On call -// - key string +// - key string func (_e *TxnKV_Expecter) Remove(key interface{}) *TxnKV_Remove_Call { return &TxnKV_Remove_Call{Call: _e.mock.On("Remove", key)} } @@ -433,7 +433,7 @@ type TxnKV_RemoveWithPrefix_Call struct { } // RemoveWithPrefix is a helper method to define mock.On call -// - key string +// - key string func (_e *TxnKV_Expecter) RemoveWithPrefix(key interface{}) *TxnKV_RemoveWithPrefix_Call { return &TxnKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", key)} } @@ -470,8 +470,8 @@ type TxnKV_Save_Call struct { } // Save is a helper method to define mock.On call -// - key string -// - value string +// - key string +// - value string func (_e *TxnKV_Expecter) Save(key interface{}, value interface{}) *TxnKV_Save_Call { return &TxnKV_Save_Call{Call: _e.mock.On("Save", key, value)} } diff --git a/internal/metastore/kv/rootcoord/meta_snapshot.go b/internal/metastore/kv/rootcoord/meta_snapshot.go deleted file mode 100644 index fc53663f6a..0000000000 --- a/internal/metastore/kv/rootcoord/meta_snapshot.go +++ /dev/null @@ -1,410 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - "path" - "strconv" - "sync" - "time" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" -) - -const ( - // RequestTimeout timeout for request - RequestTimeout = 10 * time.Second -) - -type rtPair struct { - rev int64 - ts typeutil.Timestamp -} - -type MetaSnapshot struct { - cli *clientv3.Client - root string - tsKey string - lock sync.RWMutex - - ts2Rev []rtPair - minPos int - maxPos int - numTs int -} - -func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*MetaSnapshot, error) { - if bufSize <= 0 { - bufSize = 1024 - } - ms := &MetaSnapshot{ - cli: cli, - root: root, - tsKey: tsKey, - lock: sync.RWMutex{}, - ts2Rev: make([]rtPair, bufSize), - minPos: 0, - maxPos: 0, - numTs: 0, - } - if err := ms.loadTs(); err != nil { - return nil, err - } - return ms, nil -} - -func (ms *MetaSnapshot) loadTs() error { - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - key := path.Join(ms.root, ms.tsKey) - resp, err := ms.cli.Get(ctx, key) - if err != nil { - return err - } - if len(resp.Kvs) <= 0 { - return nil - } - version := resp.Kvs[0].Version - revision := resp.Kvs[0].ModRevision - createRevision := resp.Kvs[0].CreateRevision - strTs := string(resp.Kvs[0].Value) - ts, err := strconv.ParseUint(strTs, 10, 64) - if err != nil { - return err - } - log.Info("Load last ts", zap.Int64("version", version), zap.Int64("revision", revision)) - - ms.initTs(revision, ts) - // start from revision-1, until equals to create revision - for revision--; revision >= createRevision; revision-- { - if ms.numTs == len(ms.ts2Rev) { - break - } - resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision)) - if err != nil { - return err - } - if len(resp.Kvs) <= 0 { - return nil - } - - curVer := resp.Kvs[0].Version - curRev := resp.Kvs[0].ModRevision - if curVer > version { - log.Warn("version go backwards", zap.Int64("curVer", curVer), zap.Int64("version", version)) - return nil - } - if curVer == version { - log.Debug("Snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version)) - } - strTs := string(resp.Kvs[0].Value) - if strTs == "0" { - //#issue 7150, index building inserted "0", skipping - //this is a special fix for backward compatibility, the previous version will put 0 ts into the Snapshot building index - continue - } - curTs, err := strconv.ParseUint(strTs, 10, 64) - if err != nil { - return err - } - if curTs >= ts { - return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts) - } - ms.initTs(curRev, curTs) - ts = curTs - revision = curRev - version = curVer - } - - return nil -} - -func (ms *MetaSnapshot) maxTs() typeutil.Timestamp { - return ms.ts2Rev[ms.maxPos].ts -} - -func (ms *MetaSnapshot) minTs() typeutil.Timestamp { - return ms.ts2Rev[ms.minPos].ts -} - -func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) { - log.Debug("init meta Snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) - if ms.numTs == 0 { - ms.maxPos = len(ms.ts2Rev) - 1 - ms.minPos = len(ms.ts2Rev) - 1 - ms.numTs = 1 - ms.ts2Rev[ms.maxPos].rev = rev - ms.ts2Rev[ms.maxPos].ts = ts - } else if ms.numTs < len(ms.ts2Rev) { - ms.minPos-- - ms.numTs++ - ms.ts2Rev[ms.minPos].rev = rev - ms.ts2Rev[ms.minPos].ts = ts - } -} - -func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) { - log.Debug("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) - ms.maxPos++ - if ms.maxPos == len(ms.ts2Rev) { - ms.maxPos = 0 - } - - ms.ts2Rev[ms.maxPos].rev = rev - ms.ts2Rev[ms.maxPos].ts = ts - if ms.numTs < len(ms.ts2Rev) { - ms.numTs++ - } else { - ms.minPos++ - if ms.minPos == len(ms.ts2Rev) { - ms.minPos = 0 - } - } -} - -func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 { - if length == 1 { - return ms.ts2Rev[start].rev - } - begin := start - end := begin + length - mid := (begin + end) / 2 - for { - if ms.ts2Rev[mid].ts == ts { - return ms.ts2Rev[mid].rev - } - if mid == begin { - if ms.ts2Rev[mid].ts < ts || mid == start { - return ms.ts2Rev[mid].rev - } - return ms.ts2Rev[mid-1].rev - } - if ms.ts2Rev[mid].ts > ts { - end = mid - } else if ms.ts2Rev[mid].ts < ts { - begin = mid + 1 - } - mid = (begin + end) / 2 - } -} - -func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 { - if ms.numTs == 0 { - return 0 - } - if ts >= ms.ts2Rev[ms.maxPos].ts { - return ms.ts2Rev[ms.maxPos].rev - } - if ts < ms.ts2Rev[ms.minPos].ts { - return 0 - } - if ms.maxPos > ms.minPos { - return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1) - } - topVal := ms.ts2Rev[len(ms.ts2Rev)-1] - botVal := ms.ts2Rev[0] - minVal := ms.ts2Rev[ms.minPos] - maxVal := ms.ts2Rev[ms.maxPos] - if ts >= topVal.ts && ts < botVal.ts { - return topVal.rev - } else if ts >= minVal.ts && ts < topVal.ts { - return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos) - } else if ts >= botVal.ts && ts < maxVal.ts { - return ms.searchOnCache(ts, 0, ms.maxPos+1) - } - - return 0 -} - -func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 { - if rev < 2 { - return 0 - } - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - for rev--; rev >= 2; rev-- { - resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev)) - if err != nil { - log.Debug("get ts from etcd failed", zap.Error(err)) - return 0 - } - if len(resp.Kvs) <= 0 { - return 0 - } - rev = resp.Kvs[0].ModRevision - curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) - if err != nil { - log.Debug("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err)) - return 0 - } - if curTs <= ts { - return rev - } - } - return 0 -} - -func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) { - rev := ms.getRevOnCache(ts) - if rev > 0 { - return rev, nil - } - rev = ms.ts2Rev[ms.minPos].rev - rev = ms.getRevOnEtcd(ts, rev) - if rev > 0 { - return rev, nil - } - return 0, fmt.Errorf("can't find revision on ts=%d", ts) -} - -func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - strTs := strconv.FormatInt(int64(ts), 10) - resp, err := ms.cli.Txn(ctx).If().Then( - clientv3.OpPut(path.Join(ms.root, key), value), - clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs), - ).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - - return nil -} - -func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { - ms.lock.RLock() - defer ms.lock.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - var resp *clientv3.GetResponse - var err error - var rev int64 - if ts == 0 { - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key)) - if err != nil { - return "", err - } - } else { - rev, err = ms.getRev(ts) - if err != nil { - return "", err - } - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev)) - if err != nil { - return "", err - } - } - if len(resp.Kvs) == 0 { - return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts) - } - return string(resp.Kvs[0].Value), nil -} - -func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - ops := make([]clientv3.Op, 0, len(kvs)+1) - for key, value := range kvs { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) - } - - strTs := strconv.FormatInt(int64(ts), 10) - ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) - resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - return nil -} - -func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { - ms.lock.RLock() - defer ms.lock.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - var resp *clientv3.GetResponse - var err error - var rev int64 - if ts == 0 { - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, err - } - } else { - rev, err = ms.getRev(ts) - if err != nil { - return nil, nil, err - } - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, err - } - } - keys := make([]string, 0, len(resp.Kvs)) - values := make([]string, 0, len(resp.Kvs)) - tk := path.Join(ms.root, "k") - prefixLen := len(tk) - 1 - for _, kv := range resp.Kvs { - tk = string(kv.Key) - tk = tk[prefixLen:] - keys = append(keys, tk) - values = append(values, string(kv.Value)) - } - return keys, values, nil -} - -func (ms *MetaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) - } - - strTs := strconv.FormatInt(int64(ts), 10) - for _, key := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix())) - } - ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) - resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - return nil -} diff --git a/internal/metastore/kv/rootcoord/meta_snapshot_test.go b/internal/metastore/kv/rootcoord/meta_snapshot_test.go deleted file mode 100644 index 10b986a3ee..0000000000 --- a/internal/metastore/kv/rootcoord/meta_snapshot_test.go +++ /dev/null @@ -1,519 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - "math/rand" - "os" - "path" - "testing" - "time" - - "github.com/milvus-io/milvus/internal/util/paramtable" - - "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/stretchr/testify/assert" -) - -var Params paramtable.ComponentParam - -func TestMain(m *testing.M) { - Params.Init() - code := m.Run() - os.Exit(code) -} - -func TestMetaSnapshot(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 4) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 8; i++ { - vtso = typeutil.Timestamp(100 + i) - ts := ftso() - err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - _, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i)) - assert.Nil(t, err) - } - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 4) - assert.Nil(t, err) - assert.NotNil(t, ms) -} - -func TestSearchOnCache(t *testing.T) { - ms := &MetaSnapshot{} - for i := 0; i < 8; i++ { - ms.ts2Rev = append(ms.ts2Rev, - rtPair{ - rev: int64(i * 2), - ts: typeutil.Timestamp(i * 2), - }) - } - rev := ms.searchOnCache(9, 0, 8) - assert.Equal(t, int64(8), rev) - rev = ms.searchOnCache(1, 0, 2) - assert.Equal(t, int64(0), rev) - rev = ms.searchOnCache(1, 0, 8) - assert.Equal(t, int64(0), rev) - rev = ms.searchOnCache(14, 0, 8) - assert.Equal(t, int64(14), rev) - rev = ms.searchOnCache(0, 0, 8) - assert.Equal(t, int64(0), rev) -} - -func TestGetRevOnCache(t *testing.T) { - ms := &MetaSnapshot{} - ms.ts2Rev = make([]rtPair, 7) - ms.initTs(7, 16) - ms.initTs(6, 14) - ms.initTs(5, 12) - ms.initTs(4, 10) - - var rev int64 - rev = ms.getRevOnCache(17) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(9) - assert.Equal(t, int64(0), rev) - rev = ms.getRevOnCache(10) - assert.Equal(t, int64(4), rev) - rev = ms.getRevOnCache(16) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(15) - assert.Equal(t, int64(6), rev) - rev = ms.getRevOnCache(12) - assert.Equal(t, int64(5), rev) - - ms.initTs(3, 8) - ms.initTs(2, 6) - assert.Equal(t, ms.maxPos, 6) - assert.Equal(t, ms.minPos, 1) - - rev = ms.getRevOnCache(17) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(9) - assert.Equal(t, int64(3), rev) - rev = ms.getRevOnCache(10) - assert.Equal(t, int64(4), rev) - rev = ms.getRevOnCache(16) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(15) - assert.Equal(t, int64(6), rev) - rev = ms.getRevOnCache(12) - assert.Equal(t, int64(5), rev) - rev = ms.getRevOnCache(5) - assert.Equal(t, int64(0), rev) - - ms.putTs(8, 18) - assert.Equal(t, ms.maxPos, 0) - assert.Equal(t, ms.minPos, 1) - for rev = 2; rev <= 7; rev++ { - ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3)) - assert.Equal(t, rev, ts) - } - ms.putTs(9, 20) - assert.Equal(t, ms.maxPos, 1) - assert.Equal(t, ms.minPos, 2) - assert.Equal(t, ms.numTs, 7) - - curMax := ms.maxPos - curMin := ms.minPos - for i := 10; i < 20; i++ { - ms.putTs(int64(i), typeutil.Timestamp(i*2+2)) - curMax++ - curMin++ - if curMax == len(ms.ts2Rev) { - curMax = 0 - } - if curMin == len(ms.ts2Rev) { - curMin = 0 - } - assert.Equal(t, curMax, ms.maxPos) - assert.Equal(t, curMin, ms.minPos) - } - - for i := 13; i < 20; i++ { - rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2)) - assert.Equal(t, int64(i), rev) - rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3)) - assert.Equal(t, int64(i), rev) - } - rev = ms.getRevOnCache(27) - assert.Zero(t, rev) -} - -func TestGetRevOnEtcd(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - key := path.Join(rootPath, tsKey) - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - ms := MetaSnapshot{ - cli: etcdCli, - root: rootPath, - tsKey: tsKey, - } - resp, err := etcdCli.Put(ctx, key, "100") - assert.Nil(t, err) - revList := []int64{} - tsList := []typeutil.Timestamp{} - revList = append(revList, resp.Header.Revision) - tsList = append(tsList, 100) - for i := 110; i < 200; i += 10 { - resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i)) - assert.Nil(t, err) - revList = append(revList, resp.Header.Revision) - tsList = append(tsList, typeutil.Timestamp(i)) - } - lastRev := revList[len(revList)-1] + 1 - for i, ts := range tsList { - rev := ms.getRevOnEtcd(ts, lastRev) - assert.Equal(t, revList[i], rev) - } - for i := 0; i < len(tsList); i++ { - rev := ms.getRevOnEtcd(tsList[i]+5, lastRev) - assert.Equal(t, revList[i], rev) - } - rev := ms.getRevOnEtcd(200, lastRev) - assert.Equal(t, lastRev-1, rev) - rev = ms.getRevOnEtcd(99, lastRev) - assert.Zero(t, rev) -} - -func TestLoad(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.Save("key", fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 0; i < 20; i++ { - val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, val, fmt.Sprintf("value-%d", i)) - } - val, err := ms.Load("key", 0) - assert.Nil(t, err) - assert.Equal(t, "value-19", val) - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, val, fmt.Sprintf("value-%d", i)) - } -} - -func TestMultiSave(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.MultiSave(saves, ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 0; i < 20; i++ { - keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) - assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) - } - keys, vals, err := ms.LoadWithPrefix("k", 0) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], "v1-19") - assert.Equal(t, vals[1], "v2-19") - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) - assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) - } -} - -func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - defer etcdCli.Close() - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 20; i < 40; i++ { - sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} - dm := []string{fmt.Sprintf("kd-%04d", i-20)} - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - - for i := 0; i < 20; i++ { - val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, i+1, len(vals)) - } - for i := 20; i < 40; i++ { - val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, 39-i, len(vals)) - } - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, i+1, len(vals)) - } - for i := 20; i < 40; i++ { - val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, 39-i, len(vals)) - } -} - -func TestTsBackward(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - - err = kv.loadTs() - assert.Nil(t, err) - - kv.Save("a", "b", 100) - kv.Save("a", "c", 99) // backward - kv.Save("a", "d", 200) - - kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Error(t, err) - -} - -func TestFix7150(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.Nil(t, err) - defer etcdCli.Close() - - kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - - err = kv.loadTs() - assert.Nil(t, err) - - kv.Save("a", "b", 100) - kv.Save("a", "c", 0) // bug introduced - kv.Save("a", "d", 200) - - kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - err = kv.loadTs() - assert.Nil(t, err) -} diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index e458026f74..4ef8fa59ab 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -26,19 +26,24 @@ import ( "strconv" "strings" "sync" + "time" + + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" - "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.uber.org/zap" ) var ( // SuffixSnapshotTombstone special value for tombstone mark SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} + PaginationSize = 5000 ) // IsTombstone used in migration tool also. @@ -56,7 +61,7 @@ func ConstructTombstone() []byte { // SuffixSnapshot record timestamp as prefix of a key under the Snapshot prefix path type SuffixSnapshot struct { // internal kv which SuffixSnapshot based on - kv.TxnKV + kv.MetaKv // rw mutex provided range lock sync.RWMutex // lastestTS latest timestamp for each key @@ -79,6 +84,8 @@ type SuffixSnapshot struct { // exp is the shortcut format checker for ts-key // composed with separator only exp *regexp.Regexp + + closeGC chan struct{} } // tsv struct stores kv with timestamp @@ -91,9 +98,9 @@ type tsv struct { var _ kv.SnapShotKV = (*SuffixSnapshot)(nil) // NewSuffixSnapshot creates a NewSuffixSnapshot with provided kv -func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnapshot, error) { - if txnKV == nil { - return nil, retry.Unrecoverable(errors.New("txnKV is nil")) +func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSnapshot, error) { + if metaKV == nil { + return nil, retry.Unrecoverable(errors.New("MetaKv is nil")) } // handles trailing / logic @@ -104,8 +111,8 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps tk = path.Join(root, "k") rootLen := len(tk) - 1 - return &SuffixSnapshot{ - TxnKV: txnKV, + ss := &SuffixSnapshot{ + MetaKv: metaKV, lastestTS: make(map[string]typeutil.Timestamp), separator: sep, exp: regexp.MustCompile(fmt.Sprintf(`^(.+)%s(\d+)$`, sep)), @@ -113,7 +120,10 @@ func NewSuffixSnapshot(txnKV kv.TxnKV, sep, root, snapshot string) (*SuffixSnaps snapshotLen: snapshotLen, rootPrefix: root, rootLen: rootLen, - }, nil + closeGC: make(chan struct{}, 1), + } + go ss.startBackgroundGC() + return ss, nil } // isTombstone helper function to check whether is tombstone mark @@ -200,9 +210,9 @@ func (ss *SuffixSnapshot) checkKeyTS(key string, ts typeutil.Timestamp) (bool, e // loadLatestTS load the loatest ts for specified key func (ss *SuffixSnapshot) loadLatestTS(key string) error { prefix := ss.composeSnapshotPrefix(key) - keys, _, err := ss.TxnKV.LoadWithPrefix(prefix) + keys, _, err := ss.MetaKv.LoadWithPrefix(prefix) if err != nil { - log.Warn("SuffixSnapshot txnkv LoadWithPrefix failed", zap.String("key", key), + log.Warn("SuffixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key), zap.Error(err)) return err } @@ -259,14 +269,14 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) { } // Save stores key-value pairs with timestamp -// if ts is 0, SuffixSnapshot works as a TxnKV +// if ts is 0, SuffixSnapshot works as a MetaKv // otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path // and for acceleration store original key-value if ts is the latest func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error { - // if ts == 0, act like TxnKv + // if ts == 0, act like MetaKv // will not update lastestTs since ts not not valid if ts == 0 { - return ss.TxnKV.Save(key, value) + return ss.MetaKv.Save(key, value) } ss.Lock() @@ -281,7 +291,7 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) return err } if after { - err := ss.TxnKV.MultiSave(map[string]string{ + err := ss.MetaKv.MultiSave(map[string]string{ key: value, tsKey: value, }) @@ -293,14 +303,14 @@ func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) } // modifying history key, just save tskey-value - return ss.TxnKV.Save(tsKey, value) + return ss.MetaKv.Save(tsKey, value) } func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { // if ts == 0, load latest by definition // and with acceleration logic, just do load key will do if ts == 0 { - value, err := ss.TxnKV.Load(key) + value, err := ss.MetaKv.Load(key) if ss.isTombstone(value) { return "", errors.New("no value found") } @@ -318,7 +328,7 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error return "", err } if after { - value, err := ss.TxnKV.Load(key) + value, err := ss.MetaKv.Load(key) if ss.isTombstone(value) { return "", errors.New("no value found") } @@ -327,9 +337,9 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error // before ts, do time travel // 1. load all tsKey with key/ prefix - keys, values, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(key)) + keys, values, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(key)) if err != nil { - log.Warn("prefixSnapshot txnKV LoadWithPrefix failed", zap.String("key", key), zap.Error(err)) + log.Warn("prefixSnapshot MetaKv LoadWithPrefix failed", zap.String("key", key), zap.Error(err)) return "", err } @@ -367,12 +377,12 @@ func (ss *SuffixSnapshot) Load(key string, ts typeutil.Timestamp) (string, error } // MultiSave save multiple kvs -// if ts == 0, act like TxnKV +// if ts == 0, act like MetaKv // each key-value will be treated using same logic like Save func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { - // if ts == 0, act like TxnKV + // if ts == 0, act like MetaKv if ts == 0 { - return ss.TxnKV.MultiSave(kvs) + return ss.MetaKv.MultiSave(kvs) } ss.Lock() defer ss.Unlock() @@ -385,7 +395,7 @@ func (ss *SuffixSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp } // multi save execute map; if succeeds, update ts in the update list - err = ss.TxnKV.MultiSave(execute) + err = ss.MetaKv.MultiSave(execute) if err == nil { for _, key := range updateList { ss.lastestTS[key] = ts @@ -424,7 +434,7 @@ func (ss *SuffixSnapshot) generateSaveExecute(kvs map[string]string, ts typeutil func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { // ts 0 case shall be treated as fetch latest/current value if ts == 0 { - keys, values, err := ss.TxnKV.LoadWithPrefix(key) + keys, values, err := ss.MetaKv.LoadWithPrefix(key) fks := keys[:0] //make([]string, 0, len(keys)) fvs := values[:0] //make([]string, 0, len(values)) // hide rootPrefix from return value @@ -441,7 +451,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s ss.Lock() defer ss.Unlock() - keys, values, err := ss.TxnKV.LoadWithPrefix(key) + keys, values, err := ss.MetaKv.LoadWithPrefix(key) if err != nil { return nil, nil, err } @@ -456,7 +466,7 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s for i, key := range keys { group := kvgroup{key: key, value: values[i]} // load prefix keys contains rootPrefix - sKeys, sValues, err := ss.TxnKV.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key))) + sKeys, sValues, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key))) if err != nil { return nil, nil, err } @@ -500,12 +510,12 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s } // MultiSaveAndRemoveWithPrefix save muiltple kvs and remove as well -// if ts == 0, act like TxnKV +// if ts == 0, act like MetaKv // each key-value will be treated in same logic like Save func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { - // if ts == 0, act like TxnKV + // if ts == 0, act like MetaKv if ts == 0 { - return ss.TxnKV.MultiSaveAndRemoveWithPrefix(saves, removals) + return ss.MetaKv.MultiSaveAndRemoveWithPrefix(saves, removals) } ss.Lock() defer ss.Unlock() @@ -519,9 +529,9 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, // load each removal, change execution to adding tombstones for _, removal := range removals { - keys, _, err := ss.TxnKV.LoadWithPrefix(removal) + keys, _, err := ss.MetaKv.LoadWithPrefix(removal) if err != nil { - log.Warn("SuffixSnapshot TxnKV LoadwithPrefix failed", zap.String("key", removal), zap.Error(err)) + log.Warn("SuffixSnapshot MetaKv LoadwithPrefix failed", zap.String("key", removal), zap.Error(err)) return err } @@ -535,7 +545,7 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, } // multi save execute map; if succeeds, update ts in the update list - err = ss.TxnKV.MultiSave(execute) + err = ss.MetaKv.MultiSave(execute) if err == nil { for _, key := range updateList { ss.lastestTS[key] = ts @@ -543,3 +553,123 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, } return err } + +func (ss *SuffixSnapshot) Close() { + close(ss.closeGC) +} + +// startBackgroundGC the data will clean up if key ts!=0 and expired +func (ss *SuffixSnapshot) startBackgroundGC() { + log.Debug("suffix snapshot GC goroutine start!") + ticker := time.NewTicker(60 * time.Minute) + defer ticker.Stop() + + params := paramtable.Get() + retentionDuration := params.CommonCfg.RetentionDuration.GetAsDuration(time.Second) + + for { + select { + case <-ss.closeGC: + log.Warn("quit suffix snapshot GC goroutine!") + return + case now := <-ticker.C: + err := ss.removeExpiredKvs(now, retentionDuration) + if err != nil { + log.Warn("remove expired data fail during GC", zap.Error(err)) + } + } + } +} + +func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) { + if !strings.HasPrefix(snapshotKey, ss.snapshotPrefix) { + return "", fmt.Errorf("get original key failed, invailed snapshot key:%s", snapshotKey) + } + // collect keys that parent node is snapshot node if the corresponding the latest ts is expired. + idx := strings.LastIndex(snapshotKey, ss.separator) + if idx == -1 { + return "", fmt.Errorf("get original key failed, snapshot key:%s", snapshotKey) + } + prefix := snapshotKey[:idx] + return prefix[ss.snapshotLen:], nil +} + +func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey string, includeOriginalKey bool) error { + if includeOriginalKey { + keyGroup = append(keyGroup, originalKey) + } + + // to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array + sort.Strings(keyGroup) + removeFn := func(partialKeys []string) error { + return ss.MetaKv.MultiRemove(keyGroup) + } + return etcd.RemoveByBatch(keyGroup, removeFn) +} + +func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time.Duration) error { + keyGroup := make([]string, 0) + latestOriginalKey := "" + latestValue := "" + groupCnt := 0 + + removeFn := func(curOriginalKey string) error { + if !ss.isTombstone(latestValue) { + return nil + } + return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup)) + } + + // walk all kvs with SortAsc, we need walk to the latest key for each key group to check the kv + // whether contains tombstone, then if so, it represents the original key has been removed. + // TODO: walk with Desc + err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error { + key := string(k) + value := string(v) + + key = ss.hideRootPrefix(key) + ts, ok := ss.isTSKey(key) + // it is original key if the key doesn't contain ts + if !ok { + log.Warn("skip key because it doesn't contain ts", zap.String("key", key)) + return nil + } + + curOriginalKey, err := ss.getOriginalKey(key) + if err != nil { + return err + } + + // reset if starting look up a new key group + if latestOriginalKey != "" && latestOriginalKey != curOriginalKey { + // it indicates all keys need to remove that the prefix is original key + // it means the latest original kvs has already been removed if the latest kv has tombstone marker. + if err := removeFn(latestOriginalKey); err != nil { + return err + } + + keyGroup = make([]string, 0) + groupCnt = 0 + } + + latestValue = value + groupCnt++ + latestOriginalKey = curOriginalKey + + // record keys if the kv is expired + pts, _ := tsoutil.ParseTS(ts) + expireTime := pts.Add(retentionDuration) + // break loop if it reaches expire time + if expireTime.Before(now) { + keyGroup = append(keyGroup, key) + } + + return nil + }) + + if err != nil { + return err + } + + return removeFn(latestOriginalKey) +} diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index f97410d6c9..69cb434630 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -17,22 +17,37 @@ package rootcoord import ( + "errors" "fmt" "math/rand" + "os" "testing" "time" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" ) var ( snapshotPrefix = "snapshots" ) +var Params = paramtable.Get() + +func TestMain(m *testing.M) { + Params.Init() + code := m.Run() + os.Exit(code) +} + func Test_binarySearchRecords(t *testing.T) { type testcase struct { records []tsv @@ -173,6 +188,8 @@ func Test_ComposeIsTsKey(t *testing.T) { sep := "_ts" ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) require.Nil(t, err) + defer ss.Close() + type testcase struct { key string expected uint64 @@ -211,6 +228,8 @@ func Test_SuffixSnaphotIsTSOfKey(t *testing.T) { sep := "_ts" ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) require.Nil(t, err) + defer ss.Close() + type testcase struct { key string target string @@ -284,6 +303,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) @@ -302,10 +322,6 @@ func Test_SuffixSnapshotLoad(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "value-19", val) - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) - for i := 0; i < 20; i++ { val, err := ss.Load("key", typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err) @@ -343,6 +359,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} @@ -372,9 +389,6 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { assert.Equal(t, vals[0], "v1-19") assert.Equal(t, vals[1], "v2-19") - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) for i := 0; i < 20; i++ { keys, vals, err := ss.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err) @@ -397,6 +411,181 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { ss.RemoveWithPrefix("") } +func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + rootPath := fmt.Sprintf("/test/meta/remove-expired-test-%d", randVal) + sep := "_ts" + + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + assert.NoError(t, err) + defer etcdCli.Close() + etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) + assert.NoError(t, err) + defer etcdkv.Close() + + ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) + assert.NoError(t, err) + assert.NotNil(t, ss) + defer ss.Close() + + saveFn := func(key, value string, ts typeutil.Timestamp) { + err = ss.Save(key, value, ts) + assert.NoError(t, err) + } + + multiSaveFn := func(kvs map[string]string, ts typeutil.Timestamp) { + err = ss.MultiSave(kvs, ts) + assert.NoError(t, err) + } + + now := time.Now() + ftso := func(ts int) typeutil.Timestamp { + return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0) + } + + getKey := func(prefix string, id int) string { + return fmt.Sprintf("%s-%d", prefix, id) + } + + generateTestData := func(prefix string, kCnt int, kVersion int, expiredKeyCnt int) { + var value string + cnt := 0 + for i := 0; i < kVersion; i++ { + kvs := make(map[string]string) + ts := ftso((i + 1) * 100) + for v := 0; v < kCnt; v++ { + if i == 0 && v%2 == 0 && cnt < expiredKeyCnt { + value = string(SuffixSnapshotTombstone) + cnt++ + } else { + value = "v" + } + + kvs[getKey(prefix, v)] = value + if v%25 == 0 { + multiSaveFn(kvs, ts) + kvs = make(map[string]string) + } + } + multiSaveFn(kvs, ts) + } + } + + countPrefix := func(prefix string) int { + cnt := 0 + err := etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error { + cnt++ + return nil + }) + assert.NoError(t, err) + return cnt + } + + t.Run("Mixed test ", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + keyCnt := 500 + keyVersion := 3 + expiredKCnt := 100 + generateTestData(prefix, keyCnt, keyVersion, expiredKCnt) + + cnt := countPrefix(prefix) + assert.Equal(t, keyCnt*keyVersion+keyCnt, cnt) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt = countPrefix(prefix) + assert.Equal(t, keyCnt*keyVersion+keyCnt-(expiredKCnt*keyVersion+expiredKCnt), cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("partial expired and all expired", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + value := "v" + ts := ftso(100) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(200) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(300) + saveFn(getKey(prefix, 0), value, ts) + + // insert partial expired kv + ts = ftso(25) + saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts) + ts = ftso(50) + saveFn(getKey(prefix, 1), value, ts) + ts = ftso(70) + saveFn(getKey(prefix, 1), value, ts) + + // insert all expired kv + ts = ftso(100) + saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts) + ts = ftso(200) + saveFn(getKey(prefix, 2), value, ts) + ts = ftso(300) + saveFn(getKey(prefix, 2), value, ts) + + cnt := countPrefix(prefix) + assert.Equal(t, 12, cnt) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt = countPrefix(prefix) + assert.Equal(t, 6, cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("parse ts fail", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts") + err = etcdkv.Save(ss.composeSnapshotPrefix(key), "") + assert.NoError(t, err) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt := countPrefix(prefix) + assert.Equal(t, 1, cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("test walk kv data fail", func(t *testing.T) { + sep := "_ts" + rootPath := "root/" + kv := mocks.NewMetaKv(t) + kv.EXPECT(). + WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("error")) + + ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix) + assert.NotNil(t, ss) + assert.NoError(t, err) + + err = ss.removeExpiredKvs(time.Now(), time.Duration(100)) + assert.Error(t, err) + }) +} + func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() @@ -427,6 +616,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) @@ -461,10 +651,6 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { assert.Equal(t, 39-i, len(vals)) } - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) - for i := 0; i < 20; i++ { val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err) @@ -492,3 +678,30 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { // cleanup ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0) } + +func Test_getOriginalKey(t *testing.T) { + sep := "_ts" + rootPath := "root/" + kv := mocks.NewMetaKv(t) + ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix) + assert.NotNil(t, ss) + assert.NoError(t, err) + + t.Run("match prefix fail", func(t *testing.T) { + ret, err := ss.getOriginalKey("non-snapshots/k1") + assert.Equal(t, "", ret) + assert.Error(t, err) + }) + + t.Run("find separator fail", func(t *testing.T) { + ret, err := ss.getOriginalKey("snapshots/k1") + assert.Equal(t, "", ret) + assert.Error(t, err) + }) + + t.Run("ok", func(t *testing.T) { + ret, err := ss.getOriginalKey("snapshots/prefix-1_ts438497159122780160") + assert.Equal(t, "prefix-1", ret) + assert.NoError(t, err) + }) +} diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index d2279b3bbc..b8eae9354d 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -23,16 +23,16 @@ import ( "path" "sync" - "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.etcd.io/etcd/api/v3/mvccpb" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" ) // proxyManager manages proxy connected to the rootcoord @@ -77,7 +77,7 @@ func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) { // WatchProxy starts a goroutine to watch proxy session changes on etcd func (p *proxyManager) WatchProxy() error { - ctx, cancel := context.WithTimeout(p.ctx, rootcoord.RequestTimeout) + ctx, cancel := context.WithTimeout(p.ctx, etcdkv.RequestTimeout) defer cancel() sessions, rev, err := p.getSessionsOnEtcd(ctx)