mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: Skip remove op if key in save set (#43425)
Related to #43407 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
2adc6ce0bc
commit
672a83f66b
@ -24,6 +24,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
|
"github.com/samber/lo"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||||
@ -33,6 +34,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
|
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/log"
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// implementation assertion
|
// implementation assertion
|
||||||
@ -466,6 +468,10 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]
|
|||||||
}
|
}
|
||||||
|
|
||||||
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
||||||
|
// use complement to remove keys that are not in saves
|
||||||
|
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
|
||||||
|
removeKeys := typeutil.NewSet(removals...)
|
||||||
|
removals = removeKeys.Complement(saveKeys).Collect()
|
||||||
|
|
||||||
for _, keyDelete := range removals {
|
for _, keyDelete := range removals {
|
||||||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/samber/lo"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
@ -32,6 +33,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -467,6 +469,10 @@ func (kv *etcdKV) MultiSaveAndRemove(ctx context.Context, saves map[string]strin
|
|||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
ops := make([]clientv3.Op, 0, len(saves)+len(removals))
|
||||||
|
// use complement to remove keys that are not in saves
|
||||||
|
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
|
||||||
|
removeKeys := typeutil.NewSet(removals...)
|
||||||
|
removals = removeKeys.Complement(saveKeys).Collect()
|
||||||
for _, keyDelete := range removals {
|
for _, keyDelete := range removals {
|
||||||
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, keyDelete)))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,10 +22,12 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/google/btree"
|
"github.com/google/btree"
|
||||||
|
"github.com/samber/lo"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/kv"
|
"github.com/milvus-io/milvus/pkg/v2/kv"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
|
"github.com/milvus-io/milvus/pkg/v2/kv/predicates"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// implementation assertion
|
// implementation assertion
|
||||||
@ -229,6 +231,10 @@ func (kv *MemoryKV) MultiSaveAndRemove(ctx context.Context, saves map[string]str
|
|||||||
}
|
}
|
||||||
kv.Lock()
|
kv.Lock()
|
||||||
defer kv.Unlock()
|
defer kv.Unlock()
|
||||||
|
// use complement to remove keys that are not in saves
|
||||||
|
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
|
||||||
|
removeKeys := typeutil.NewSet(removals...)
|
||||||
|
removals = removeKeys.Complement(saveKeys).Collect()
|
||||||
for _, key := range removals {
|
for _, key := range removals {
|
||||||
kv.tree.Delete(memoryKVItem{key: key})
|
kv.tree.Delete(memoryKVItem{key: key})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,6 +25,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
|
"github.com/samber/lo"
|
||||||
tikverr "github.com/tikv/client-go/v2/error"
|
tikverr "github.com/tikv/client-go/v2/error"
|
||||||
tikv "github.com/tikv/client-go/v2/kv"
|
tikv "github.com/tikv/client-go/v2/kv"
|
||||||
"github.com/tikv/client-go/v2/txnkv"
|
"github.com/tikv/client-go/v2/txnkv"
|
||||||
@ -39,6 +40,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
"github.com/milvus-io/milvus/pkg/v2/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A quick note is that we are using loggingErr at our outermost scope in order to perform logging
|
// A quick note is that we are using loggingErr at our outermost scope in order to perform logging
|
||||||
@ -466,6 +468,11 @@ func (kv *txnTiKV) MultiSaveAndRemove(ctx context.Context, saves map[string]stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// use complement to remove keys that are not in saves
|
||||||
|
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
|
||||||
|
removeKeys := typeutil.NewSet(removals...)
|
||||||
|
removals = removeKeys.Complement(saveKeys).Collect()
|
||||||
|
|
||||||
for _, key := range removals {
|
for _, key := range removals {
|
||||||
key = path.Join(kv.rootPath, key)
|
key = path.Join(kv.rootPath, key)
|
||||||
if err = txn.Delete([]byte(key)); err != nil {
|
if err = txn.Delete([]byte(key)); err != nil {
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
|
"github.com/samber/lo"
|
||||||
"github.com/tecbot/gorocksdb"
|
"github.com/tecbot/gorocksdb"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/kv"
|
"github.com/milvus-io/milvus/pkg/v2/kv"
|
||||||
@ -401,6 +402,11 @@ func (kv *RocksdbKV) MultiSaveAndRemove(ctx context.Context, saves map[string]st
|
|||||||
}
|
}
|
||||||
writeBatch := gorocksdb.NewWriteBatch()
|
writeBatch := gorocksdb.NewWriteBatch()
|
||||||
defer writeBatch.Destroy()
|
defer writeBatch.Destroy()
|
||||||
|
// use complement to remove keys that are not in saves
|
||||||
|
saveKeys := typeutil.NewSet(lo.Keys(saves)...)
|
||||||
|
removeKeys := typeutil.NewSet(removals...)
|
||||||
|
removals = removeKeys.Complement(saveKeys).Collect()
|
||||||
|
|
||||||
for _, key := range removals {
|
for _, key := range removals {
|
||||||
writeBatch.Delete([]byte(key))
|
writeBatch.Delete([]byte(key))
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user