From 6d53a99171a01a9ab813d2a35f322dbe8076f2bc Mon Sep 17 00:00:00 2001 From: jaime Date: Fri, 6 Jan 2023 10:11:36 +0800 Subject: [PATCH] Improve LoadWithPrefix performance for SuffixSnapshot (#21524) Signed-off-by: jaime --- .../metastore/kv/rootcoord/suffix_snapshot.go | 96 +++++++++---------- .../kv/rootcoord/suffix_snapshot_test.go | 68 ++++++++++++- 2 files changed, 110 insertions(+), 54 deletions(-) diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 4ef8fa59ab..34e5ff52ed 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -451,61 +451,55 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s ss.Lock() defer ss.Unlock() - keys, values, err := ss.MetaKv.LoadWithPrefix(key) + resultKeys := make([]string, 0) + resultValues := make([]string, 0) + + latestOriginalKey := "" + tValueGroups := make([]tsv, 0) + + prefix := path.Join(ss.snapshotPrefix, key) + appendResultFn := func(ts typeutil.Timestamp) { + value, ok := binarySearchRecords(tValueGroups, ts) + if !ok || ss.isTombstone(value) { + return + } + + resultKeys = append(resultKeys, latestOriginalKey) + resultValues = append(resultValues, value) + } + + err := ss.MetaKv.WalkWithPrefix(prefix, PaginationSize, func(k []byte, v []byte) error { + sKey := string(k) + sValue := string(v) + + snapshotKey := ss.hideRootPrefix(sKey) + curOriginalKey, err := ss.getOriginalKey(snapshotKey) + if err != nil { + return err + } + + // reset if starting look up a new key group + if latestOriginalKey != "" && latestOriginalKey != curOriginalKey { + appendResultFn(ts) + tValueGroups = make([]tsv, 0) + } + + targetTs, ok := ss.isTSKey(snapshotKey) + if !ok { + log.Warn("skip key because it doesn't contain ts", zap.String("key", key)) + return nil + } + + tValueGroups = append(tValueGroups, tsv{value: sValue, ts: targetTs}) + latestOriginalKey = curOriginalKey + return nil + }) + if err != nil { return nil, nil, err } - // kv group stands for - type kvgroup struct { - key, value string - processed bool - tsRecords []tsv - } - groups := make([]kvgroup, 0, len(keys)) - for i, key := range keys { - group := kvgroup{key: key, value: values[i]} - // load prefix keys contains rootPrefix - sKeys, sValues, err := ss.MetaKv.LoadWithPrefix(ss.composeSnapshotPrefix(ss.hideRootPrefix(key))) - if err != nil { - return nil, nil, err - } - group.tsRecords = make([]tsv, 0, len(sKeys)) - for j, sKey := range sKeys { - ts, ok := ss.isTSOfKey(ss.hideRootPrefix(sKey), ss.hideRootPrefix(key)) - if ok { - group.tsRecords = append(group.tsRecords, tsv{ts: ts, value: sValues[j]}) - } - } - groups = append(groups, group) - } - - resultKeys := make([]string, 0, len(groups)) - resultValues := make([]string, 0, len(groups)) - // for each group, do ts travel logic if appliable - for _, group := range groups { - if len(group.tsRecords) == 0 { - // not ts maybe, just use k,v - resultKeys = append(resultKeys, group.key) - resultValues = append(resultValues, group.value) - continue - } - value, ok := binarySearchRecords(group.tsRecords, ts) - if ok { - // tombstone found, skip entry - if ss.isTombstone(value) { - continue - } - resultKeys = append(resultKeys, group.key) - resultValues = append(resultValues, value) - } - } - - // hide rootPrefix from return value - for i, k := range resultKeys { - resultKeys[i] = ss.hideRootPrefix(k) - } - + appendResultFn(ts) return resultKeys, resultValues, nil } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index 69cb434630..cccbc83eb9 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -403,9 +403,9 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { keys, vals, err = ss.LoadWithPrefix("k", typeutil.Timestamp(300)) assert.Nil(t, err) assert.Equal(t, len(keys), len(vals)) - assert.Equal(t, len(keys), 3) - assert.ElementsMatch(t, keys, []string{"k1", "k2", "kextra"}) - assert.ElementsMatch(t, vals, []string{"v1-19", "v2-19", "extra-value"}) + assert.Equal(t, len(keys), 2) + assert.ElementsMatch(t, keys, []string{"k1", "k2"}) + assert.ElementsMatch(t, vals, []string{"v1-19", "v2-19"}) // clean up ss.RemoveWithPrefix("") @@ -679,6 +679,68 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0) } +func TestSuffixSnapshot_LoadWithPrefix(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + randVal := rand.Int() + + Params.Init() + rootPath := fmt.Sprintf("/test/meta/loadWithPrefix-test-%d", randVal) + sep := "_ts" + + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + assert.NoError(t, err) + defer etcdCli.Close() + etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) + assert.NoError(t, err) + defer etcdkv.Close() + + ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix) + assert.NoError(t, err) + assert.NotNil(t, ss) + defer ss.Close() + + t.Run("parse ts fail", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts") + err = etcdkv.Save(ss.composeSnapshotPrefix(key), "") + assert.NoError(t, err) + + keys, values, err := ss.LoadWithPrefix(prefix, 100) + assert.NoError(t, err) + assert.Equal(t, 0, len(keys)) + assert.Equal(t, 0, len(values)) + + // clean all data + err = etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + + t.Run("test walk kv data fail", func(t *testing.T) { + sep := "_ts" + rootPath := "root/" + kv := mocks.NewMetaKv(t) + kv.EXPECT(). + WalkWithPrefix(mock.Anything, mock.Anything, mock.Anything). + Return(errors.New("error")) + + ss, err := NewSuffixSnapshot(kv, sep, rootPath, snapshotPrefix) + assert.NotNil(t, ss) + assert.NoError(t, err) + + keys, values, err := ss.LoadWithPrefix("t", 100) + assert.Error(t, err) + assert.Nil(t, keys) + assert.Nil(t, values) + }) +} + func Test_getOriginalKey(t *testing.T) { sep := "_ts" rootPath := "root/"