From 672a83f66b68f0b1a47b5113b7da975ac4ed9ab9 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 18 Jul 2025 17:37:39 +0800 Subject: [PATCH] enhance: Skip remove op if key in save set (#43425) Related to #43407 Signed-off-by: Congqi Xia --- internal/kv/etcd/embed_etcd_kv.go | 6 ++++++ internal/kv/etcd/etcd_kv.go | 6 ++++++ internal/kv/mem/mem_kv.go | 6 ++++++ internal/kv/tikv/txn_tikv.go | 7 +++++++ pkg/kv/rocksdb/rocksdb_kv.go | 6 ++++++ 5 files changed, 31 insertions(+) diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index bcb07a8d44..cc244eea86 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" @@ -33,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/kv/predicates" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // implementation assertion @@ -466,6 +468,10 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string] } ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + // use complement to remove keys that are not in saves + saveKeys := typeutil.NewSet(lo.Keys(saves)...) + removeKeys := typeutil.NewSet(removals...) + removals = removeKeys.Complement(saveKeys).Collect() for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index cca55059b2..bccb2bade1 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -23,6 +23,7 @@ import ( "path" "time" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -32,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const ( @@ -467,6 +469,10 @@ func (kv *etcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]strin start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) + // use complement to remove keys that are not in saves + saveKeys := typeutil.NewSet(lo.Keys(saves)...) + removeKeys := typeutil.NewSet(removals...) + removals = removeKeys.Complement(saveKeys).Collect() for _, keyDelete := range removals { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete))) } diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index da6a4c6ce0..bc46708234 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -22,10 +22,12 @@ import ( "sync" "github.com/google/btree" + "github.com/samber/lo" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/kv/predicates" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // implementation assertion @@ -229,6 +231,10 @@ func (kv *MemoryKV) MultiSaveAndRemove(ctx context.Context, saves map[string]str } kv.Lock() defer kv.Unlock() + // use complement to remove keys that are not in saves + saveKeys := typeutil.NewSet(lo.Keys(saves)...) + removeKeys := typeutil.NewSet(removals...) + removals = removeKeys.Complement(saveKeys).Collect() for _, key := range removals { kv.tree.Delete(memoryKVItem{key: key}) } diff --git a/internal/kv/tikv/txn_tikv.go b/internal/kv/tikv/txn_tikv.go index 519c14a664..cac396c1f9 100644 --- a/internal/kv/tikv/txn_tikv.go +++ b/internal/kv/tikv/txn_tikv.go @@ -25,6 +25,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" tikverr "github.com/tikv/client-go/v2/error" tikv "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/txnkv" @@ -39,6 +40,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // A quick note is that we are using loggingErr at our outermost scope in order to perform logging @@ -466,6 +468,11 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri } } + // use complement to remove keys that are not in saves + saveKeys := typeutil.NewSet(lo.Keys(saves)...) + removeKeys := typeutil.NewSet(removals...) + removals = removeKeys.Complement(saveKeys).Collect() + for _, key := range removals { key = path.Join(kv.rootPath, key) if err = txn.Delete([]byte(key)); err != nil { diff --git a/pkg/kv/rocksdb/rocksdb_kv.go b/pkg/kv/rocksdb/rocksdb_kv.go index a18450ed8a..ea61f005f8 100644 --- a/pkg/kv/rocksdb/rocksdb_kv.go +++ b/pkg/kv/rocksdb/rocksdb_kv.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/tecbot/gorocksdb" "github.com/milvus-io/milvus/pkg/v2/kv" @@ -401,6 +402,11 @@ func (kv *RocksdbKV) MultiSaveAndRemove(ctx context.Context, saves map[string]st } writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() + // use complement to remove keys that are not in saves + saveKeys := typeutil.NewSet(lo.Keys(saves)...) + removeKeys := typeutil.NewSet(removals...) + removals = removeKeys.Complement(saveKeys).Collect() + for _, key := range removals { writeBatch.Delete([]byte(key)) }