diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 3f55ab761c..bcb07a8d44 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -466,14 +466,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string] } ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) - } for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -491,14 +492,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string] // MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. func (kv *EmbedEtcdKV) MultiSaveBytesAndRemove(ctx context.Context, saves map[string][]byte, removals []string) error { ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) - } for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -532,14 +534,14 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves m } ops := make([]clientv3.Op, 0, len(saves)+len(removals)) - for key, value := range saves { - ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) - } - for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) } + for key, value := range saves { + ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) + } + ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 0baf2015f0..cca55059b2 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -467,16 +467,16 @@ func (kv *etcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]strin start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + var keys []string for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -503,15 +503,15 @@ func (kv *etcdKV) MultiSaveBytesAndRemove(ctx context.Context, saves map[string] start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) var keys []string + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) + } + for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), string(value))) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() @@ -564,16 +564,16 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[st start := time.Now() ops := make([]clientv3.Op, 0, len(saves)) + for _, keyDelete := range removals { + ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) + } + var keys []string for key, value := range saves { keys = append(keys, key) ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } - for _, keyDelete := range removals { - ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete), clientv3.WithPrefix())) - } - ctx, cancel := context.WithTimeout(ctx, kv.requestTimeout) defer cancel() diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 3a66bf9e4d..da6a4c6ce0 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -229,12 +229,14 @@ func (kv *MemoryKV) MultiSaveAndRemove(ctx context.Context, saves map[string]str } kv.Lock() defer kv.Unlock() - for key, value := range saves { - kv.tree.ReplaceOrInsert(memoryKVItem{key, StringValue(value)}) - } for _, key := range removals { kv.tree.Delete(memoryKVItem{key: key}) } + + for key, value := range saves { + kv.tree.ReplaceOrInsert(memoryKVItem{key, StringValue(value)}) + } + return nil } diff --git a/internal/kv/tikv/txn_tikv.go b/internal/kv/tikv/txn_tikv.go index 85dee924f0..519c14a664 100644 --- a/internal/kv/tikv/txn_tikv.go +++ b/internal/kv/tikv/txn_tikv.go @@ -466,6 +466,14 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri } } + for _, key := range removals { + key = path.Join(kv.rootPath, key) + if err = txn.Delete([]byte(key)); err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) + return loggingErr + } + } + for key, value := range saves { key = path.Join(kv.rootPath, key) // Check if value is empty or taking reserved EmptyValue @@ -481,14 +489,6 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri } } - for _, key := range removals { - key = path.Join(kv.rootPath, key) - if err = txn.Delete([]byte(key)); err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) - return loggingErr - } - } - err = kv.executeTxn(ctx, txn) if err != nil { loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove") @@ -529,21 +529,6 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[s } } - // Save key-value pairs - for key, value := range saves { - key = path.Join(kv.rootPath, key) - // Check if value is empty or taking reserved EmptyValue - byteValue, err := convertEmptyStringToByte(value) - if err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return loggingErr - } - err = txn.Set([]byte(key), byteValue) - if err != nil { - loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return loggingErr - } - } // Remove keys with prefix for _, prefix := range removals { prefix = path.Join(kv.rootPath, prefix) @@ -575,6 +560,23 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[s } } } + + // Save key-value pairs + for key, value := range saves { + key = path.Join(kv.rootPath, key) + // Check if value is empty or taking reserved EmptyValue + byteValue, err := convertEmptyStringToByte(value) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr + } + err = txn.Set([]byte(key), byteValue) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr + } + } + err = kv.executeTxn(ctx, txn) if err != nil { loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix") diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot.go b/internal/metastore/kv/rootcoord/suffix_snapshot.go index 068b478426..3cecf653d7 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot.go @@ -526,6 +526,10 @@ func (ss *SuffixSnapshot) MultiSaveAndRemove(ctx context.Context, saves map[stri // load each removal, change execution to adding tombstones for _, removal := range removals { + // if save batch contains removal, skip remove op + if _, ok := execute[removal]; ok { + continue + } value, err := ss.MetaKv.Load(ctx, removal) if err != nil { log.Warn("SuffixSnapshot MetaKv Load failed", zap.String("key", removal), zap.Error(err)) diff --git a/pkg/kv/rocksdb/rocksdb_kv.go b/pkg/kv/rocksdb/rocksdb_kv.go index c407582f01..a18450ed8a 100644 --- a/pkg/kv/rocksdb/rocksdb_kv.go +++ b/pkg/kv/rocksdb/rocksdb_kv.go @@ -401,12 +401,13 @@ func (kv *RocksdbKV) MultiSaveAndRemove(ctx context.Context, saves map[string]st } writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() - for k, v := range saves { - writeBatch.Put([]byte(k), []byte(v)) - } for _, key := range removals { writeBatch.Delete([]byte(key)) } + for k, v := range saves { + writeBatch.Put([]byte(k), []byte(v)) + } + err := kv.DB.Write(kv.WriteOptions, writeBatch) return err } @@ -436,10 +437,10 @@ func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map } writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() + kv.prepareRemovePrefix(removals, writeBatch) for k, v := range saves { writeBatch.Put([]byte(k), []byte(v)) } - kv.prepareRemovePrefix(removals, writeBatch) err := kv.DB.Write(kv.WriteOptions, writeBatch) return err }