mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: [2.4] keep the latest snapshot key if the origin key is existed (#38351)
/kind improvement - pr: #38333 Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
f4696a1993
commit
fc4a0b1463
@ -84,24 +84,27 @@ func (kv *etcdKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]by
|
||||
|
||||
key := prefix
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), kv.requestTimeout)
|
||||
defer cancel()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), kv.requestTimeout)
|
||||
resp, err := kv.getEtcdMeta(ctx, key, opts...)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
for _, kv := range resp.Kvs {
|
||||
if err = fn(kv.Key, kv.Value); err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !resp.More {
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
// move to next key
|
||||
key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
|
||||
cancel()
|
||||
}
|
||||
|
||||
CheckElapseAndWarn(start, "Slow etcd operation(WalkWithPagination)", zap.String("prefix", prefix))
|
||||
|
||||
@ -596,6 +596,10 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
|
||||
|
||||
// to protect txn finished with ascend order, reverse the latest kv with tombstone to tail of array
|
||||
sort.Strings(keyGroup)
|
||||
if !includeOriginalKey && len(keyGroup) > 0 {
|
||||
// keep the latest snapshot key for historical version compatibility
|
||||
keyGroup = keyGroup[0 : len(keyGroup)-1]
|
||||
}
|
||||
removeFn := func(partialKeys []string) error {
|
||||
return ss.MetaKv.MultiRemove(partialKeys)
|
||||
}
|
||||
|
||||
@ -20,6 +20,8 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -485,6 +487,15 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
return cnt
|
||||
}
|
||||
|
||||
getPrefix := func(prefix string) []string {
|
||||
var res []string
|
||||
_ = etcdkv.WalkWithPrefix("", 10, func(key []byte, value []byte) error {
|
||||
res = append(res, string(key))
|
||||
return nil
|
||||
})
|
||||
return res
|
||||
}
|
||||
|
||||
t.Run("Mixed test ", func(t *testing.T) {
|
||||
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
||||
keyCnt := 500
|
||||
@ -581,7 +592,12 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
cnt = countPrefix(prefix)
|
||||
assert.Equal(t, 1, cnt)
|
||||
assert.Equal(t, 2, cnt)
|
||||
res := getPrefix(prefix)
|
||||
sort.Strings(res)
|
||||
keepKey := getKey(prefix, 0)
|
||||
keepTs := ftso(100)
|
||||
assert.Equal(t, []string{path.Join(rootPath, keepKey), path.Join(rootPath, ss.composeTSKey(keepKey, keepTs))}, res)
|
||||
|
||||
// clean all data
|
||||
err := etcdkv.RemoveWithPrefix("")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user