mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: gc in the snapshot kv (#36792)
issue: #36770 Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
7acf724185
commit
03a78ecc3d
@ -20,7 +20,6 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -43,8 +42,10 @@ import (
|
||||
|
||||
var (
|
||||
// SuffixSnapshotTombstone special value for tombstone mark
|
||||
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
|
||||
PaginationSize = 5000
|
||||
SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC}
|
||||
PaginationSize = 5000
|
||||
DefaultSnapshotReserveTime = 1 * time.Hour
|
||||
DefaultSnapshotTTL = 24 * time.Hour
|
||||
)
|
||||
|
||||
// IsTombstone used in migration tool also.
|
||||
@ -82,9 +83,6 @@ type SuffixSnapshot struct {
|
||||
snapshotPrefix string
|
||||
// snapshotLen pre calculated offset when parsing snapshot key
|
||||
snapshotLen int
|
||||
// exp is the shortcut format checker for ts-key
|
||||
// composed with separator only
|
||||
exp *regexp.Regexp
|
||||
|
||||
closeGC chan struct{}
|
||||
}
|
||||
@ -116,7 +114,6 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna
|
||||
MetaKv: metaKV,
|
||||
lastestTS: make(map[string]typeutil.Timestamp),
|
||||
separator: sep,
|
||||
exp: regexp.MustCompile(fmt.Sprintf(`^(.+)%s(\d+)$`, sep)),
|
||||
snapshotPrefix: snapshot,
|
||||
snapshotLen: snapshotLen,
|
||||
rootPrefix: root,
|
||||
@ -164,12 +161,14 @@ func (ss *SuffixSnapshot) isTSKey(key string) (typeutil.Timestamp, bool) {
|
||||
return 0, false
|
||||
}
|
||||
key = key[ss.snapshotLen:]
|
||||
matches := ss.exp.FindStringSubmatch(key)
|
||||
if len(matches) < 3 {
|
||||
idx := strings.LastIndex(key, ss.separator)
|
||||
if idx == -1 {
|
||||
return 0, false
|
||||
}
|
||||
ts, err := strconv.ParseUint(key[idx+len(ss.separator):], 10, 64)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
// err ignores since it's protected by the regexp
|
||||
ts, _ := strconv.ParseUint(matches[2], 10, 64)
|
||||
return ts, true
|
||||
}
|
||||
|
||||
@ -182,15 +181,17 @@ func (ss *SuffixSnapshot) isTSOfKey(key string, groupKey string) (typeutil.Times
|
||||
}
|
||||
key = key[ss.snapshotLen:]
|
||||
|
||||
matches := ss.exp.FindStringSubmatch(key)
|
||||
if len(matches) < 3 {
|
||||
idx := strings.LastIndex(key, ss.separator)
|
||||
if idx == -1 {
|
||||
return 0, false
|
||||
}
|
||||
if matches[1] != groupKey {
|
||||
if key[:idx] != groupKey {
|
||||
return 0, false
|
||||
}
|
||||
ts, err := strconv.ParseUint(key[idx+len(ss.separator):], 10, 64)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
// err ignores since it's protected by the regexp
|
||||
ts, _ := strconv.ParseUint(matches[2], 10, 64)
|
||||
return ts, true
|
||||
}
|
||||
|
||||
@ -270,7 +271,7 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) {
|
||||
}
|
||||
|
||||
// Save stores key-value pairs with timestamp
|
||||
// if ts is 0, SuffixSnapshot works as a MetaKv
|
||||
// if ts == 0, SuffixSnapshot works as a MetaKv
|
||||
// otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path
|
||||
// and for acceleration store original key-value if ts is the latest
|
||||
func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error {
|
||||
@ -648,67 +649,82 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
|
||||
return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn)
|
||||
}
|
||||
|
||||
// removeExpiredKvs removes expired key-value pairs from the snapshot
|
||||
// It walks through all keys with the snapshot prefix, groups them by original key,
|
||||
// and removes expired versions or all versions if the original key has been deleted
|
||||
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
|
||||
keyGroup := make([]string, 0)
|
||||
candidateExpiredKeys := make([]string, 0)
|
||||
latestOriginalKey := ""
|
||||
latestValue := ""
|
||||
groupCnt := 0
|
||||
latestOriginValue := ""
|
||||
totalVersions := 0
|
||||
|
||||
removeFn := func(curOriginalKey string) error {
|
||||
if !ss.isTombstone(latestValue) {
|
||||
// cleanFn processes a group of keys for a single original key
|
||||
cleanFn := func(curOriginalKey string) error {
|
||||
if curOriginalKey == "" {
|
||||
return nil
|
||||
}
|
||||
return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup))
|
||||
if ss.isTombstone(latestOriginValue) {
|
||||
// If deleted, remove all versions including the original key
|
||||
return ss.batchRemoveExpiredKvs(candidateExpiredKeys, curOriginalKey, totalVersions == len(candidateExpiredKeys))
|
||||
}
|
||||
|
||||
// If not deleted, check for expired versions
|
||||
expiredKeys := make([]string, 0)
|
||||
for _, key := range candidateExpiredKeys {
|
||||
ts, _ := ss.isTSKey(key)
|
||||
expireTime, _ := tsoutil.ParseTS(ts)
|
||||
if expireTime.Add(DefaultSnapshotTTL).Before(now) {
|
||||
expiredKeys = append(expiredKeys, key)
|
||||
}
|
||||
}
|
||||
if len(expiredKeys) > 0 {
|
||||
return ss.batchRemoveExpiredKvs(expiredKeys, curOriginalKey, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// walk all kvs with SortAsc, we need walk to the latest key for each key group to check the kv
|
||||
// whether contains tombstone, then if so, it represents the original key has been removed.
|
||||
// TODO: walk with Desc
|
||||
// Walk through all keys with the snapshot prefix
|
||||
err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error {
|
||||
key := string(k)
|
||||
value := string(v)
|
||||
|
||||
key = ss.hideRootPrefix(key)
|
||||
key := ss.hideRootPrefix(string(k))
|
||||
ts, ok := ss.isTSKey(key)
|
||||
// it is original key if the key doesn't contain ts
|
||||
if !ok {
|
||||
log.Warn("skip key because it doesn't contain ts", zap.String("key", key))
|
||||
log.Warn("Skip key because it doesn't contain ts", zap.String("key", key))
|
||||
return nil
|
||||
}
|
||||
|
||||
curOriginalKey, err := ss.getOriginalKey(key)
|
||||
if err != nil {
|
||||
log.Error("Failed to parse the original key for GC", zap.String("key", key), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// reset if starting look up a new key group
|
||||
// If we've moved to a new original key, process the previous group
|
||||
if latestOriginalKey != "" && latestOriginalKey != curOriginalKey {
|
||||
// it indicates all keys need to remove that the prefix is original key
|
||||
// it means the latest original kvs has already been removed if the latest kv has tombstone marker.
|
||||
if err := removeFn(latestOriginalKey); err != nil {
|
||||
if err := cleanFn(latestOriginalKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyGroup = make([]string, 0)
|
||||
groupCnt = 0
|
||||
candidateExpiredKeys = make([]string, 0)
|
||||
totalVersions = 0
|
||||
}
|
||||
|
||||
latestValue = value
|
||||
groupCnt++
|
||||
latestOriginalKey = curOriginalKey
|
||||
latestOriginValue = string(v)
|
||||
totalVersions++
|
||||
|
||||
// record keys if the kv is expired
|
||||
expireTime, _ := tsoutil.ParseTS(ts)
|
||||
// break loop if it reaches expire time
|
||||
if expireTime.Before(now) {
|
||||
keyGroup = append(keyGroup, key)
|
||||
// Record versions that are already expired but not removed
|
||||
time, _ := tsoutil.ParseTS(ts)
|
||||
if time.Add(DefaultSnapshotReserveTime).Before(now) {
|
||||
candidateExpiredKeys = append(candidateExpiredKeys, key)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error("Error occurred during WalkWithPrefix", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return removeFn(latestOriginalKey)
|
||||
// Process the last group of keys
|
||||
return cleanFn(latestOriginalKey)
|
||||
}
|
||||
|
||||
@ -444,7 +444,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
|
||||
now := time.Now()
|
||||
ftso := func(ts int) typeutil.Timestamp {
|
||||
return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0)
|
||||
return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Hour).UnixMilli(), 0)
|
||||
}
|
||||
|
||||
getKey := func(prefix string, id int) string {
|
||||
@ -456,7 +456,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
cnt := 0
|
||||
for i := 0; i < kVersion; i++ {
|
||||
kvs := make(map[string]string)
|
||||
ts := ftso((i + 1) * 100)
|
||||
ts := ftso((i + 1) * 2)
|
||||
for v := 0; v < kCnt; v++ {
|
||||
if i == 0 && v%2 == 0 && cnt < expiredKeyCnt {
|
||||
value = string(SuffixSnapshotTombstone)
|
||||
@ -509,27 +509,27 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
t.Run("partial expired and all expired", func(t *testing.T) {
|
||||
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
||||
value := "v"
|
||||
ts := ftso(100)
|
||||
ts := ftso(1)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
ts = ftso(200)
|
||||
ts = ftso(2)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
ts = ftso(300)
|
||||
ts = ftso(3)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
|
||||
// insert partial expired kv
|
||||
ts = ftso(25)
|
||||
ts = ftso(2)
|
||||
saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts)
|
||||
ts = ftso(50)
|
||||
ts = ftso(4)
|
||||
saveFn(getKey(prefix, 1), value, ts)
|
||||
ts = ftso(70)
|
||||
ts = ftso(6)
|
||||
saveFn(getKey(prefix, 1), value, ts)
|
||||
|
||||
// insert all expired kv
|
||||
ts = ftso(100)
|
||||
ts = ftso(1)
|
||||
saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts)
|
||||
ts = ftso(200)
|
||||
ts = ftso(2)
|
||||
saveFn(getKey(prefix, 2), value, ts)
|
||||
ts = ftso(300)
|
||||
ts = ftso(3)
|
||||
saveFn(getKey(prefix, 2), value, ts)
|
||||
|
||||
cnt := countPrefix(prefix)
|
||||
@ -547,6 +547,47 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("partial 24 expired and all expired", func(t *testing.T) {
|
||||
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
||||
value := "v"
|
||||
ts := ftso(100)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
ts = ftso(200)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
ts = ftso(300)
|
||||
saveFn(getKey(prefix, 0), value, ts)
|
||||
|
||||
// insert partial expired kv
|
||||
ts = ftso(2)
|
||||
saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts)
|
||||
ts = ftso(4)
|
||||
saveFn(getKey(prefix, 1), value, ts)
|
||||
ts = ftso(6)
|
||||
saveFn(getKey(prefix, 1), value, ts)
|
||||
|
||||
// insert all expired kv
|
||||
ts = ftso(1)
|
||||
saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts)
|
||||
ts = ftso(2)
|
||||
saveFn(getKey(prefix, 2), value, ts)
|
||||
ts = ftso(3)
|
||||
saveFn(getKey(prefix, 2), value, ts)
|
||||
|
||||
cnt := countPrefix(prefix)
|
||||
assert.Equal(t, 12, cnt)
|
||||
|
||||
// err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
|
||||
err = ss.removeExpiredKvs(now)
|
||||
assert.NoError(t, err)
|
||||
|
||||
cnt = countPrefix(prefix)
|
||||
assert.Equal(t, 1, cnt)
|
||||
|
||||
// clean all data
|
||||
err := etcdkv.RemoveWithPrefix("")
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("parse ts fail", func(t *testing.T) {
|
||||
prefix := fmt.Sprintf("prefix%d", rand.Int())
|
||||
key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user