diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index c0203c9da2..7fb55f3283 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -20,7 +20,6 @@ import ( "bytes" "fmt" "path" - "regexp" "sort" "strconv" "strings" @@ -43,8 +42,10 @@ import ( var ( // SuffixSnapshotTombstone special value for tombstone mark - SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} - PaginationSize = 5000 + SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} + PaginationSize = 5000 + DefaultSnapshotReserveTime = 1 * time.Hour + DefaultSnapshotTTL = 24 * time.Hour ) // IsTombstone used in migration tool also. @@ -82,9 +83,6 @@ type SuffixSnapshot struct { snapshotPrefix string // snapshotLen pre calculated offset when parsing snapshot key snapshotLen int - // exp is the shortcut format checker for ts-key - // composed with separator only - exp *regexp.Regexp closeGC chan struct{} } @@ -116,7 +114,6 @@ func NewSuffixSnapshot(metaKV kv.MetaKv, sep, root, snapshot string) (*SuffixSna MetaKv: metaKV, lastestTS: make(map[string]typeutil.Timestamp), separator: sep, - exp: regexp.MustCompile(fmt.Sprintf(`^(.+)%s(\d+)$`, sep)), snapshotPrefix: snapshot, snapshotLen: snapshotLen, rootPrefix: root, @@ -164,12 +161,14 @@ func (ss *SuffixSnapshot) isTSKey(key string) (typeutil.Timestamp, bool) { return 0, false } key = key[ss.snapshotLen:] - matches := ss.exp.FindStringSubmatch(key) - if len(matches) < 3 { + idx := strings.LastIndex(key, ss.separator) + if idx == -1 { + return 0, false + } + ts, err := strconv.ParseUint(key[idx+len(ss.separator):], 10, 64) + if err != nil { return 0, false } - // err ignores since it's protected by the regexp - ts, _ := strconv.ParseUint(matches[2], 10, 64) return ts, true } @@ -182,15 +181,17 @@ func (ss *SuffixSnapshot) isTSOfKey(key string, groupKey string) (typeutil.Times } key = key[ss.snapshotLen:] - matches := ss.exp.FindStringSubmatch(key) - if len(matches) < 3 { + idx := strings.LastIndex(key, ss.separator) + if idx == -1 { return 0, false } - if matches[1] != groupKey { + if key[:idx] != groupKey { + return 0, false + } + ts, err := strconv.ParseUint(key[idx+len(ss.separator):], 10, 64) + if err != nil { return 0, false } - // err ignores since it's protected by the regexp - ts, _ := strconv.ParseUint(matches[2], 10, 64) return ts, true } @@ -270,7 +271,7 @@ func binarySearchRecords(records []tsv, ts typeutil.Timestamp) (string, bool) { } // Save stores key-value pairs with timestamp -// if ts is 0, SuffixSnapshot works as a MetaKv +// if ts == 0, SuffixSnapshot works as a MetaKv // otherwise, SuffixSnapshot will store a ts-key as "key[sep]ts"-value pair in snapshot path // and for acceleration store original key-value if ts is the latest func (ss *SuffixSnapshot) Save(key string, value string, ts typeutil.Timestamp) error { @@ -648,67 +649,82 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s return etcd.RemoveByBatchWithLimit(keyGroup, util.MaxEtcdTxnNum, removeFn) } +// removeExpiredKvs removes expired key-value pairs from the snapshot +// It walks through all keys with the snapshot prefix, groups them by original key, +// and removes expired versions or all versions if the original key has been deleted func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error { - keyGroup := make([]string, 0) + candidateExpiredKeys := make([]string, 0) latestOriginalKey := "" - latestValue := "" - groupCnt := 0 + latestOriginValue := "" + totalVersions := 0 - removeFn := func(curOriginalKey string) error { - if !ss.isTombstone(latestValue) { + // cleanFn processes a group of keys for a single original key + cleanFn := func(curOriginalKey string) error { + if curOriginalKey == "" { return nil } - return ss.batchRemoveExpiredKvs(keyGroup, curOriginalKey, groupCnt == len(keyGroup)) + if ss.isTombstone(latestOriginValue) { + // If deleted, remove all versions including the original key + return ss.batchRemoveExpiredKvs(candidateExpiredKeys, curOriginalKey, totalVersions == len(candidateExpiredKeys)) + } + + // If not deleted, check for expired versions + expiredKeys := make([]string, 0) + for _, key := range candidateExpiredKeys { + ts, _ := ss.isTSKey(key) + expireTime, _ := tsoutil.ParseTS(ts) + if expireTime.Add(DefaultSnapshotTTL).Before(now) { + expiredKeys = append(expiredKeys, key) + } + } + if len(expiredKeys) > 0 { + return ss.batchRemoveExpiredKvs(expiredKeys, curOriginalKey, false) + } + return nil } - // walk all kvs with SortAsc, we need walk to the latest key for each key group to check the kv - // whether contains tombstone, then if so, it represents the original key has been removed. - // TODO: walk with Desc + // Walk through all keys with the snapshot prefix err := ss.MetaKv.WalkWithPrefix(ss.snapshotPrefix, PaginationSize, func(k []byte, v []byte) error { - key := string(k) - value := string(v) - - key = ss.hideRootPrefix(key) + key := ss.hideRootPrefix(string(k)) ts, ok := ss.isTSKey(key) - // it is original key if the key doesn't contain ts if !ok { - log.Warn("skip key because it doesn't contain ts", zap.String("key", key)) + log.Warn("Skip key because it doesn't contain ts", zap.String("key", key)) return nil } curOriginalKey, err := ss.getOriginalKey(key) if err != nil { + log.Error("Failed to parse the original key for GC", zap.String("key", key), zap.Error(err)) return err } - // reset if starting look up a new key group + // If we've moved to a new original key, process the previous group if latestOriginalKey != "" && latestOriginalKey != curOriginalKey { - // it indicates all keys need to remove that the prefix is original key - // it means the latest original kvs has already been removed if the latest kv has tombstone marker. - if err := removeFn(latestOriginalKey); err != nil { + if err := cleanFn(latestOriginalKey); err != nil { return err } - keyGroup = make([]string, 0) - groupCnt = 0 + candidateExpiredKeys = make([]string, 0) + totalVersions = 0 } - latestValue = value - groupCnt++ latestOriginalKey = curOriginalKey + latestOriginValue = string(v) + totalVersions++ - // record keys if the kv is expired - expireTime, _ := tsoutil.ParseTS(ts) - // break loop if it reaches expire time - if expireTime.Before(now) { - keyGroup = append(keyGroup, key) + // Record versions that are already expired but not removed + time, _ := tsoutil.ParseTS(ts) + if time.Add(DefaultSnapshotReserveTime).Before(now) { + candidateExpiredKeys = append(candidateExpiredKeys, key) } return nil }) if err != nil { + log.Error("Error occurred during WalkWithPrefix", zap.Error(err)) return err } - return removeFn(latestOriginalKey) + // Process the last group of keys + return cleanFn(latestOriginalKey) } diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index 6d76e54470..326716919f 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -444,7 +444,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { now := time.Now() ftso := func(ts int) typeutil.Timestamp { - return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Millisecond).UnixMilli(), 0) + return tsoutil.ComposeTS(now.Add(-1*time.Duration(ts)*time.Hour).UnixMilli(), 0) } getKey := func(prefix string, id int) string { @@ -456,7 +456,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { cnt := 0 for i := 0; i < kVersion; i++ { kvs := make(map[string]string) - ts := ftso((i + 1) * 100) + ts := ftso((i + 1) * 2) for v := 0; v < kCnt; v++ { if i == 0 && v%2 == 0 && cnt < expiredKeyCnt { value = string(SuffixSnapshotTombstone) @@ -509,27 +509,27 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { t.Run("partial expired and all expired", func(t *testing.T) { prefix := fmt.Sprintf("prefix%d", rand.Int()) value := "v" - ts := ftso(100) + ts := ftso(1) saveFn(getKey(prefix, 0), value, ts) - ts = ftso(200) + ts = ftso(2) saveFn(getKey(prefix, 0), value, ts) - ts = ftso(300) + ts = ftso(3) saveFn(getKey(prefix, 0), value, ts) // insert partial expired kv - ts = ftso(25) + ts = ftso(2) saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts) - ts = ftso(50) + ts = ftso(4) saveFn(getKey(prefix, 1), value, ts) - ts = ftso(70) + ts = ftso(6) saveFn(getKey(prefix, 1), value, ts) // insert all expired kv - ts = ftso(100) + ts = ftso(1) saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts) - ts = ftso(200) + ts = ftso(2) saveFn(getKey(prefix, 2), value, ts) - ts = ftso(300) + ts = ftso(3) saveFn(getKey(prefix, 2), value, ts) cnt := countPrefix(prefix) @@ -547,6 +547,47 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) { assert.NoError(t, err) }) + t.Run("partial 24 expired and all expired", func(t *testing.T) { + prefix := fmt.Sprintf("prefix%d", rand.Int()) + value := "v" + ts := ftso(100) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(200) + saveFn(getKey(prefix, 0), value, ts) + ts = ftso(300) + saveFn(getKey(prefix, 0), value, ts) + + // insert partial expired kv + ts = ftso(2) + saveFn(getKey(prefix, 1), string(SuffixSnapshotTombstone), ts) + ts = ftso(4) + saveFn(getKey(prefix, 1), value, ts) + ts = ftso(6) + saveFn(getKey(prefix, 1), value, ts) + + // insert all expired kv + ts = ftso(1) + saveFn(getKey(prefix, 2), string(SuffixSnapshotTombstone), ts) + ts = ftso(2) + saveFn(getKey(prefix, 2), value, ts) + ts = ftso(3) + saveFn(getKey(prefix, 2), value, ts) + + cnt := countPrefix(prefix) + assert.Equal(t, 12, cnt) + + // err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond) + err = ss.removeExpiredKvs(now) + assert.NoError(t, err) + + cnt = countPrefix(prefix) + assert.Equal(t, 1, cnt) + + // clean all data + err := etcdkv.RemoveWithPrefix("") + assert.NoError(t, err) + }) + t.Run("parse ts fail", func(t *testing.T) { prefix := fmt.Sprintf("prefix%d", rand.Int()) key := fmt.Sprintf("%s-%s", prefix, "ts_error-ts")