mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: replace removeWithPrefix with remove to avoid delete redundantly (#33328)
#33288 --------- Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
parent
c6a1c49e02
commit
2c7bb0b8ac
@ -91,5 +91,6 @@ type SnapShotKV interface {
|
||||
Load(key string, ts typeutil.Timestamp) (string, error)
|
||||
MultiSave(kvs map[string]string, ts typeutil.Timestamp) error
|
||||
LoadWithPrefix(key string, ts typeutil.Timestamp) ([]string, []string, error)
|
||||
MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error
|
||||
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts typeutil.Timestamp) error
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ type mockSnapshotKV struct {
|
||||
MultiSaveFunc func(kvs map[string]string, ts typeutil.Timestamp) error
|
||||
LoadWithPrefixFunc func(key string, ts typeutil.Timestamp) ([]string, []string, error)
|
||||
MultiSaveAndRemoveWithPrefixFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
|
||||
MultiSaveAndRemoveFunc func(saves map[string]string, removals []string, ts typeutil.Timestamp) error
|
||||
}
|
||||
|
||||
func NewMockSnapshotKV() *mockSnapshotKV {
|
||||
@ -51,3 +52,10 @@ func (m mockSnapshotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, re
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockSnapshotKV) MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
if m.MultiSaveAndRemoveFunc != nil {
|
||||
return m.MultiSaveAndRemoveFunc(saves, removals, ts)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,3 +87,19 @@ func Test_mockSnapshotKV_MultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_mockSnapshotKV_MultiSaveAndRemove(t *testing.T) {
|
||||
t.Run("func not set", func(t *testing.T) {
|
||||
snapshot := NewMockSnapshotKV()
|
||||
err := snapshot.MultiSaveAndRemove(nil, nil, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
t.Run("func set", func(t *testing.T) {
|
||||
snapshot := NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
err := snapshot.MultiSaveAndRemove(nil, nil, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
@ -177,6 +177,50 @@ func (_c *SnapShotKV_MultiSave_Call) RunAndReturn(run func(map[string]string, ui
|
||||
return _c
|
||||
}
|
||||
|
||||
// MultiSaveAndRemove provides a mock function with given fields: saves, removals, ts
|
||||
func (_m *SnapShotKV) MultiSaveAndRemove(saves map[string]string, removals []string, ts uint64) error {
|
||||
ret := _m.Called(saves, removals, ts)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(map[string]string, []string, uint64) error); ok {
|
||||
r0 = rf(saves, removals, ts)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// SnapShotKV_MultiSaveAndRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemove'
|
||||
type SnapShotKV_MultiSaveAndRemove_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// MultiSaveAndRemove is a helper method to define mock.On call
|
||||
// - saves map[string]string
|
||||
// - removals []string
|
||||
// - ts uint64
|
||||
func (_e *SnapShotKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, ts interface{}) *SnapShotKV_MultiSaveAndRemove_Call {
|
||||
return &SnapShotKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals, ts)}
|
||||
}
|
||||
|
||||
func (_c *SnapShotKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, ts uint64)) *SnapShotKV_MultiSaveAndRemove_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(map[string]string), args[1].([]string), args[2].(uint64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *SnapShotKV_MultiSaveAndRemove_Call) Return(_a0 error) *SnapShotKV_MultiSaveAndRemove_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *SnapShotKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, uint64) error) *SnapShotKV_MultiSaveAndRemove_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, ts
|
||||
func (_m *SnapShotKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, ts uint64) error {
|
||||
ret := _m.Called(saves, removals, ts)
|
||||
|
||||
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
@ -85,7 +84,7 @@ func BuildAliasPrefixWithDB(dbID int64) string {
|
||||
|
||||
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
|
||||
// MaxEtcdTxnNum need to divided by 2
|
||||
func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
func batchMultiSaveAndRemove(snapshot kv.SnapShotKV, limit int, saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
saveFn := func(partialKvs map[string]string) error {
|
||||
return snapshot.MultiSave(partialKvs, ts)
|
||||
}
|
||||
@ -93,14 +92,8 @@ func batchMultiSaveAndRemoveWithPrefix(snapshot kv.SnapShotKV, limit int, saves
|
||||
return err
|
||||
}
|
||||
|
||||
// avoid a case that the former key is the prefix of the later key.
|
||||
// for example, `root-coord/fields/collection_id/1` is the prefix of `root-coord/fields/collection_id/100`.
|
||||
sort.Slice(removals, func(i, j int) bool {
|
||||
return removals[i] > removals[j]
|
||||
})
|
||||
|
||||
removeFn := func(partialKeys []string) error {
|
||||
return snapshot.MultiSaveAndRemoveWithPrefix(nil, partialKeys, ts)
|
||||
return snapshot.MultiSaveAndRemove(nil, partialKeys, ts)
|
||||
}
|
||||
return etcd.RemoveByBatchWithLimit(removals, limit, removeFn)
|
||||
}
|
||||
@ -127,7 +120,7 @@ func (kc *Catalog) AlterDatabase(ctx context.Context, newColl *model.Database, t
|
||||
|
||||
func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error {
|
||||
key := BuildDatabaseKey(dbID)
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(nil, []string{key}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
|
||||
@ -300,7 +293,7 @@ func (kc *Catalog) CreateAlias(ctx context.Context, alias *model.Alias, ts typeu
|
||||
return err
|
||||
}
|
||||
kvs := map[string]string{k: string(v)}
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(kvs, []string{oldKBefore210, oldKeyWithoutDb}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) CreateCredential(ctx context.Context, credential *model.Credential) error {
|
||||
@ -455,12 +448,12 @@ func (kc *Catalog) DropCollection(ctx context.Context, collectionInfo *model.Col
|
||||
// However, if we remove collection first, we cannot remove other metas.
|
||||
// since SnapshotKV may save both snapshot key and the original key if the original key is newest
|
||||
// MaxEtcdTxnNum need to divided by 2
|
||||
if err := batchMultiSaveAndRemoveWithPrefix(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
|
||||
if err := batchMultiSaveAndRemove(kc.Snapshot, util.MaxEtcdTxnNum/2, nil, delMetakeysSnap, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if we found collection dropping, we should try removing related resources.
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, collectionKeys, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(nil, collectionKeys, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *model.Collection, ts typeutil.Timestamp) error {
|
||||
@ -491,7 +484,7 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
|
||||
if oldKey == newKey {
|
||||
return kc.Snapshot.Save(newKey, string(value), ts)
|
||||
}
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(saves, []string{oldKey}, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(saves, []string{oldKey}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType metastore.AlterType, ts typeutil.Timestamp) error {
|
||||
@ -559,7 +552,7 @@ func (kc *Catalog) DropPartition(ctx context.Context, dbID int64, collectionID t
|
||||
|
||||
if partitionVersionAfter210(collMeta) {
|
||||
k := BuildPartitionKey(collectionID, partitionID)
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k}, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(nil, []string{k}, ts)
|
||||
}
|
||||
|
||||
k := BuildCollectionKey(util.NonDBID, collectionID)
|
||||
@ -601,7 +594,7 @@ func (kc *Catalog) DropAlias(ctx context.Context, dbID int64, alias string, ts t
|
||||
oldKBefore210 := BuildAliasKey210(alias)
|
||||
oldKeyWithoutDb := BuildAliasKey(alias)
|
||||
k := BuildAliasKeyWithDB(dbID, alias)
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts)
|
||||
return kc.Snapshot.MultiSaveAndRemove(nil, []string{k, oldKeyWithoutDb, oldKBefore210}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) {
|
||||
|
||||
@ -495,7 +495,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("mock")
|
||||
}
|
||||
|
||||
@ -504,7 +504,7 @@ func TestCatalog_CreateAliasV2(t *testing.T) {
|
||||
err := kc.CreateAlias(ctx, &model.Alias{}, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
err = kc.CreateAlias(ctx, &model.Alias{}, 0)
|
||||
@ -623,7 +623,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("mock")
|
||||
}
|
||||
|
||||
@ -632,7 +632,7 @@ func TestCatalog_AlterAliasV2(t *testing.T) {
|
||||
err := kc.AlterAlias(ctx, &model.Alias{}, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
err = kc.AlterAlias(ctx, &model.Alias{}, 0)
|
||||
@ -706,7 +706,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
|
||||
snapshot.LoadFunc = func(key string, ts typeutil.Timestamp) (string, error) {
|
||||
return string(value), nil
|
||||
}
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("mock")
|
||||
}
|
||||
|
||||
@ -715,7 +715,7 @@ func TestCatalog_DropPartitionV2(t *testing.T) {
|
||||
err = kc.DropPartition(ctx, 0, 100, 101, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
err = kc.DropPartition(ctx, 0, 100, 101, 0)
|
||||
@ -758,7 +758,7 @@ func TestCatalog_DropAliasV2(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("mock")
|
||||
}
|
||||
|
||||
@ -767,7 +767,7 @@ func TestCatalog_DropAliasV2(t *testing.T) {
|
||||
err := kc.DropAlias(ctx, testDb, "alias", 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
err = kc.DropAlias(ctx, testDb, "alias", 0)
|
||||
@ -942,14 +942,14 @@ func TestCatalog_ListAliasesV2(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
func Test_batchMultiSaveAndRemove(t *testing.T) {
|
||||
t.Run("failed to save", func(t *testing.T) {
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
return errors.New("error mock MultiSave")
|
||||
}
|
||||
saves := map[string]string{"k": "v"}
|
||||
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
|
||||
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, []string{}, 0)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
t.Run("failed to remove", func(t *testing.T) {
|
||||
@ -957,12 +957,12 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
snapshot.MultiSaveFunc = func(kvs map[string]string, ts typeutil.Timestamp) error {
|
||||
return nil
|
||||
}
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("error mock MultiSaveAndRemoveWithPrefix")
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
return errors.New("error mock MultiSaveAndRemove")
|
||||
}
|
||||
saves := map[string]string{"k": "v"}
|
||||
removals := []string{"prefix1", "prefix2"}
|
||||
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
|
||||
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
t.Run("normal case", func(t *testing.T) {
|
||||
@ -971,7 +971,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
log.Info("multi save", zap.Any("len", len(kvs)), zap.Any("saves", kvs))
|
||||
return nil
|
||||
}
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
log.Info("multi save and remove with prefix", zap.Any("len of saves", len(saves)), zap.Any("len of removals", len(removals)),
|
||||
zap.Any("saves", saves), zap.Any("removals", removals))
|
||||
return nil
|
||||
@ -983,7 +983,7 @@ func Test_batchMultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
saves[fmt.Sprintf("k%d", i)] = fmt.Sprintf("v%d", i)
|
||||
removals = append(removals, fmt.Sprintf("k%d", i))
|
||||
}
|
||||
err := batchMultiSaveAndRemoveWithPrefix(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
|
||||
err := batchMultiSaveAndRemove(snapshot, util.MaxEtcdTxnNum/2, saves, removals, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
@ -1040,7 +1040,7 @@ func TestCatalog_AlterCollection(t *testing.T) {
|
||||
t.Run("modify db name", func(t *testing.T) {
|
||||
var collectionID int64 = 1
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
snapshot.MultiSaveAndRemoveFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
assert.ElementsMatch(t, []string{BuildCollectionKey(0, collectionID)}, removals)
|
||||
assert.Equal(t, len(saves), 1)
|
||||
assert.Contains(t, maps.Keys(saves), BuildCollectionKey(1, collectionID))
|
||||
@ -1149,6 +1149,17 @@ func withMockMultiSaveAndRemoveWithPrefix(err error) mockSnapshotOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func withMockMultiSaveAndRemove(err error) mockSnapshotOpt {
|
||||
return func(ss *mocks.SnapShotKV) {
|
||||
ss.On(
|
||||
"MultiSaveAndRemove",
|
||||
mock.AnythingOfType("map[string]string"),
|
||||
mock.AnythingOfType("[]string"),
|
||||
mock.AnythingOfType("uint64")).
|
||||
Return(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalog_CreateCollection(t *testing.T) {
|
||||
t.Run("collection not creating", func(t *testing.T) {
|
||||
kc := &Catalog{}
|
||||
@ -1198,7 +1209,7 @@ func TestCatalog_CreateCollection(t *testing.T) {
|
||||
|
||||
func TestCatalog_DropCollection(t *testing.T) {
|
||||
t.Run("failed to remove", func(t *testing.T) {
|
||||
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(errors.New("error mock MultiSaveAndRemoveWithPrefix")))
|
||||
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(errors.New("error mock MultiSaveAndRemove")))
|
||||
kc := &Catalog{Snapshot: mockSnapshot}
|
||||
ctx := context.Background()
|
||||
coll := &model.Collection{
|
||||
@ -1216,7 +1227,7 @@ func TestCatalog_DropCollection(t *testing.T) {
|
||||
removeOtherCalled := false
|
||||
removeCollectionCalled := false
|
||||
mockSnapshot.On(
|
||||
"MultiSaveAndRemoveWithPrefix",
|
||||
"MultiSaveAndRemove",
|
||||
mock.AnythingOfType("map[string]string"),
|
||||
mock.AnythingOfType("[]string"),
|
||||
mock.AnythingOfType("uint64")).
|
||||
@ -1225,13 +1236,13 @@ func TestCatalog_DropCollection(t *testing.T) {
|
||||
return nil
|
||||
}).Once()
|
||||
mockSnapshot.On(
|
||||
"MultiSaveAndRemoveWithPrefix",
|
||||
"MultiSaveAndRemove",
|
||||
mock.AnythingOfType("map[string]string"),
|
||||
mock.AnythingOfType("[]string"),
|
||||
mock.AnythingOfType("uint64")).
|
||||
Return(func(map[string]string, []string, typeutil.Timestamp) error {
|
||||
removeCollectionCalled = true
|
||||
return errors.New("error mock MultiSaveAndRemoveWithPrefix")
|
||||
return errors.New("error mock MultiSaveAndRemove")
|
||||
}).Once()
|
||||
kc := &Catalog{Snapshot: mockSnapshot}
|
||||
ctx := context.Background()
|
||||
@ -1248,7 +1259,7 @@ func TestCatalog_DropCollection(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("normal case", func(t *testing.T) {
|
||||
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemoveWithPrefix(nil))
|
||||
mockSnapshot := newMockSnapshot(t, withMockMultiSaveAndRemove(nil))
|
||||
kc := &Catalog{Snapshot: mockSnapshot}
|
||||
ctx := context.Background()
|
||||
coll := &model.Collection{
|
||||
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
@ -502,6 +503,53 @@ func (ss *SuffixSnapshot) LoadWithPrefix(key string, ts typeutil.Timestamp) ([]s
|
||||
return resultKeys, resultValues, nil
|
||||
}
|
||||
|
||||
// MultiSaveAndRemove save muiltple kvs and remove as well
|
||||
// if ts == 0, act like MetaKv
|
||||
// each key-value will be treated in same logic like Save
|
||||
func (ss *SuffixSnapshot) MultiSaveAndRemove(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
// if ts == 0, act like MetaKv
|
||||
if ts == 0 {
|
||||
return ss.MetaKv.MultiSaveAndRemove(saves, removals)
|
||||
}
|
||||
ss.Lock()
|
||||
defer ss.Unlock()
|
||||
var err error
|
||||
|
||||
// process each key, checks whether is the latest
|
||||
execute, updateList, err := ss.generateSaveExecute(saves, ts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// load each removal, change execution to adding tombstones
|
||||
for _, removal := range removals {
|
||||
value, err := ss.MetaKv.Load(removal)
|
||||
if err != nil {
|
||||
log.Warn("SuffixSnapshot MetaKv Load failed", zap.String("key", removal), zap.Error(err))
|
||||
if errors.Is(err, merr.ErrIoKeyNotFound) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
// add tombstone to original key and add ts entry
|
||||
if IsTombstone(value) {
|
||||
continue
|
||||
}
|
||||
execute[removal] = string(SuffixSnapshotTombstone)
|
||||
execute[ss.composeTSKey(removal, ts)] = string(SuffixSnapshotTombstone)
|
||||
updateList = append(updateList, removal)
|
||||
}
|
||||
|
||||
// multi save execute map; if succeeds, update ts in the update list
|
||||
err = ss.MetaKv.MultiSave(execute)
|
||||
if err == nil {
|
||||
for _, key := range updateList {
|
||||
ss.lastestTS[key] = ts
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// MultiSaveAndRemoveWithPrefix save muiltple kvs and remove as well
|
||||
// if ts == 0, act like MetaKv
|
||||
// each key-value will be treated in same logic like Save
|
||||
|
||||
@ -673,6 +673,82 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) {
|
||||
ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0)
|
||||
}
|
||||
|
||||
func Test_SuffixSnapshotMultiSaveAndRemove(t *testing.T) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
randVal := rand.Int()
|
||||
|
||||
rootPath := fmt.Sprintf("/test/meta/%d", randVal)
|
||||
sep := "_ts"
|
||||
|
||||
etcdCli, err := etcd.GetEtcdClient(
|
||||
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
|
||||
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
|
||||
Params.EtcdCfg.Endpoints.GetAsStrings(),
|
||||
Params.EtcdCfg.EtcdTLSCert.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSKey.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
|
||||
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
|
||||
require.Nil(t, err)
|
||||
defer etcdCli.Close()
|
||||
etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath)
|
||||
require.Nil(t, err)
|
||||
defer etcdkv.Close()
|
||||
|
||||
var vtso typeutil.Timestamp
|
||||
ftso := func() typeutil.Timestamp {
|
||||
return vtso
|
||||
}
|
||||
|
||||
ss, err := NewSuffixSnapshot(etcdkv, sep, rootPath, snapshotPrefix)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, ss)
|
||||
defer ss.Close()
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
vtso = typeutil.Timestamp(100 + i*5)
|
||||
ts := ftso()
|
||||
err = ss.Save(fmt.Sprintf("kd-%04d", i), fmt.Sprintf("value-%d", i), ts)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, vtso, ts)
|
||||
}
|
||||
for i := 20; i < 40; i++ {
|
||||
sm := map[string]string{"ks": fmt.Sprintf("value-%d", i)}
|
||||
dm := []string{fmt.Sprintf("kd-%04d", i-20)}
|
||||
vtso = typeutil.Timestamp(100 + i*5)
|
||||
ts := ftso()
|
||||
err = ss.MultiSaveAndRemove(sm, dm, ts)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, vtso, ts)
|
||||
}
|
||||
for i := 0; i < 20; i++ {
|
||||
val, err := ss.Load(fmt.Sprintf("kd-%04d", i), typeutil.Timestamp(100+i*5+2))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
|
||||
_, vals, err := ss.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i+1, len(vals))
|
||||
}
|
||||
for i := 20; i < 40; i++ {
|
||||
val, err := ss.Load("ks", typeutil.Timestamp(100+i*5+2))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fmt.Sprintf("value-%d", i), val)
|
||||
_, vals, err := ss.LoadWithPrefix("kd-", typeutil.Timestamp(100+i*5+2))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 39-i, len(vals))
|
||||
}
|
||||
|
||||
// try to load
|
||||
_, err = ss.Load("kd-0000", 500)
|
||||
assert.Error(t, err)
|
||||
_, err = ss.Load("kd-0000", 0)
|
||||
assert.Error(t, err)
|
||||
_, err = ss.Load("kd-0000", 1)
|
||||
assert.Error(t, err)
|
||||
|
||||
// cleanup
|
||||
ss.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{""}, 0)
|
||||
}
|
||||
|
||||
func TestSuffixSnapshot_LoadWithPrefix(t *testing.T) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
randVal := rand.Int()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user