From 608e9c4405f732ea146a925d2bc0ef79accf751b Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 18 Jan 2023 10:15:44 +0800 Subject: [PATCH] Add based on timetravel GC for snapshot KV (#21417) (#21763) Signed-off-by: jaime --- .../metastore/kv/rootcoord/meta_snapshot.go | 410 -------------- .../kv/rootcoord/meta_snapshot_test.go | 519 ------------------ .../metastore/kv/rootcoord/suffix_snapshot.go | 118 ++++ .../kv/rootcoord/suffix_snapshot_test.go | 204 ++++++- 4 files changed, 311 insertions(+), 940 deletions(-) delete mode 100644 internal/metastore/kv/rootcoord/meta_snapshot.go delete mode 100644 internal/metastore/kv/rootcoord/meta_snapshot_test.go diff --git a/internal/metastore/kv/rootcoord/meta_snapshot.go b/internal/metastore/kv/rootcoord/meta_snapshot.go deleted file mode 100644 index b592615517..0000000000 --- a/internal/metastore/kv/rootcoord/meta_snapshot.go +++ /dev/null @@ -1,410 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - "path" - "strconv" - "sync" - "time" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" -) - -const ( - // RequestTimeout timeout for request - RequestTimeout = 10 * time.Second -) - -type rtPair struct { - rev int64 - ts typeutil.Timestamp -} - -type MetaSnapshot struct { - cli *clientv3.Client - root string - tsKey string - lock sync.RWMutex - - ts2Rev []rtPair - minPos int - maxPos int - numTs int -} - -func NewMetaSnapshot(cli *clientv3.Client, root, tsKey string, bufSize int) (*MetaSnapshot, error) { - if bufSize <= 0 { - bufSize = 1024 - } - ms := &MetaSnapshot{ - cli: cli, - root: root, - tsKey: tsKey, - lock: sync.RWMutex{}, - ts2Rev: make([]rtPair, bufSize), - minPos: 0, - maxPos: 0, - numTs: 0, - } - if err := ms.loadTs(); err != nil { - return nil, err - } - return ms, nil -} - -func (ms *MetaSnapshot) loadTs() error { - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - key := path.Join(ms.root, ms.tsKey) - resp, err := ms.cli.Get(ctx, key) - if err != nil { - return err - } - if len(resp.Kvs) <= 0 { - return nil - } - version := resp.Kvs[0].Version - revision := resp.Kvs[0].ModRevision - createRevision := resp.Kvs[0].CreateRevision - strTs := string(resp.Kvs[0].Value) - ts, err := strconv.ParseUint(strTs, 10, 64) - if err != nil { - return err - } - log.Info("Load last ts", zap.Int64("version", version), zap.Int64("revision", revision)) - - ms.initTs(revision, ts) - // start from revision-1, until equals to create revision - for revision--; revision >= createRevision; revision-- { - if ms.numTs == len(ms.ts2Rev) { - break - } - resp, err = ms.cli.Get(ctx, key, clientv3.WithRev(revision)) - if err != nil { - return err - } - if len(resp.Kvs) <= 0 { - return nil - } - - curVer := resp.Kvs[0].Version - curRev := resp.Kvs[0].ModRevision - if curVer > version { - log.Warn("version go backwards", zap.Int64("curVer", curVer), zap.Int64("version", version)) - return nil - } - if curVer == version { - log.Info("Snapshot found save version with different revision", zap.Int64("revision", revision), zap.Int64("version", version)) - } - strTs := string(resp.Kvs[0].Value) - if strTs == "0" { - //#issue 7150, index building inserted "0", skipping - //this is a special fix for backward compatibility, the previous version will put 0 ts into the Snapshot building index - continue - } - curTs, err := strconv.ParseUint(strTs, 10, 64) - if err != nil { - return err - } - if curTs >= ts { - return fmt.Errorf("timestamp go back, curTs=%d,ts=%d", curTs, ts) - } - ms.initTs(curRev, curTs) - ts = curTs - revision = curRev - version = curVer - } - - return nil -} - -func (ms *MetaSnapshot) maxTs() typeutil.Timestamp { - return ms.ts2Rev[ms.maxPos].ts -} - -func (ms *MetaSnapshot) minTs() typeutil.Timestamp { - return ms.ts2Rev[ms.minPos].ts -} - -func (ms *MetaSnapshot) initTs(rev int64, ts typeutil.Timestamp) { - log.Info("init meta Snapshot ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) - if ms.numTs == 0 { - ms.maxPos = len(ms.ts2Rev) - 1 - ms.minPos = len(ms.ts2Rev) - 1 - ms.numTs = 1 - ms.ts2Rev[ms.maxPos].rev = rev - ms.ts2Rev[ms.maxPos].ts = ts - } else if ms.numTs < len(ms.ts2Rev) { - ms.minPos-- - ms.numTs++ - ms.ts2Rev[ms.minPos].rev = rev - ms.ts2Rev[ms.minPos].ts = ts - } -} - -func (ms *MetaSnapshot) putTs(rev int64, ts typeutil.Timestamp) { - log.Info("put meta snapshto ts", zap.Int64("rev", rev), zap.Uint64("ts", ts)) - ms.maxPos++ - if ms.maxPos == len(ms.ts2Rev) { - ms.maxPos = 0 - } - - ms.ts2Rev[ms.maxPos].rev = rev - ms.ts2Rev[ms.maxPos].ts = ts - if ms.numTs < len(ms.ts2Rev) { - ms.numTs++ - } else { - ms.minPos++ - if ms.minPos == len(ms.ts2Rev) { - ms.minPos = 0 - } - } -} - -func (ms *MetaSnapshot) searchOnCache(ts typeutil.Timestamp, start, length int) int64 { - if length == 1 { - return ms.ts2Rev[start].rev - } - begin := start - end := begin + length - mid := (begin + end) / 2 - for { - if ms.ts2Rev[mid].ts == ts { - return ms.ts2Rev[mid].rev - } - if mid == begin { - if ms.ts2Rev[mid].ts < ts || mid == start { - return ms.ts2Rev[mid].rev - } - return ms.ts2Rev[mid-1].rev - } - if ms.ts2Rev[mid].ts > ts { - end = mid - } else if ms.ts2Rev[mid].ts < ts { - begin = mid + 1 - } - mid = (begin + end) / 2 - } -} - -func (ms *MetaSnapshot) getRevOnCache(ts typeutil.Timestamp) int64 { - if ms.numTs == 0 { - return 0 - } - if ts >= ms.ts2Rev[ms.maxPos].ts { - return ms.ts2Rev[ms.maxPos].rev - } - if ts < ms.ts2Rev[ms.minPos].ts { - return 0 - } - if ms.maxPos > ms.minPos { - return ms.searchOnCache(ts, ms.minPos, ms.maxPos-ms.minPos+1) - } - topVal := ms.ts2Rev[len(ms.ts2Rev)-1] - botVal := ms.ts2Rev[0] - minVal := ms.ts2Rev[ms.minPos] - maxVal := ms.ts2Rev[ms.maxPos] - if ts >= topVal.ts && ts < botVal.ts { - return topVal.rev - } else if ts >= minVal.ts && ts < topVal.ts { - return ms.searchOnCache(ts, ms.minPos, len(ms.ts2Rev)-ms.minPos) - } else if ts >= botVal.ts && ts < maxVal.ts { - return ms.searchOnCache(ts, 0, ms.maxPos+1) - } - - return 0 -} - -func (ms *MetaSnapshot) getRevOnEtcd(ts typeutil.Timestamp, rev int64) int64 { - if rev < 2 { - return 0 - } - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - for rev--; rev >= 2; rev-- { - resp, err := ms.cli.Get(ctx, path.Join(ms.root, ms.tsKey), clientv3.WithRev(rev)) - if err != nil { - log.Info("get ts from etcd failed", zap.Error(err)) - return 0 - } - if len(resp.Kvs) <= 0 { - return 0 - } - rev = resp.Kvs[0].ModRevision - curTs, err := strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64) - if err != nil { - log.Info("parse timestam error", zap.String("input", string(resp.Kvs[0].Value)), zap.Error(err)) - return 0 - } - if curTs <= ts { - return rev - } - } - return 0 -} - -func (ms *MetaSnapshot) getRev(ts typeutil.Timestamp) (int64, error) { - rev := ms.getRevOnCache(ts) - if rev > 0 { - return rev, nil - } - rev = ms.ts2Rev[ms.minPos].rev - rev = ms.getRevOnEtcd(ts, rev) - if rev > 0 { - return rev, nil - } - return 0, fmt.Errorf("can't find revision on ts=%d", ts) -} - -func (ms *MetaSnapshot) Save(key, value string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - strTs := strconv.FormatInt(int64(ts), 10) - resp, err := ms.cli.Txn(ctx).If().Then( - clientv3.OpPut(path.Join(ms.root, key), value), - clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs), - ).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - - return nil -} - -func (ms *MetaSnapshot) Load(key string, ts typeutil.Timestamp) (string, error) { - ms.lock.RLock() - defer ms.lock.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - var resp *clientv3.GetResponse - var err error - var rev int64 - if ts == 0 { - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key)) - if err != nil { - return "", err - } - } else { - rev, err = ms.getRev(ts) - if err != nil { - return "", err - } - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithRev(rev)) - if err != nil { - return "", err - } - } - if len(resp.Kvs) == 0 { - return "", fmt.Errorf("there is no value on key = %s, ts = %d", key, ts) - } - return string(resp.Kvs[0].Value), nil -} - -func (ms *MetaSnapshot) MultiSave(kvs map[string]string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - ops := make([]clientv3.Op, 0, len(kvs)+1) - for key, value := range kvs { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) - } - - strTs := strconv.FormatInt(int64(ts), 10) - ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) - resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - return nil -} - -func (ms *MetaSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error) { - ms.lock.RLock() - defer ms.lock.RUnlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - var resp *clientv3.GetResponse - var err error - var rev int64 - if ts == 0 { - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, err - } - } else { - rev, err = ms.getRev(ts) - if err != nil { - return nil, nil, err - } - resp, err = ms.cli.Get(ctx, path.Join(ms.root, key), clientv3.WithPrefix(), clientv3.WithRev(rev), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - return nil, nil, err - } - } - keys := make([]string, 0, len(resp.Kvs)) - values := make([]string, 0, len(resp.Kvs)) - tk := path.Join(ms.root, "k") - prefixLen := len(tk) - 1 - for _, kv := range resp.Kvs { - tk = string(kv.Key) - tk = tk[prefixLen:] - keys = append(keys, tk) - values = append(values, string(kv.Value)) - } - return keys, values, nil -} - -func (ms *MetaSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error { - ms.lock.Lock() - defer ms.lock.Unlock() - ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) - defer cancel() - - ops := make([]clientv3.Op, 0, len(saves)+len(removals)+1) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(ms.root, key), value)) - } - - strTs := strconv.FormatInt(int64(ts), 10) - for _, key := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(ms.root, key), clientv3.WithPrefix())) - } - ops = append(ops, clientv3.OpPut(path.Join(ms.root, ms.tsKey), strTs)) - resp, err := ms.cli.Txn(ctx).If().Then(ops...).Commit() - if err != nil { - return err - } - ms.putTs(resp.Header.Revision, ts) - return nil -} diff --git a/internal/metastore/kv/rootcoord/meta_snapshot_test.go b/internal/metastore/kv/rootcoord/meta_snapshot_test.go deleted file mode 100644 index 1396111df9..0000000000 --- a/internal/metastore/kv/rootcoord/meta_snapshot_test.go +++ /dev/null @@ -1,519 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - "math/rand" - "os" - "path" - "testing" - "time" - - "github.com/milvus-io/milvus/internal/util/paramtable" - - "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/stretchr/testify/assert" -) - -var Params paramtable.ComponentParam - -func TestMain(m *testing.M) { - Params.Init() - code := m.Run() - os.Exit(code) -} - -func TestMetaSnapshot(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 4) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 8; i++ { - vtso = typeutil.Timestamp(100 + i) - ts := ftso() - err = ms.Save("abc", fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - _, err = etcdCli.Put(context.Background(), "other", fmt.Sprintf("other-%d", i)) - assert.Nil(t, err) - } - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 4) - assert.Nil(t, err) - assert.NotNil(t, ms) -} - -func TestSearchOnCache(t *testing.T) { - ms := &MetaSnapshot{} - for i := 0; i < 8; i++ { - ms.ts2Rev = append(ms.ts2Rev, - rtPair{ - rev: int64(i * 2), - ts: typeutil.Timestamp(i * 2), - }) - } - rev := ms.searchOnCache(9, 0, 8) - assert.Equal(t, int64(8), rev) - rev = ms.searchOnCache(1, 0, 2) - assert.Equal(t, int64(0), rev) - rev = ms.searchOnCache(1, 0, 8) - assert.Equal(t, int64(0), rev) - rev = ms.searchOnCache(14, 0, 8) - assert.Equal(t, int64(14), rev) - rev = ms.searchOnCache(0, 0, 8) - assert.Equal(t, int64(0), rev) -} - -func TestGetRevOnCache(t *testing.T) { - ms := &MetaSnapshot{} - ms.ts2Rev = make([]rtPair, 7) - ms.initTs(7, 16) - ms.initTs(6, 14) - ms.initTs(5, 12) - ms.initTs(4, 10) - - var rev int64 - rev = ms.getRevOnCache(17) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(9) - assert.Equal(t, int64(0), rev) - rev = ms.getRevOnCache(10) - assert.Equal(t, int64(4), rev) - rev = ms.getRevOnCache(16) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(15) - assert.Equal(t, int64(6), rev) - rev = ms.getRevOnCache(12) - assert.Equal(t, int64(5), rev) - - ms.initTs(3, 8) - ms.initTs(2, 6) - assert.Equal(t, ms.maxPos, 6) - assert.Equal(t, ms.minPos, 1) - - rev = ms.getRevOnCache(17) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(9) - assert.Equal(t, int64(3), rev) - rev = ms.getRevOnCache(10) - assert.Equal(t, int64(4), rev) - rev = ms.getRevOnCache(16) - assert.Equal(t, int64(7), rev) - rev = ms.getRevOnCache(15) - assert.Equal(t, int64(6), rev) - rev = ms.getRevOnCache(12) - assert.Equal(t, int64(5), rev) - rev = ms.getRevOnCache(5) - assert.Equal(t, int64(0), rev) - - ms.putTs(8, 18) - assert.Equal(t, ms.maxPos, 0) - assert.Equal(t, ms.minPos, 1) - for rev = 2; rev <= 7; rev++ { - ts := ms.getRevOnCache(typeutil.Timestamp(rev*2 + 3)) - assert.Equal(t, rev, ts) - } - ms.putTs(9, 20) - assert.Equal(t, ms.maxPos, 1) - assert.Equal(t, ms.minPos, 2) - assert.Equal(t, ms.numTs, 7) - - curMax := ms.maxPos - curMin := ms.minPos - for i := 10; i < 20; i++ { - ms.putTs(int64(i), typeutil.Timestamp(i*2+2)) - curMax++ - curMin++ - if curMax == len(ms.ts2Rev) { - curMax = 0 - } - if curMin == len(ms.ts2Rev) { - curMin = 0 - } - assert.Equal(t, curMax, ms.maxPos) - assert.Equal(t, curMin, ms.minPos) - } - - for i := 13; i < 20; i++ { - rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 2)) - assert.Equal(t, int64(i), rev) - rev = ms.getRevOnCache(typeutil.Timestamp(i*2 + 3)) - assert.Equal(t, int64(i), rev) - } - rev = ms.getRevOnCache(27) - assert.Zero(t, rev) -} - -func TestGetRevOnEtcd(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - key := path.Join(rootPath, tsKey) - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - ms := MetaSnapshot{ - cli: etcdCli, - root: rootPath, - tsKey: tsKey, - } - resp, err := etcdCli.Put(ctx, key, "100") - assert.Nil(t, err) - revList := []int64{} - tsList := []typeutil.Timestamp{} - revList = append(revList, resp.Header.Revision) - tsList = append(tsList, 100) - for i := 110; i < 200; i += 10 { - resp, err = etcdCli.Put(ctx, key, fmt.Sprintf("%d", i)) - assert.Nil(t, err) - revList = append(revList, resp.Header.Revision) - tsList = append(tsList, typeutil.Timestamp(i)) - } - lastRev := revList[len(revList)-1] + 1 - for i, ts := range tsList { - rev := ms.getRevOnEtcd(ts, lastRev) - assert.Equal(t, revList[i], rev) - } - for i := 0; i < len(tsList); i++ { - rev := ms.getRevOnEtcd(tsList[i]+5, lastRev) - assert.Equal(t, revList[i], rev) - } - rev := ms.getRevOnEtcd(200, lastRev) - assert.Equal(t, lastRev-1, rev) - rev = ms.getRevOnEtcd(99, lastRev) - assert.Zero(t, rev) -} - -func TestLoad(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.Save("key", fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 0; i < 20; i++ { - val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, val, fmt.Sprintf("value-%d", i)) - } - val, err := ms.Load("key", 0) - assert.Nil(t, err) - assert.Equal(t, "value-19", val) - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - val, err := ms.Load("key", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, val, fmt.Sprintf("value-%d", i)) - } -} - -func TestMultiSave(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.MultiSave(saves, ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 0; i < 20; i++ { - keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) - assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) - } - keys, vals, err := ms.LoadWithPrefix("k", 0) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], "v1-19") - assert.Equal(t, vals[1], "v2-19") - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - keys, vals, err := ms.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], "k1") - assert.Equal(t, keys[1], "k2") - assert.Equal(t, vals[0], fmt.Sprintf("v1-%d", i)) - assert.Equal(t, vals[1], fmt.Sprintf("v2-%d", i)) - } -} - -func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - var vtso typeutil.Timestamp - ftso := func() typeutil.Timestamp { - return vtso - } - defer etcdCli.Close() - - ms, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 7) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - for i := 20; i < 40; i++ { - sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)} - dm := []string{fmt.Sprintf("kd-%04d", i-20)} - vtso = typeutil.Timestamp(100 + i*5) - ts := ftso() - err = ms.MultiSaveAndRemoveWithPrefix(sm, dm, ts) - assert.Nil(t, err) - assert.Equal(t, vtso, ts) - } - - for i := 0; i < 20; i++ { - val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, i+1, len(vals)) - } - for i := 20; i < 40; i++ { - val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, 39-i, len(vals)) - } - - ms, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 11) - assert.Nil(t, err) - assert.NotNil(t, ms) - - for i := 0; i < 20; i++ { - val, err := ms.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, i+1, len(vals)) - } - for i := 20; i < 40; i++ { - val, err := ms.Load("ks", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("value-%d", i), val) - _, vals, err := ms.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2)) - assert.Nil(t, err) - assert.Equal(t, 39-i, len(vals)) - } -} - -func TestTsBackward(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - - err = kv.loadTs() - assert.Nil(t, err) - - kv.Save("a", "b", 100) - kv.Save("a", "c", 99) // backward - kv.Save("a", "d", 200) - - kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Error(t, err) - -} - -func TestFix7150(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() - - Params.Init() - rootPath := fmt.Sprintf("/test/meta/%d", randVal) - tsKey := "timestamp" - - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd, - Params.EtcdCfg.EtcdUseSSL, - Params.EtcdCfg.Endpoints, - Params.EtcdCfg.EtcdTLSCert, - Params.EtcdCfg.EtcdTLSKey, - Params.EtcdCfg.EtcdTLSCACert, - Params.EtcdCfg.EtcdTLSMinVersion) - assert.Nil(t, err) - defer etcdCli.Close() - - kv, err := NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - - err = kv.loadTs() - assert.Nil(t, err) - - kv.Save("a", "b", 100) - kv.Save("a", "c", 0) // bug introduced - kv.Save("a", "d", 200) - - kv, err = NewMetaSnapshot(etcdCli, rootPath, tsKey, 1024) - assert.Nil(t, err) - err = kv.loadTs() - assert.Nil(t, err) -} diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index ddd4bbd1b4..6e4e9c36c3 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -26,13 +26,17 @@ import ( "strconv" "strings" "sync" + "time" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -80,6 +84,8 @@ type SuffixSnapshot struct { // exp is the shortcut format checker for ts-key // composed with separator only exp *regexp.Regexp + + closeGC chan struct{} } // tsv struct stores kv with timestamp @@ -114,7 +120,10 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna snapshotLen: snapshotLen, rootPrefix: root, rootLen: rootLen, + closeGC: make(chan struct{}, 1), } + + go ss.startBackgroundGC() return ss, nil } @@ -540,6 +549,35 @@ func (ss *SuffixSnapshot) MultiSaveAndRemoveWithPrefix(saves map[string]string, return err } +func (ss *SuffixSnapshot) Close() { + close(ss.closeGC) +} + +// startBackgroundGC the data will clean up if key ts!=0 and expired +func (ss *SuffixSnapshot) startBackgroundGC() { + log.Debug("suffix snapshot GC goroutine start!") + + var params paramtable.ComponentParam + params.Init() + + ticker := time.NewTicker(60 * time.Minute) + defer ticker.Stop() + + retentionDuration := time.Duration(params.CommonCfg.RetentionDuration) * time.Second + for { + select { + case <-ss.closeGC: + log.Warn("quit suffix snapshot GC goroutine!") + return + case now := <-ticker.C: + err := ss.removeExpiredKvs(now, retentionDuration) + if err != nil { + log.Warn("remove expired data fail during GC", zap.Error(err)) + } + } + } +} + func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) { if !strings.HasPrefix(snapshotKey, ss.snapshotPrefix) { return "", fmt.Errorf("get original key failed, invailed snapshot key:%s", snapshotKey) @@ -552,3 +590,83 @@ func (ss *SuffixSnapshot) getOriginalKey(snapshotKey string) (string, error) { prefix := snapshotKey[:idx] return prefix[ss.snapshotLen:], nil } + +func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey string, includeOriginalKey bool) error { + if includeOriginalKey { + keyGroup = append(keyGroup, originalKey) + } + + // to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array + sort.Strings(keyGroup) + removeFn := func(partialKeys []string) error { + return ss.MetaKv.MultiRemove(keyGroup) + } + return etcd.RemoveByBatch(keyGroup, removeFn) +} + +func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time.Duration) error { + keyGroup := make([]string, 0) + latestOriginalKey := "" + latestValue := "" + groupCnt := 0 + + removeFn := func(curOriginalKey string) error { + if !ss.isTombstone(latestValue) { + return nil + } + return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup)) + } + + // walk all kvs with SortAsc, we need walk to the latest key for each key group to check the kv + // whether contains tombstone, then if so, it represents the original key has been removed. + // TODO: walk with Desc + err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error { + key := string(k) + value := string(v) + + key = ss.hideRootPrefix(key) + ts, ok := ss.isTSKey(key) + // it is original key if the key doesn't contain ts + if !ok { + log.Warn("skip key because it doesn't contain ts", zap.String("key", key)) + return nil + } + + curOriginalKey, err := ss.getOriginalKey(key) + if err != nil { + return err + } + + // reset if starting look up a new key group + if latestOriginalKey != "" && latestOriginalKey != curOriginalKey { + // it indicates all keys need to remove that the prefix is original key + // it means the latest original kvs has already been removed if the latest kv has tombstone marker. + if err := removeFn(latestOriginalKey); err != nil { + return err + } + + keyGroup = make([]string, 0) + groupCnt = 0 + } + + latestValue = value + groupCnt++ + latestOriginalKey = curOriginalKey + + // record keys if the kv is expired + pts, _ := tsoutil.ParseTS(ts) + expireTime := pts.Add(retentionDuration) + // break loop if it reaches expire time + if expireTime.Before(now) { + keyGroup = append(keyGroup, key) + } + + return nil + }) + + if err != nil { + return err + } + + return removeFn(latestOriginalKey) +} diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index 88bec529bf..fe41cce732 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/rand" + "os" "testing" "time" @@ -30,6 +31,8 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -37,6 +40,14 @@ var ( snapshotPrefix = "snapshots" ) +var Params paramtable.ComponentParam + +func TestMain(m *testing.M) { + Params.Init() + code := m.Run() + os.Exit(code) +} + func Test_binarySearchRecords(t *testing.T) { type testcase struct { records []tsv @@ -177,6 +188,8 @@ func Test_ComposeIsTsKey(t *testing.T) { sep := "_ts" ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) require.Nil(t, err) + defer ss.Close() + type testcase struct { key string expected uint64 @@ -215,6 +228,8 @@ func Test_SuffixSnaphotIsTSOfKey(t *testing.T) { sep := "_ts" ss, err := NewSuffixSnapshot((*etcdkv.EtcdKV)(nil), sep, "", snapshotPrefix) require.Nil(t, err) + defer ss.Close() + type testcase struct { key string target string @@ -288,6 +303,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) @@ -306,10 +322,6 @@ func Test_SuffixSnapshotLoad(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "value-19", val) - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) - for i := 0; i < 20; i++ { val, err := ss.Load("key", typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err) @@ -347,6 +359,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { saves := map[string]string{"k1": fmt.Sprintf("v1-%d", i), "k2": fmt.Sprintf("v2-%d", i)} @@ -376,9 +389,6 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { assert.Equal(t, vals[0], "v1-19") assert.Equal(t, vals[1], "v2-19") - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) for i := 0; i < 20; i++ { keys, vals, err := ss.LoadWithPrefix("k", typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err) @@ -401,6 +411,181 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { ss.RemoveWithPrefix("") } +func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + rootPath := fmt.Sprintf("/test/meta/remove-expired-test-%d", randVal) + sep := "_ts" + + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) + assert.NoError(t, err) + defer etcdCli.Close() + etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) + assert.NoError(t, err) + defer etcdkv.Close() + + ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) + assert.NoError(t, err) + assert.NotNil(t, ss) + defer ss.Close() + + saveFn := func(key, value string, ts typeutil.Timestamp) { + err = ss.Save(key, value, ts) + assert.NoError(t, err) + } + + multiSaveFn := func(kvs map[string]string, ts typeutil.Timestamp) { + err = ss.MultiSave(kvs, ts) + assert.NoError(t, err) + } + + now := time.Now() + ftso := func(ts int) typeutil.Timestamp { + return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0) + } + + getKey := func(prefix string, id int) string { + return fmt.Sprintf("%s-%d", prefix, id) + } + + generateTestData := func(prefix string, kCnt int, kVersion int, expiredKeyCnt int) { + var value string + cnt := 0 + for i := 0; i < kVersion; i++ { + kvs := make(map[string]string) + ts := ftso((i + 1) * 100) + for v := 0; v < kCnt; v++ { + if i == 0 && v%2 == 0 && cnt < expiredKeyCnt { + value = string(SuffixSnapshotTombstone) + cnt++ + } else { + value = "v" + } + + kvs[getKey(prefix, v)] = value + if v%25 == 0 { + multiSaveFn(kvs, ts) + kvs = make(map[string]string) + } + } + multiSaveFn(kvs, ts) + } + } + + countPrefix := func(prefix string) int { + cnt := 0 + err := etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error { + cnt++ + return nil + }) + assert.NoError(t, err) + return cnt + } + + t.Run("Mixed test ", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + keyCnt := 500 + keyVersion := 3 + expiredKCnt := 100 + generateTestData(prefix, keyCnt, keyVersion, expiredKCnt) + + cnt := countPrefix(prefix) + assert.Equal(t, keyCnt*keyVersion+keyCnt, cnt) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt = countPrefix(prefix) + assert.Equal(t, keyCnt*keyVersion+keyCnt-(expiredKCnt*keyVersion+expiredKCnt), cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("partial expired and all expired", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + value := "v" + ts := ftso(100) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(200) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(300) + saveFn(getKey(prefix, 0), value, ts) + + // insert partial expired kv + ts = ftso(25) + saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts) + ts = ftso(50) + saveFn(getKey(prefix, 1), value, ts) + ts = ftso(70) + saveFn(getKey(prefix, 1), value, ts) + + // insert all expired kv + ts = ftso(100) + saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts) + ts = ftso(200) + saveFn(getKey(prefix, 2), value, ts) + ts = ftso(300) + saveFn(getKey(prefix, 2), value, ts) + + cnt := countPrefix(prefix) + assert.Equal(t, 12, cnt) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt = countPrefix(prefix) + assert.Equal(t, 6, cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("parse ts fail", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts") + err = etcdkv.Save(ss.composeSnapshotPrefix(key), "") + assert.NoError(t, err) + + err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + assert.NoError(t, err) + + cnt := countPrefix(prefix) + assert.Equal(t, 1, cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("test walk kv data fail", func(t *testing.T) { + sep := "_ts" + rootPath := "root/" + kv := mocks.NewMetaKv(t) + kv.EXPECT(). + WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("error")) + + ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix) + assert.NotNil(t, ss) + assert.NoError(t, err) + + err = ss.removeExpiredKvs(time.Now(), time.Duration(100)) + assert.Error(t, err) + }) +} + func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() @@ -431,6 +616,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) assert.Nil(t, err) assert.NotNil(t, ss) + defer ss.Close() for i := 0; i < 20; i++ { vtso = typeutil.Timestamp(100 + i*5) @@ -465,10 +651,6 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { assert.Equal(t, 39-i, len(vals)) } - ss, err = NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) - assert.Nil(t, err) - assert.NotNil(t, ss) - for i := 0; i < 20; i++ { val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2)) assert.Nil(t, err)