enhance: [2.5] Skip remove op if key in save set (#43426)

Cherry-pick from master
pr: #43425
Related to #43407

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-07-18 17:36:58 +08:00 committed by GitHub
parent b657c076a4
commit 8e957a203d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 31 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// implementation assertion
@ -466,6 +468,10 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]
}
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
// use complement to remove keys that are not in saves
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
removeKeys := typeutil.NewSet(removals...)
removals = removeKeys.Complement(saveKeys).Collect()
for _, keyDelete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))

View File

@ -23,6 +23,7 @@ import (
"path"
"time"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
@ -32,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const (
@ -467,6 +469,10 @@ func (kv *etcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]strin
start := time.Now()
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
// use complement to remove keys that are not in saves
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
removeKeys := typeutil.NewSet(removals...)
removals = removeKeys.Complement(saveKeys).Collect()
for _, keyDelete := range removals {
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
}

View File

@ -22,10 +22,12 @@ import (
"sync"
"github.com/google/btree"
"github.com/samber/lo"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// implementation assertion
@ -229,6 +231,10 @@ func (kv *MemoryKV) MultiSaveAndRemove(ctx context.Context, saves map[string]str
}
kv.Lock()
defer kv.Unlock()
// use complement to remove keys that are not in saves
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
removeKeys := typeutil.NewSet(removals...)
removals = removeKeys.Complement(saveKeys).Collect()
for _, key := range removals {
kv.tree.Delete(memoryKVItem{key: key})
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
tikverr "github.com/tikv/client-go/v2/error"
tikv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/txnkv"
@ -39,6 +40,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// A quick note is that we are using loggingErr at our outermost scope in order to perform logging
@ -466,6 +468,11 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri
}
}
// use complement to remove keys that are not in saves
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
removeKeys := typeutil.NewSet(removals...)
removals = removeKeys.Complement(saveKeys).Collect()
for _, key := range removals {
key = path.Join(kv.rootPath, key)
if err = txn.Delete([]byte(key)); err != nil {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/tecbot/gorocksdb"
"github.com/milvus-io/milvus/pkg/v2/kv"
@ -401,6 +402,11 @@ func (kv *RocksdbKV) MultiSaveAndRemove(ctx context.Context, saves map[string]st
}
writeBatch := gorocksdb.NewWriteBatch()
defer writeBatch.Destroy()
// use complement to remove keys that are not in saves
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
removeKeys := typeutil.NewSet(removals...)
removals = removeKeys.Complement(saveKeys).Collect()
for _, key := range removals {
writeBatch.Delete([]byte(key))
}