From 8fc7069e1af5d078c21856dab14cd27d471419c3 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 18 Jul 2025 15:41:40 +0800 Subject: [PATCH] fix: Make MultiSaveAndRemove execute removal first (#43408) Related to #43407 When `MultiSaveAndRemove`-like ops contain the same key in both the saves and the removal keys, it may cause data loss if the execution order is save first, then removal. This PR makes all the kv implementations execute removals first and then save the new values. Even when the same key appears in both saves and removals, the new value shall stay. Signed-off-by: Congqi Xia --- internal/kv/etcd/embed_etcd_kv.go | 22 +++++---- internal/kv/etcd/etcd_kv.go | 24 +++++----- internal/kv/mem/mem_kv.go | 8 ++-- internal/kv/tikv/txn_tikv.go | 48 ++++++++++--------- .../metastore/kv/rootcoord/suffix_snapshot.go | 4 ++ pkg/kv/rocksdb/rocksdb_kv.go | 9 ++-- 6 files changed, 63 insertions(+), 52 deletions(-) diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 3f55ab761c..bcb07a8d44 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -466,14 +466,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string] } ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) - } for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -491,14 +492,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string] // MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. 
func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(ctx context.Context, saves map[string][]byte, removals []string) error { ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) - } for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -532,14 +534,14 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves m } ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) - } - for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 0baf2015f0..cca55059b2 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -467,16 +467,16 @@ func (kv *etcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]strin start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + var keys []string for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer 
cancel() @@ -503,15 +503,15 @@ func (kv *etcdKV) MultiSaveBytesAndRemove(ctx context.Context, saves map[string] start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) var keys []string + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -564,16 +564,16 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[st start := time.Now() ops := make([]clientv3.Op, 0, len(saves)) + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) + } + var keys []string for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 3a66bf9e4d..da6a4c6ce0 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -229,12 +229,14 @@ func (kv *MemoryKV) MultiSaveAndRemove(ctx context.Context, saves map[string]str } kv.Lock() defer kv.Unlock() - for key, value := range saves { - kv.tree.ReplaceOrInsert(memoryKVItem{key, StringValue(value)}) - } for _, key := range removals { kv.tree.Delete(memoryKVItem{key: key}) } + + for key, value := range saves { + kv.tree.ReplaceOrInsert(memoryKVItem{key, StringValue(value)}) + } + return nil } diff --git a/internal/kv/tikv/txn_tikv.go b/internal/kv/tikv/txn_tikv.go index 
85dee924f0..519c14a664 100644 --- a/internal/kv/tikv/txn_tikv.go +++ b/internal/kv/tikv/txn_tikv.go @@ -466,6 +466,14 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri } } + for _, key := range removals { + key = path.Join(kv.rootPath, key) + if err = txn.Delete([]byte(key)); err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) + return loggingErr + } + } + for key, value := range saves { key = path.Join(kv.rootPath, key) // Check if value is empty or taking reserved EmptyValue @@ -481,14 +489,6 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri } } - for _, key := range removals { - key = path.Join(kv.rootPath, key) - if err = txn.Delete([]byte(key)); err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) - return loggingErr - } - } - err = kv.executeTxn(ctx, txn) if err != nil { loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove") @@ -529,21 +529,6 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[s } } - // Save key-value pairs - for key, value := range saves { - key = path.Join(kv.rootPath, key) - // Check if value is empty or taking reserved EmptyValue - byteValue, err := convertEmptyStringToByte(value) - if err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return loggingErr - } - err = txn.Set([]byte(key), byteValue) - if err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return loggingErr - } - } // Remove keys with prefix for _, prefix := range removals { prefix = path.Join(kv.rootPath, prefix) @@ -575,6 +560,23 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[s } } } + + // Save key-value pairs + for key, value := range saves { + 
key = path.Join(kv.rootPath, key) + // Check if value is empty or taking reserved EmptyValue + byteValue, err := convertEmptyStringToByte(value) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr + } + err = txn.Set([]byte(key), byteValue) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr + } + } + err = kv.executeTxn(ctx, txn) if err != nil { loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix") diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 068b478426..3cecf653d7 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -526,6 +526,10 @@ func (ss *SuffixSnapshot) MultiSaveAndRemove(ctx context.Context, saves map[stri // load each removal, change execution to adding tombstones for _, removal := range removals { + // if save batch contains removal, skip remove op + if _, ok := execute[removal]; ok { + continue + } value, err := ss.MetaKv.Load(ctx, removal) if err != nil { log.Warn("SuffixSnapshot MetaKv Load failed", zap.String("key", removal), zap.Error(err)) diff --git a/pkg/kv/rocksdb/rocksdb_kv.go b/pkg/kv/rocksdb/rocksdb_kv.go index c407582f01..a18450ed8a 100644 --- a/pkg/kv/rocksdb/rocksdb_kv.go +++ b/pkg/kv/rocksdb/rocksdb_kv.go @@ -401,12 +401,13 @@ func (kv *RocksdbKV) MultiSaveAndRemove(ctx context.Context, saves map[string]st } writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() - for k, v := range saves { - writeBatch.Put([]byte(k), []byte(v)) - } for _, key := range removals { writeBatch.Delete([]byte(key)) } + for k, v := range saves { + writeBatch.Put([]byte(k), []byte(v)) + } + err := kv.DB.Write(kv.WriteOptions, writeBatch) return err } @@ -436,10 
+437,10 @@ func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map } writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() + kv.prepareRemovePrefix(removals, writeBatch) for k, v := range saves { writeBatch.Put([]byte(k), []byte(v)) } - kv.prepareRemovePrefix(removals, writeBatch) err := kv.DB.Write(kv.WriteOptions, writeBatch) return err }