diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 5003d0fae4..0460236827 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -84,24 +84,27 @@ func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by key := prefix for { - ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout) resp, err := kv.getEtcdMeta(ctx, key, opts...) if err != nil { + cancel() return err } for _, kv := range resp.Kvs { if err = fn(kv.Key, kv.Value); err != nil { + cancel() return err } } if !resp.More { + cancel() break } // move to next key key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0)) + cancel() } CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix)) diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 75b9c002d8..1fbe1c3648 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -596,6 +596,10 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s // to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array sort.Strings(keyGroup) + if !includeOriginalKey && len(keyGroup) > 0 { + // keep the latest snapshot key for historical version compatibility + keyGroup = keyGroup[0 : len(keyGroup)-1] + } removeFn := func(partialKeys []string) error { return ss.MetaKv.MultiRemove(partialKeys) } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index a10e3fd6a6..9bac252523 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -20,6 +20,8 @@ import ( "fmt" "math/rand" "os" + "path" + "sort" "testing" "time" @@ -485,6 +487,15 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { return cnt } + getPrefix := func(prefix string) []string { + var res []string + _ = etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error { + res = append(res, string(key)) + return nil + }) + return res + } + t.Run("Mixed test ", func(t *testing.T) { prefix := fmt.Sprintf("prefix%d", rand.Int()) keyCnt := 500 @@ -581,7 +592,12 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { assert.NoError(t, err) cnt = countPrefix(prefix) - assert.Equal(t, 1, cnt) + assert.Equal(t, 2, cnt) + res := getPrefix(prefix) + sort.Strings(res) + keepKey := getKey(prefix, 0) + keepTs := ftso(100) + assert.Equal(t, []string{path.Join(rootPath, keepKey), path.Join(rootPath, ss.composeTSKey(keepKey, keepTs))}, res) // clean all data err := etcdkv.RemoveWithPrefix("")