From a3dd2756cfa38f4ed989efcbf18756fe68af3fd2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 27 Sep 2023 10:21:26 +0800 Subject: [PATCH] Add predicates for TxnKV operations (#27365) Signed-off-by: Congqi Xia --- Makefile | 1 + internal/datacoord/channel_store_test.go | 3 +- internal/kv/etcd/embed_etcd_kv.go | 38 +- internal/kv/etcd/embed_etcd_kv_test.go | 92 ++ internal/kv/etcd/etcd_kv.go | 33 +- internal/kv/etcd/etcd_kv_test.go | 1238 +++++++++++----------- internal/kv/etcd/util.go | 42 + internal/kv/etcd/util_test.go | 72 ++ internal/kv/kv.go | 5 +- internal/kv/mem/mem_kv.go | 12 +- internal/kv/mem/mem_kv_test.go | 16 + internal/kv/mocks/meta_kv.go | 75 +- internal/kv/mocks/txn_kv.go | 75 +- internal/kv/mocks/watch_kv.go | 72 +- internal/kv/predicates/mock_predicate.go | 240 +++++ internal/kv/predicates/predicate.go | 73 ++ internal/kv/predicates/predicate_test.go | 33 + internal/kv/rocksdb/rocksdb_kv.go | 12 +- internal/kv/rocksdb/rocksdb_kv_test.go | 20 + internal/kv/tikv/txn_tikv.go | 94 +- internal/kv/tikv/txn_tikv_test.go | 44 + pkg/util/merr/utils.go | 8 + 22 files changed, 1585 insertions(+), 713 deletions(-) create mode 100644 internal/kv/etcd/util.go create mode 100644 internal/kv/etcd/util_test.go create mode 100644 internal/kv/predicates/mock_predicate.go create mode 100644 internal/kv/predicates/predicate.go create mode 100644 internal/kv/predicates/predicate_test.go diff --git a/Makefile b/Makefile index 278f98fb5e..8ca4f3d629 100644 --- a/Makefile +++ b/Makefile @@ -440,6 +440,7 @@ generate-mockery-kv: getdeps $(INSTALL_PATH)/mockery --name=MetaKv --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=meta_kv.go --with-expecter $(INSTALL_PATH)/mockery --name=WatchKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=watch_kv.go --with-expecter $(INSTALL_PATH)/mockery --name=SnapShotKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=snapshot_kv.go --with-expecter + $(INSTALL_PATH)/mockery --name=Predicate --dir=$(PWD)/internal/kv/predicates --output=$(PWD)/internal/kv/predicates --filename=mock_predicate.go --with-expecter --inpackage generate-mockery-pkg: $(MAKE) -C pkg generate-mockery diff --git a/internal/datacoord/channel_store_test.go b/internal/datacoord/channel_store_test.go index ace7e086f8..22f545fb40 100644 --- a/internal/datacoord/channel_store_test.go +++ b/internal/datacoord/channel_store_test.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv/mocks" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/testutils" @@ -73,7 +74,7 @@ func genChannelOperations(from, to int64, num int) ChannelOpSet { func TestChannelStore_Update(t *testing.T) { txnKv := mocks.NewTxnKV(t) - txnKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Run(func(saves map[string]string, removals []string) { + txnKv.EXPECT().MultiSaveAndRemove(mock.Anything, mock.Anything).Run(func(saves map[string]string, removals []string, preds ...predicates.Predicate) { assert.False(t, len(saves)+len(removals) > 128, "too many operations") }).Return(nil) diff --git a/internal/kv/etcd/embed_etcd_kv.go b/internal/kv/etcd/embed_etcd_kv.go index 15155c42d8..3b8c7ef119 100644 --- a/internal/kv/etcd/embed_etcd_kv.go +++ b/internal/kv/etcd/embed_etcd_kv.go @@ -30,8 +30,10 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" ) // implementation assertion @@ -421,7 +423,12 @@ func (kv *EmbedEtcdKV) MultiRemove(keys []string) error { } // MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction. -func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { +func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + cmps, err := parsePredicates(kv.rootPath, preds...) + if err != nil { + return err + } + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) @@ -434,8 +441,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemove(saves map[string]string, removals []st ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() - return err + resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit() + if err != nil { + return err + } + + if !resp.Succeeded { + return merr.WrapErrIoFailedReason("failed to execute transaction") + } + return nil } // MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. @@ -475,7 +489,12 @@ func (kv *EmbedEtcdKV) WatchWithRevision(key string, revision int64) clientv3.Wa } // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. -func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + cmps, err := parsePredicates(kv.rootPath, preds...) + if err != nil { + return err + } + ops := make([]clientv3.Op, 0, len(saves)+len(removals)) for key, value := range saves { ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) @@ -488,8 +507,15 @@ func (kv *EmbedEtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, rem ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() - return err + resp, err := kv.client.Txn(ctx).If(cmps...).Then(ops...).Commit() + if err != nil { + return err + } + + if !resp.Succeeded { + return merr.WrapErrIoFailedReason("failed to execute transaction") + } + return nil } // MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 5d6c599d71..d41684ff2d 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -18,15 +18,20 @@ package etcdkv_test import ( "fmt" + "path" "sort" "testing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "golang.org/x/exp/maps" + "github.com/milvus-io/milvus/internal/kv" embed_etcd_kv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/kv/predicates" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -825,3 +830,90 @@ func TestEmbedEtcd(te *testing.T) { assert.False(t, has) }) } + +type EmbedEtcdKVSuite struct { + suite.Suite + + param *paramtable.ComponentParam + + rootPath string + kv kv.MetaKv +} + +func (s *EmbedEtcdKVSuite) SetupSuite() { + te := s.T() + te.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) + param := new(paramtable.ComponentParam) + te.Setenv("etcd.use.embed", "true") + te.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml") + + dir := te.TempDir() + te.Setenv("etcd.data.dir", dir) + + param.Init(paramtable.NewBaseTable()) + s.param = param +} + +func (s *EmbedEtcdKVSuite) SetupTest() { + s.rootPath = path.Join("unittest/etcdkv", funcutil.RandomString(8)) + + metaKv, err := embed_etcd_kv.NewMetaKvFactory(s.rootPath, &s.param.EtcdCfg) + s.Require().NoError(err) + s.kv = metaKv +} + +func (s *EmbedEtcdKVSuite) TearDownTest() { + if s.kv != nil { + s.kv.RemoveWithPrefix("") + s.kv.Close() + s.kv = nil + } +} + +func (s *EmbedEtcdKVSuite) TestTxnWithPredicates() { + etcdKV := s.kv + + prepareKV := map[string]string{ + "lease1": "1", + "lease2": "2", + } + + err := etcdKV.MultiSave(prepareKV) + s.Require().NoError(err) + + badPredicate := predicates.NewMockPredicate(s.T()) + badPredicate.EXPECT().Type().Return(0) + badPredicate.EXPECT().Target().Return(predicates.PredTargetValue) + + multiSaveAndRemovePredTests := []struct { + tag string + multiSave map[string]string + preds []predicates.Predicate + expectSuccess bool + }{ + {"predicate_ok", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "1")}, true}, + {"predicate_fail", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "2")}, false}, + {"bad_predicate", map[string]string{"a": "b"}, []predicates.Predicate{badPredicate}, false}, + } + + for _, test := range multiSaveAndRemovePredTests { + s.Run(test.tag, func() { + err := etcdKV.MultiSaveAndRemove(test.multiSave, nil, test.preds...) + if test.expectSuccess { + s.NoError(err) + } else { + s.Error(err) + } + err = etcdKV.MultiSaveAndRemoveWithPrefix(test.multiSave, nil, test.preds...) + if test.expectSuccess { + s.NoError(err) + } else { + s.Error(err) + } + }) + } +} + +func TestEmbedEtcdKV(t *testing.T) { + suite.Run(t, new(EmbedEtcdKVSuite)) +} diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 7a071266e4..dd05e4ed6e 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -26,9 +26,11 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -443,7 +445,12 @@ func (kv *etcdKV) MultiRemove(keys []string) error { } // MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction. -func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { +func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + cmps, err := parsePredicates(kv.rootPath, preds...) + if err != nil { + return err + } + start := time.Now() ops := make([]clientv3.Op, 0, len(saves)+len(removals)) var keys []string @@ -459,7 +466,7 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...) if err != nil { log.Warn("Etcd MultiSaveAndRemove error", zap.Any("saves", saves), @@ -467,9 +474,14 @@ func (kv *etcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)), zap.Error(err)) + return err } CheckElapseAndWarn(start, "Slow etcd operation multi save and remove", zap.Strings("keys", keys)) - return err + if !resp.Succeeded { + log.Warn("failed to executeTxn", zap.Any("resp", resp)) + return merr.WrapErrIoFailedReason("failed to execute transaction") + } + return nil } // MultiSaveBytesAndRemove saves the key-value pairs and removes the keys in a transaction. @@ -530,7 +542,12 @@ func (kv *etcdKV) WatchWithRevision(key string, revision int64) clientv3.WatchCh } // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. -func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + cmps, err := parsePredicates(kv.rootPath, preds...) + if err != nil { + return err + } + start := time.Now() ops := make([]clientv3.Op, 0, len(saves)) var keys []string @@ -546,7 +563,7 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() - _, err := kv.executeTxn(kv.getTxnWithCmp(ctx), ops...) + resp, err := kv.executeTxn(kv.getTxnWithCmp(ctx, cmps...), ops...) if err != nil { log.Warn("Etcd MultiSaveAndRemoveWithPrefix error", zap.Any("saves", saves), @@ -554,9 +571,13 @@ func (kv *etcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals)), zap.Error(err)) + return err } CheckElapseAndWarn(start, "Slow etcd operation multi save and move with prefix", zap.Strings("keys", keys)) - return err + if !resp.Succeeded { + return merr.WrapErrIoFailedReason("failed to execute transaction") + } + return nil } // MultiSaveBytesAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 300568c147..76908530fb 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -14,11 +14,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdkv_test +package etcdkv import ( "fmt" "os" + "path" "sort" "testing" "time" @@ -26,9 +27,11 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + clientv3 "go.etcd.io/etcd/client/v3" "golang.org/x/exp/maps" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -42,7 +45,15 @@ func TestMain(m *testing.M) { os.Exit(code) } -func TestEtcdKV_Load(te *testing.T) { +type EtcdKVSuite struct { + suite.Suite + + rootPath string + etcdCli *clientv3.Client + etcdKV *etcdKV +} + +func (s *EtcdKVSuite) SetupSuite() { etcdCli, err := etcd.GetEtcdClient( Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), @@ -51,618 +62,647 @@ func TestEtcdKV_Load(te *testing.T) { Params.EtcdCfg.EtcdTLSKey.GetValue(), Params.EtcdCfg.EtcdTLSCACert.GetValue(), Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - defer etcdCli.Close() - assert.NoError(te, err) - te.Run("etcdKV SaveAndLoad", func(t *testing.T) { - rootPath := "/etcd/test/root/saveandload" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - err = etcdKV.RemoveWithPrefix("") - require.NoError(t, err) + s.Require().NoError(err) - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") + s.etcdCli = etcdCli +} - saveAndLoadTests := []struct { - key string - value string - }{ - {"test1", "value1"}, - {"test2", "value2"}, - {"test1/a", "value_a"}, - {"test1/b", "value_b"}, +func (s *EtcdKVSuite) TearDownSuite() { + if s.etcdCli != nil { + s.etcdCli.Close() + } +} + +func (s *EtcdKVSuite) SetupTest() { + s.rootPath = path.Join("unittest/etcdkv", funcutil.RandomString(8)) + s.etcdKV = NewEtcdKV(s.etcdCli, s.rootPath) +} + +func (s *EtcdKVSuite) TearDownTest() { + s.etcdKV.RemoveWithPrefix("") + s.etcdKV.Close() +} + +func (s *EtcdKVSuite) TestSaveLoad() { + etcdKV := s.etcdKV + saveAndLoadTests := []struct { + key string + value string + }{ + {"test1", "value1"}, + {"test2", "value2"}, + {"test1/a", "value_a"}, + {"test1/b", "value_b"}, + } + + for i, test := range saveAndLoadTests { + if i < 4 { + err := etcdKV.Save(test.key, test.value) + s.Require().NoError(err) } - for i, test := range saveAndLoadTests { - if i < 4 { - err = etcdKV.Save(test.key, test.value) - assert.NoError(t, err) + val, err := etcdKV.Load(test.key) + s.Require().NoError(err) + s.Equal(test.value, val) + } + + invalidLoadTests := []struct { + invalidKey string + }{ + {"t"}, + {"a"}, + {"test1a"}, + } + + for _, test := range invalidLoadTests { + val, err := etcdKV.Load(test.invalidKey) + s.Error(err) + s.Zero(val) + } + + loadPrefixTests := []struct { + prefix string + + expectedKeys []string + expectedValues []string + expectedError error + }{ + {"test", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value_a", "value_b"}, nil}, + {"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil}, + {"", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil}, + {"a", []string{}, []string{}, nil}, + {"root", []string{}, []string{}, nil}, + {"/etcd/test/root", []string{}, []string{}, nil}, + } + + for _, test := range loadPrefixTests { + actualKeys, actualValues, err := etcdKV.LoadWithPrefix(test.prefix) + s.ElementsMatch(test.expectedKeys, actualKeys) + s.ElementsMatch(test.expectedValues, actualValues) + s.Equal(test.expectedError, err) + } + + removeTests := []struct { + validKey string + invalidKey string + }{ + {"test1", "abc"}, + {"test1/a", "test1/lskfjal"}, + {"test1/b", "test1/b"}, + {"test2", "-"}, + } + + for _, test := range removeTests { + err := etcdKV.Remove(test.validKey) + s.NoError(err) + + _, err = etcdKV.Load(test.validKey) + s.Error(err) + + err = etcdKV.Remove(test.validKey) + s.NoError(err) + err = etcdKV.Remove(test.invalidKey) + s.NoError(err) + } +} + +func (s *EtcdKVSuite) TestSaveAndLoadBytes() { + etcdKV := s.etcdKV + + saveAndLoadTests := []struct { + key string + value string + }{ + {"test1", "value1"}, + {"test2", "value2"}, + {"test1/a", "value_a"}, + {"test1/b", "value_b"}, + } + + for i, test := range saveAndLoadTests { + if i < 4 { + err := etcdKV.SaveBytes(test.key, []byte(test.value)) + s.Require().NoError(err) + } + + val, err := etcdKV.LoadBytes(test.key) + s.NoError(err) + s.Equal(test.value, string(val)) + } + + invalidLoadTests := []struct { + invalidKey string + }{ + {"t"}, + {"a"}, + {"test1a"}, + } + + for _, test := range invalidLoadTests { + val, err := etcdKV.LoadBytes(test.invalidKey) + s.Error(err) + s.Zero(string(val)) + } + + loadPrefixTests := []struct { + prefix string + + expectedKeys []string + expectedValues []string + expectedError error + }{ + {"test", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value_a", "value_b"}, nil}, + {"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil}, + {"", []string{ + etcdKV.GetPath("test1"), + etcdKV.GetPath("test2"), + etcdKV.GetPath("test1/a"), + etcdKV.GetPath("test1/b"), + }, []string{"value1", "value2", "value_a", "value_b"}, nil}, + {"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil}, + {"a", []string{}, []string{}, nil}, + {"root", []string{}, []string{}, nil}, + {"/etcd/test/root", []string{}, []string{}, nil}, + } + + for _, test := range loadPrefixTests { + actualKeys, actualValues, err := etcdKV.LoadBytesWithPrefix(test.prefix) + actualStringValues := make([]string, len(actualValues)) + for i := range actualValues { + actualStringValues[i] = string(actualValues[i]) + } + s.ElementsMatch(test.expectedKeys, actualKeys) + s.ElementsMatch(test.expectedValues, actualStringValues) + s.Equal(test.expectedError, err) + + actualKeys, actualValues, versions, err := etcdKV.LoadBytesWithPrefix2(test.prefix) + actualStringValues = make([]string, len(actualValues)) + for i := range actualValues { + actualStringValues[i] = string(actualValues[i]) + } + s.ElementsMatch(test.expectedKeys, actualKeys) + s.ElementsMatch(test.expectedValues, actualStringValues) + s.NotZero(versions) + s.Equal(test.expectedError, err) + } + + removeTests := []struct { + validKey string + invalidKey string + }{ + {"test1", "abc"}, + {"test1/a", "test1/lskfjal"}, + {"test1/b", "test1/b"}, + {"test2", "-"}, + } + + for _, test := range removeTests { + err := etcdKV.Remove(test.validKey) + s.NoError(err) + + _, err = etcdKV.Load(test.validKey) + s.Error(err) + + err = etcdKV.Remove(test.validKey) + s.NoError(err) + err = etcdKV.Remove(test.invalidKey) + s.NoError(err) + } +} + +func (s *EtcdKVSuite) TestLoadBytesWithRevision() { + etcdKV := s.etcdKV + + prepareKV := []struct { + inKey string + inValue string + }{ + {"a", "a_version1"}, + {"b", "b_version2"}, + {"a", "a_version3"}, + {"c", "c_version4"}, + {"a/suba", "a_version5"}, + } + + for _, test := range prepareKV { + err := etcdKV.SaveBytes(test.inKey, []byte(test.inValue)) + s.NoError(err) + } + + loadWithRevisionTests := []struct { + inKey string + + expectedKeyNo int + expectedValues []string + }{ + {"a", 2, []string{"a_version3", "a_version5"}}, + {"b", 1, []string{"b_version2"}}, + {"c", 1, []string{"c_version4"}}, + } + + for _, test := range loadWithRevisionTests { + keys, values, revision, err := etcdKV.LoadBytesWithRevision(test.inKey) + s.NoError(err) + s.Equal(test.expectedKeyNo, len(keys)) + stringValues := make([]string, len(values)) + for i := range values { + stringValues[i] = string(values[i]) + } + s.ElementsMatch(test.expectedValues, stringValues) + s.NotZero(revision) + } +} + +func (s *EtcdKVSuite) TestMultiSaveAndMultiLoad() { + etcdKV := s.etcdKV + multiSaveTests := map[string]string{ + "key_1": "value_1", + "key_2": "value_2", + "key_3/a": "value_3a", + "multikey_1": "multivalue_1", + "multikey_2": "multivalue_2", + "_": "other", + } + + err := etcdKV.MultiSave(multiSaveTests) + s.Require().NoError(err) + for k, v := range multiSaveTests { + actualV, err := etcdKV.Load(k) + s.NoError(err) + s.Equal(v, actualV) + } + + multiLoadTests := []struct { + inputKeys []string + expectedValues []string + }{ + {[]string{"key_1"}, []string{"value_1"}}, + {[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}}, + {[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}}, + {[]string{"_"}, []string{"other"}}, + } + + for _, test := range multiLoadTests { + vs, err := etcdKV.MultiLoad(test.inputKeys) + s.NoError(err) + s.Equal(test.expectedValues, vs) + } + + invalidMultiLoad := []struct { + invalidKeys []string + expectedValues []string + }{ + {[]string{"a", "key_1"}, []string{"", "value_1"}}, + {[]string{".....", "key_1"}, []string{"", "value_1"}}, + {[]string{"*********"}, []string{""}}, + {[]string{"key_1", "1"}, []string{"value_1", ""}}, + } + + for _, test := range invalidMultiLoad { + vs, err := etcdKV.MultiLoad(test.invalidKeys) + s.Error(err) + s.Equal(test.expectedValues, vs) + } + + removeWithPrefixTests := []string{ + "key_1", + "multi", + } + + for _, k := range removeWithPrefixTests { + err = etcdKV.RemoveWithPrefix(k) + s.NoError(err) + + ks, vs, err := etcdKV.LoadWithPrefix(k) + s.Empty(ks) + s.Empty(vs) + s.NoError(err) + } + + multiRemoveTests := []string{ + "key_2", + "key_3/a", + "multikey_2", + "_", + } + + err = etcdKV.MultiRemove(multiRemoveTests) + s.NoError(err) + + ks, vs, err := etcdKV.LoadWithPrefix("") + s.NoError(err) + s.Empty(ks) + s.Empty(vs) + + multiSaveAndRemoveTests := []struct { + multiSaves map[string]string + multiRemoves []string + }{ + {map[string]string{"key_1": "value_1"}, []string{}}, + {map[string]string{"key_2": "value_2"}, []string{"key_1"}}, + {map[string]string{"key_3/a": "value_3a"}, []string{"key_2"}}, + {map[string]string{"multikey_1": "multivalue_1"}, []string{}}, + {map[string]string{"multikey_2": "multivalue_2"}, []string{"multikey_1", "key_3/a"}}, + {make(map[string]string), []string{"multikey_2"}}, + } + for _, test := range multiSaveAndRemoveTests { + err = etcdKV.MultiSaveAndRemove(test.multiSaves, test.multiRemoves) + s.NoError(err) + } + + ks, vs, err = etcdKV.LoadWithPrefix("") + s.NoError(err) + s.Empty(ks) + s.Empty(vs) +} + +func (s *EtcdKVSuite) TestMultiSaveBytesAndMultiLoadBytes() { + etcdKV := s.etcdKV + multiSaveTests := map[string]string{ + "key_1": "value_1", + "key_2": "value_2", + "key_3/a": "value_3a", + "multikey_1": "multivalue_1", + "multikey_2": "multivalue_2", + "_": "other", + } + + multiSaveBytesTests := make(map[string][]byte) + for k, v := range multiSaveTests { + multiSaveBytesTests[k] = []byte(v) + } + + err := etcdKV.MultiSaveBytes(multiSaveBytesTests) + s.Require().NoError(err) + for k, v := range multiSaveTests { + actualV, err := etcdKV.LoadBytes(k) + s.NoError(err) + s.Equal(v, string(actualV)) + } + + multiLoadTests := []struct { + inputKeys []string + expectedValues []string + }{ + {[]string{"key_1"}, []string{"value_1"}}, + {[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}}, + {[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}}, + {[]string{"_"}, []string{"other"}}, + } + + for _, test := range multiLoadTests { + vs, err := etcdKV.MultiLoadBytes(test.inputKeys) + stringVs := make([]string, len(vs)) + for i := range vs { + stringVs[i] = string(vs[i]) + } + s.NoError(err) + s.Equal(test.expectedValues, stringVs) + } + + invalidMultiLoad := []struct { + invalidKeys []string + expectedValues []string + }{ + {[]string{"a", "key_1"}, []string{"", "value_1"}}, + {[]string{".....", "key_1"}, []string{"", "value_1"}}, + {[]string{"*********"}, []string{""}}, + {[]string{"key_1", "1"}, []string{"value_1", ""}}, + } + + for _, test := range invalidMultiLoad { + vs, err := etcdKV.MultiLoadBytes(test.invalidKeys) + stringVs := make([]string, len(vs)) + for i := range vs { + stringVs[i] = string(vs[i]) + } + s.Error(err) + s.Equal(test.expectedValues, stringVs) + } + + removeWithPrefixTests := []string{ + "key_1", + "multi", + } + + for _, k := range removeWithPrefixTests { + err = etcdKV.RemoveWithPrefix(k) + s.NoError(err) + + ks, vs, err := etcdKV.LoadBytesWithPrefix(k) + s.Empty(ks) + s.Empty(vs) + s.NoError(err) + } + + multiRemoveTests := []string{ + "key_2", + "key_3/a", + "multikey_2", + "_", + } + + err = etcdKV.MultiRemove(multiRemoveTests) + s.NoError(err) + + ks, vs, err := etcdKV.LoadBytesWithPrefix("") + s.NoError(err) + s.Empty(ks) + s.Empty(vs) + + multiSaveAndRemoveTests := []struct { + multiSaves map[string][]byte + multiRemoves []string + }{ + {map[string][]byte{"key_1": []byte("value_1")}, []string{}}, + {map[string][]byte{"key_2": []byte("value_2")}, []string{"key_1"}}, + {map[string][]byte{"key_3/a": []byte("value_3a")}, []string{"key_2"}}, + {map[string][]byte{"multikey_1": []byte("multivalue_1")}, []string{}}, + {map[string][]byte{"multikey_2": []byte("multivalue_2")}, []string{"multikey_1", "key_3/a"}}, + {make(map[string][]byte), []string{"multikey_2"}}, + } + + for _, test := range multiSaveAndRemoveTests { + err = etcdKV.MultiSaveBytesAndRemove(test.multiSaves, test.multiRemoves) + s.NoError(err) + } + + ks, vs, err = etcdKV.LoadBytesWithPrefix("") + s.NoError(err) + s.Empty(ks) + s.Empty(vs) +} + +func (s *EtcdKVSuite) TestTxnWithPredicates() { + etcdKV := s.etcdKV + + prepareKV := map[string]string{ + "lease1": "1", + "lease2": "2", + } + + err := etcdKV.MultiSave(prepareKV) + s.Require().NoError(err) + + badPredicate := predicates.NewMockPredicate(s.T()) + badPredicate.EXPECT().Type().Return(0) + badPredicate.EXPECT().Target().Return(predicates.PredTargetValue) + + multiSaveAndRemovePredTests := []struct { + tag string + multiSave map[string]string + preds []predicates.Predicate + expectSuccess bool + }{ + {"predicate_ok", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "1")}, true}, + {"predicate_fail", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "2")}, false}, + {"bad_predicate", map[string]string{"a": "b"}, []predicates.Predicate{badPredicate}, false}, + } + + for _, test := range multiSaveAndRemovePredTests { + s.Run(test.tag, func() { + err := etcdKV.MultiSaveAndRemove(test.multiSave, nil, test.preds...) + if test.expectSuccess { + s.NoError(err) + } else { + s.Error(err) } - - val, err := etcdKV.Load(test.key) - assert.NoError(t, err) - assert.Equal(t, test.value, val) - } - - invalidLoadTests := []struct { - invalidKey string - }{ - {"t"}, - {"a"}, - {"test1a"}, - } - - for _, test := range invalidLoadTests { - val, err := etcdKV.Load(test.invalidKey) - assert.Error(t, err) - assert.Zero(t, val) - } - - loadPrefixTests := []struct { - prefix string - - expectedKeys []string - expectedValues []string - expectedError error - }{ - {"test", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test2"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value2", "value_a", "value_b"}, nil}, - {"test1", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value_a", "value_b"}, nil}, - {"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil}, - {"", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test2"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value2", "value_a", "value_b"}, nil}, - {"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil}, - {"a", []string{}, []string{}, nil}, - {"root", []string{}, []string{}, nil}, - {"/etcd/test/root", []string{}, []string{}, nil}, - } - - for _, test := range loadPrefixTests { - actualKeys, actualValues, err := etcdKV.LoadWithPrefix(test.prefix) - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualValues) - assert.Equal(t, test.expectedError, err) - } - - removeTests := []struct { - validKey string - invalidKey string - }{ - {"test1", "abc"}, - {"test1/a", "test1/lskfjal"}, - {"test1/b", "test1/b"}, - {"test2", "-"}, - } - - for _, test := range removeTests { - err = etcdKV.Remove(test.validKey) - assert.NoError(t, err) - - _, err = etcdKV.Load(test.validKey) - assert.Error(t, err) - - err = etcdKV.Remove(test.validKey) - assert.NoError(t, err) - err = etcdKV.Remove(test.invalidKey) - assert.NoError(t, err) - } - }) - - te.Run("etcdKV SaveAndLoadBytes", func(t *testing.T) { - rootPath := "/etcd/test/root/saveandloadbytes" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - err = etcdKV.RemoveWithPrefix("") - require.NoError(t, err) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - saveAndLoadTests := []struct { - key string - value string - }{ - {"test1", "value1"}, - {"test2", "value2"}, - {"test1/a", "value_a"}, - {"test1/b", "value_b"}, - } - - for i, test := range saveAndLoadTests { - if i < 4 { - err = etcdKV.SaveBytes(test.key, []byte(test.value)) - assert.NoError(t, err) + err = etcdKV.MultiSaveAndRemoveWithPrefix(test.multiSave, nil, test.preds...) + if test.expectSuccess { + s.NoError(err) + } else { + s.Error(err) } + }) + } +} - val, err := etcdKV.LoadBytes(test.key) - assert.NoError(t, err) - assert.Equal(t, test.value, string(val)) - } +func (s *EtcdKVSuite) TestMultiSaveAndRemoveWithPrefix() { + etcdKV := s.etcdKV - invalidLoadTests := []struct { - invalidKey string - }{ - {"t"}, - {"a"}, - {"test1a"}, - } + prepareTests := map[string]string{ + "x/abc/1": "1", + "x/abc/2": "2", + "x/def/1": "10", + "x/def/2": "20", + "x/den/1": "100", + "x/den/2": "200", + } - for _, test := range invalidLoadTests { - val, err := etcdKV.LoadBytes(test.invalidKey) - assert.Error(t, err) - assert.Zero(t, string(val)) - } + // MultiSaveAndRemoveWithPrefix + err := etcdKV.MultiSave(prepareTests) + s.Require().NoError(err) + multiSaveAndRemoveWithPrefixTests := []struct { + multiSave map[string]string + prefix []string - loadPrefixTests := []struct { - prefix string + loadPrefix string + lengthBeforeRemove int + lengthAfterRemove int + }{ + {map[string]string{}, []string{"x/abc", "x/def", "x/den"}, "x", 6, 0}, + {map[string]string{"y/a": "vvv", "y/b": "vvv"}, []string{}, "y", 0, 2}, + {map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3}, + {map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0}, + {map[string]string{}, []string{"p"}, "p", 1, 0}, + } - expectedKeys []string - expectedValues []string - expectedError error - }{ - {"test", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test2"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value2", "value_a", "value_b"}, nil}, - {"test1", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value_a", "value_b"}, nil}, - {"test2", []string{etcdKV.GetPath("test2")}, []string{"value2"}, nil}, - {"", []string{ - etcdKV.GetPath("test1"), - etcdKV.GetPath("test2"), - etcdKV.GetPath("test1/a"), - etcdKV.GetPath("test1/b"), - }, []string{"value1", "value2", "value_a", "value_b"}, nil}, - {"test1/a", []string{etcdKV.GetPath("test1/a")}, []string{"value_a"}, nil}, - {"a", []string{}, []string{}, nil}, - {"root", []string{}, []string{}, nil}, - {"/etcd/test/root", []string{}, []string{}, nil}, - } + for _, test := range multiSaveAndRemoveWithPrefixTests { + k, _, err := etcdKV.LoadWithPrefix(test.loadPrefix) + s.NoError(err) + s.Equal(test.lengthBeforeRemove, len(k)) - for _, test := range loadPrefixTests { - actualKeys, actualValues, err := etcdKV.LoadBytesWithPrefix(test.prefix) - actualStringValues := make([]string, len(actualValues)) - for i := range actualValues { - actualStringValues[i] = string(actualValues[i]) - } - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualStringValues) - assert.Equal(t, test.expectedError, err) + err = etcdKV.MultiSaveAndRemoveWithPrefix(test.multiSave, test.prefix) + s.NoError(err) - actualKeys, actualValues, versions, err := etcdKV.LoadBytesWithPrefix2(test.prefix) - actualStringValues = make([]string, len(actualValues)) - for i := range actualValues { - actualStringValues[i] = string(actualValues[i]) - } - assert.ElementsMatch(t, test.expectedKeys, actualKeys) - assert.ElementsMatch(t, test.expectedValues, actualStringValues) - assert.NotZero(t, versions) - assert.Equal(t, test.expectedError, err) - } + k, _, err = etcdKV.LoadWithPrefix(test.loadPrefix) + s.NoError(err) + s.Equal(test.lengthAfterRemove, len(k)) + } +} - removeTests := []struct { - validKey string - invalidKey string - }{ - {"test1", "abc"}, - {"test1/a", "test1/lskfjal"}, - {"test1/b", "test1/b"}, - {"test2", "-"}, - } +func (s *EtcdKVSuite) TestWatch() { + etcdKV := s.etcdKV - for _, test := range removeTests { - err = etcdKV.Remove(test.validKey) - assert.NoError(t, err) + ch := etcdKV.Watch("x") + resp := <-ch + s.True(resp.Created) - _, err = etcdKV.Load(test.validKey) - assert.Error(t, err) + ch = etcdKV.WatchWithPrefix("x") + resp = <-ch + s.True(resp.Created) +} - err = etcdKV.Remove(test.validKey) - assert.NoError(t, err) - err = etcdKV.Remove(test.invalidKey) - assert.NoError(t, err) - } - }) +func (s *EtcdKVSuite) TestRevisionBytes() { + etcdKV := s.etcdKV - te.Run("etcdKV LoadBytesWithRevision", func(t *testing.T) { - rootPath := "/etcd/test/root/LoadBytesWithRevision" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + revisionTests := []struct { + inKey string + fistValue []byte + secondValue []byte + }{ + {"a", []byte("v1"), []byte("v11")}, + {"y", []byte("v2"), []byte("v22")}, + {"z", []byte("v3"), []byte("v33")}, + } - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") + for _, test := range revisionTests { + err := etcdKV.SaveBytes(test.inKey, test.fistValue) + s.Require().NoError(err) - prepareKV := []struct { - inKey string - inValue string - }{ - {"a", "a_version1"}, - {"b", "b_version2"}, - {"a", "a_version3"}, - {"c", "c_version4"}, - {"a/suba", "a_version5"}, - } + _, _, revision, _ := etcdKV.LoadBytesWithRevision(test.inKey) + ch := etcdKV.WatchWithRevision(test.inKey, revision+1) - for _, test := range prepareKV { - err = etcdKV.SaveBytes(test.inKey, []byte(test.inValue)) - require.NoError(t, err) - } + err = etcdKV.SaveBytes(test.inKey, test.secondValue) + s.Require().NoError(err) - loadWithRevisionTests := []struct { - inKey string - - expectedKeyNo int - expectedValues []string - }{ - {"a", 2, []string{"a_version3", "a_version5"}}, - {"b", 1, []string{"b_version2"}}, - {"c", 1, []string{"c_version4"}}, - } - - for _, test := range loadWithRevisionTests { - keys, values, revision, err := etcdKV.LoadBytesWithRevision(test.inKey) - assert.NoError(t, err) - assert.Equal(t, test.expectedKeyNo, len(keys)) - stringValues := make([]string, len(values)) - for i := range values { - stringValues[i] = string(values[i]) - } - assert.ElementsMatch(t, test.expectedValues, stringValues) - assert.NotZero(t, revision) - } - }) - - te.Run("etcdKV MultiSaveAndMultiLoad", func(t *testing.T) { - rootPath := "/etcd/test/root/multi_save_and_multi_load" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - multiSaveTests := map[string]string{ - "key_1": "value_1", - "key_2": "value_2", - "key_3/a": "value_3a", - "multikey_1": "multivalue_1", - "multikey_2": "multivalue_2", - "_": "other", - } - - err = etcdKV.MultiSave(multiSaveTests) - assert.NoError(t, err) - for k, v := range multiSaveTests { - actualV, err := etcdKV.Load(k) - assert.NoError(t, err) - assert.Equal(t, v, actualV) - } - - multiLoadTests := []struct { - inputKeys []string - expectedValues []string - }{ - {[]string{"key_1"}, []string{"value_1"}}, - {[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}}, - {[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}}, - {[]string{"_"}, []string{"other"}}, - } - - for _, test := range multiLoadTests { - vs, err := etcdKV.MultiLoad(test.inputKeys) - assert.NoError(t, err) - assert.Equal(t, test.expectedValues, vs) - } - - invalidMultiLoad := []struct { - invalidKeys []string - expectedValues []string - }{ - {[]string{"a", "key_1"}, []string{"", "value_1"}}, - {[]string{".....", "key_1"}, []string{"", "value_1"}}, - {[]string{"*********"}, []string{""}}, - {[]string{"key_1", "1"}, []string{"value_1", ""}}, - } - - for _, test := range invalidMultiLoad { - vs, err := etcdKV.MultiLoad(test.invalidKeys) - assert.Error(t, err) - assert.Equal(t, test.expectedValues, vs) - } - - removeWithPrefixTests := []string{ - "key_1", - "multi", - } - - for _, k := range removeWithPrefixTests { - err = etcdKV.RemoveWithPrefix(k) - assert.NoError(t, err) - - ks, vs, err := etcdKV.LoadWithPrefix(k) - assert.Empty(t, ks) - assert.Empty(t, vs) - assert.NoError(t, err) - } - - multiRemoveTests := []string{ - "key_2", - "key_3/a", - "multikey_2", - "_", - } - - err = etcdKV.MultiRemove(multiRemoveTests) - assert.NoError(t, err) - - ks, vs, err := etcdKV.LoadWithPrefix("") - assert.NoError(t, err) - assert.Empty(t, ks) - assert.Empty(t, vs) - - multiSaveAndRemoveTests := []struct { - multiSaves map[string]string - multiRemoves []string - }{ - {map[string]string{"key_1": "value_1"}, []string{}}, - {map[string]string{"key_2": "value_2"}, []string{"key_1"}}, - {map[string]string{"key_3/a": "value_3a"}, []string{"key_2"}}, - {map[string]string{"multikey_1": "multivalue_1"}, []string{}}, - {map[string]string{"multikey_2": "multivalue_2"}, []string{"multikey_1", "key_3/a"}}, - {make(map[string]string), []string{"multikey_2"}}, - } - for _, test := range multiSaveAndRemoveTests { - err = etcdKV.MultiSaveAndRemove(test.multiSaves, test.multiRemoves) - assert.NoError(t, err) - } - - ks, vs, err = etcdKV.LoadWithPrefix("") - assert.NoError(t, err) - assert.Empty(t, ks) - assert.Empty(t, vs) - }) - - te.Run("etcdKV MultiSaveBytesAndMultiLoadBytes", func(t *testing.T) { - rootPath := "/etcd/test/root/multi_save_bytes_and_multi_load_bytes" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - multiSaveTests := map[string]string{ - "key_1": "value_1", - "key_2": "value_2", - "key_3/a": "value_3a", - "multikey_1": "multivalue_1", - "multikey_2": "multivalue_2", - "_": "other", - } - - multiSaveBytesTests := make(map[string][]byte) - for k, v := range multiSaveTests { - multiSaveBytesTests[k] = []byte(v) - } - - err = etcdKV.MultiSaveBytes(multiSaveBytesTests) - assert.NoError(t, err) - for k, v := range multiSaveTests { - actualV, err := etcdKV.LoadBytes(k) - assert.NoError(t, err) - assert.Equal(t, v, string(actualV)) - } - - multiLoadTests := []struct { - inputKeys []string - expectedValues []string - }{ - {[]string{"key_1"}, []string{"value_1"}}, - {[]string{"key_1", "key_2", "key_3/a"}, []string{"value_1", "value_2", "value_3a"}}, - {[]string{"multikey_1", "multikey_2"}, []string{"multivalue_1", "multivalue_2"}}, - {[]string{"_"}, []string{"other"}}, - } - - for _, test := range multiLoadTests { - vs, err := etcdKV.MultiLoadBytes(test.inputKeys) - stringVs := make([]string, len(vs)) - for i := range vs { - stringVs[i] = string(vs[i]) - } - assert.NoError(t, err) - assert.Equal(t, test.expectedValues, stringVs) - } - - invalidMultiLoad := []struct { - invalidKeys []string - expectedValues []string - }{ - {[]string{"a", "key_1"}, []string{"", "value_1"}}, - {[]string{".....", "key_1"}, []string{"", "value_1"}}, - {[]string{"*********"}, []string{""}}, - {[]string{"key_1", "1"}, []string{"value_1", ""}}, - } - - for _, test := range invalidMultiLoad { - vs, err := etcdKV.MultiLoadBytes(test.invalidKeys) - stringVs := make([]string, len(vs)) - for i := range vs { - stringVs[i] = string(vs[i]) - } - assert.Error(t, err) - assert.Equal(t, test.expectedValues, stringVs) - } - - removeWithPrefixTests := []string{ - "key_1", - "multi", - } - - for _, k := range removeWithPrefixTests { - err = etcdKV.RemoveWithPrefix(k) - assert.NoError(t, err) - - ks, vs, err := etcdKV.LoadBytesWithPrefix(k) - assert.Empty(t, ks) - assert.Empty(t, vs) - assert.NoError(t, err) - } - - multiRemoveTests := []string{ - "key_2", - "key_3/a", - "multikey_2", - "_", - } - - err = etcdKV.MultiRemove(multiRemoveTests) - assert.NoError(t, err) - - ks, vs, err := etcdKV.LoadBytesWithPrefix("") - assert.NoError(t, err) - assert.Empty(t, ks) - assert.Empty(t, vs) - - multiSaveAndRemoveTests := []struct { - multiSaves map[string][]byte - multiRemoves []string - }{ - {map[string][]byte{"key_1": []byte("value_1")}, []string{}}, - {map[string][]byte{"key_2": []byte("value_2")}, []string{"key_1"}}, - {map[string][]byte{"key_3/a": []byte("value_3a")}, []string{"key_2"}}, - {map[string][]byte{"multikey_1": []byte("multivalue_1")}, []string{}}, - {map[string][]byte{"multikey_2": []byte("multivalue_2")}, []string{"multikey_1", "key_3/a"}}, - {make(map[string][]byte), []string{"multikey_2"}}, - } - - for _, test := range multiSaveAndRemoveTests { - err = etcdKV.MultiSaveBytesAndRemove(test.multiSaves, test.multiRemoves) - assert.NoError(t, err) - } - - ks, vs, err = etcdKV.LoadBytesWithPrefix("") - assert.NoError(t, err) - assert.Empty(t, ks) - assert.Empty(t, vs) - }) - - te.Run("etcdKV MultiSaveAndRemoveWithPrefix", func(t *testing.T) { - rootPath := "/etcd/test/root/multi_save_and_remove_with_prefix" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - prepareTests := map[string]string{ - "x/abc/1": "1", - "x/abc/2": "2", - "x/def/1": "10", - "x/def/2": "20", - "x/den/1": "100", - "x/den/2": "200", - } - - // MultiSaveAndRemoveWithPrefix - err = etcdKV.MultiSave(prepareTests) - require.NoError(t, err) - multiSaveAndRemoveWithPrefixTests := []struct { - multiSave map[string]string - prefix []string - - loadPrefix string - lengthBeforeRemove int - lengthAfterRemove int - }{ - {map[string]string{}, []string{"x/abc", "x/def", "x/den"}, "x", 6, 0}, - {map[string]string{"y/a": "vvv", "y/b": "vvv"}, []string{}, "y", 0, 2}, - {map[string]string{"y/c": "vvv"}, []string{}, "y", 2, 3}, - {map[string]string{"p/a": "vvv"}, []string{"y/a", "y"}, "y", 3, 0}, - {map[string]string{}, []string{"p"}, "p", 1, 0}, - } - - for _, test := range multiSaveAndRemoveWithPrefixTests { - k, _, err := etcdKV.LoadWithPrefix(test.loadPrefix) - assert.NoError(t, err) - assert.Equal(t, test.lengthBeforeRemove, len(k)) - - err = etcdKV.MultiSaveAndRemoveWithPrefix(test.multiSave, test.prefix) - assert.NoError(t, err) - - k, _, err = etcdKV.LoadWithPrefix(test.loadPrefix) - assert.NoError(t, err) - assert.Equal(t, test.lengthAfterRemove, len(k)) - } - }) - - te.Run("etcdKV Watch", func(t *testing.T) { - rootPath := "/etcd/test/root/watch" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") - - ch := etcdKV.Watch("x") resp := <-ch - assert.True(t, resp.Created) + s.Equal(1, len(resp.Events)) + s.Equal(string(test.secondValue), string(resp.Events[0].Kv.Value)) + s.Equal(revision+1, resp.Header.Revision) + } - ch = etcdKV.WatchWithPrefix("x") - resp = <-ch - assert.True(t, resp.Created) - }) + success, err := etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + s.NoError(err) + s.True(success) - te.Run("Etcd Revision Bytes", func(t *testing.T) { - rootPath := "/etcd/test/root/revision_bytes" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) - defer etcdKV.Close() - defer etcdKV.RemoveWithPrefix("") + value, err := etcdKV.LoadBytes("a/b/c") + s.NoError(err) + s.Equal(string(value), "1") - revisionTests := []struct { - inKey string - fistValue []byte - secondValue []byte - }{ - {"a", []byte("v1"), []byte("v11")}, - {"y", []byte("v2"), []byte("v22")}, - {"z", []byte("v3"), []byte("v33")}, - } + success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) + s.NoError(err) + s.False(success) +} - for _, test := range revisionTests { - err = etcdKV.SaveBytes(test.inKey, test.fistValue) - require.NoError(t, err) - - _, _, revision, _ := etcdKV.LoadBytesWithRevision(test.inKey) - ch := etcdKV.WatchWithRevision(test.inKey, revision+1) - - err = etcdKV.SaveBytes(test.inKey, test.secondValue) - require.NoError(t, err) - - resp := <-ch - assert.Equal(t, 1, len(resp.Events)) - assert.Equal(t, string(test.secondValue), string(resp.Events[0].Kv.Value)) - assert.Equal(t, revision+1, resp.Header.Revision) - } - - success, err := etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) - assert.NoError(t, err) - assert.True(t, success) - - value, err := etcdKV.LoadBytes("a/b/c") - assert.NoError(t, err) - assert.Equal(t, string(value), "1") - - success, err = etcdKV.CompareVersionAndSwapBytes("a/b/c", 0, []byte("1")) - assert.NoError(t, err) - assert.False(t, success) - }) +func TestEtcdKV(t *testing.T) { + suite.Run(t, new(EtcdKVSuite)) } func Test_WalkWithPagination(t *testing.T) { @@ -678,7 +718,7 @@ func Test_WalkWithPagination(t *testing.T) { assert.NoError(t, err) rootPath := "/etcd/test/root/pagination" - etcdKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + etcdKV := NewEtcdKV(etcdCli, rootPath) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") @@ -752,42 +792,42 @@ func Test_WalkWithPagination(t *testing.T) { func TestElapse(t *testing.T) { start := time.Now() - isElapse := etcdkv.CheckElapseAndWarn(start, "err message") + isElapse := CheckElapseAndWarn(start, "err message") assert.Equal(t, isElapse, false) time.Sleep(2001 * time.Millisecond) - isElapse = etcdkv.CheckElapseAndWarn(start, "err message") + isElapse = CheckElapseAndWarn(start, "err message") assert.Equal(t, isElapse, true) } func TestCheckValueSizeAndWarn(t *testing.T) { - ret := etcdkv.CheckValueSizeAndWarn("k", "v") + ret := CheckValueSizeAndWarn("k", "v") assert.False(t, ret) v := make([]byte, 1024000) - ret = etcdkv.CheckValueSizeAndWarn("k", v) + ret = CheckValueSizeAndWarn("k", v) assert.True(t, ret) } func TestCheckTnxBytesValueSizeAndWarn(t *testing.T) { kvs := make(map[string][]byte, 0) kvs["k"] = []byte("v") - ret := etcdkv.CheckTnxBytesValueSizeAndWarn(kvs) + ret := CheckTnxBytesValueSizeAndWarn(kvs) assert.False(t, ret) kvs["k"] = make([]byte, 1024000) - ret = etcdkv.CheckTnxBytesValueSizeAndWarn(kvs) + ret = CheckTnxBytesValueSizeAndWarn(kvs) assert.True(t, ret) } func TestCheckTnxStringValueSizeAndWarn(t *testing.T) { kvs := make(map[string]string, 0) kvs["k"] = "v" - ret := etcdkv.CheckTnxStringValueSizeAndWarn(kvs) + ret := CheckTnxStringValueSizeAndWarn(kvs) assert.False(t, ret) kvs["k1"] = funcutil.RandomString(1024000) - ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs) + ret = CheckTnxStringValueSizeAndWarn(kvs) assert.True(t, ret) } @@ -803,7 +843,7 @@ func TestHas(t *testing.T) { defer etcdCli.Close() assert.NoError(t, err) rootPath := "/etcd/test/root/has" - kv := etcdkv.NewEtcdKV(etcdCli, rootPath) + kv := NewEtcdKV(etcdCli, rootPath) err = kv.RemoveWithPrefix("") require.NoError(t, err) @@ -841,7 +881,7 @@ func TestHasPrefix(t *testing.T) { defer etcdCli.Close() assert.NoError(t, err) rootPath := "/etcd/test/root/hasprefix" - kv := etcdkv.NewEtcdKV(etcdCli, rootPath) + kv := NewEtcdKV(etcdCli, rootPath) err = kv.RemoveWithPrefix("") require.NoError(t, err) diff --git a/internal/kv/etcd/util.go b/internal/kv/etcd/util.go new file mode 100644 index 0000000000..6363ddb5f9 --- /dev/null +++ b/internal/kv/etcd/util.go @@ -0,0 +1,42 @@ +package etcdkv + +import ( + "fmt" + "path" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/milvus/internal/kv/predicates" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func parsePredicates(rootPath string, preds ...predicates.Predicate) ([]clientv3.Cmp, error) { + if len(preds) == 0 { + return []clientv3.Cmp{}, nil + } + result := make([]clientv3.Cmp, 0, len(preds)) + for _, pred := range preds { + switch pred.Target() { + case predicates.PredTargetValue: + pt, err := parsePredicateType(pred.Type()) + if err != nil { + return nil, err + } + cmp := clientv3.Compare(clientv3.Value(path.Join(rootPath, pred.Key())), pt, pred.TargetValue()) + result = append(result, cmp) + default: + return nil, merr.WrapErrParameterInvalid("valid predicate target", fmt.Sprintf("%d", pred.Target())) + } + } + return result, nil +} + +// parsePredicateType parse predicates.PredicateType to clientv3.Result +func parsePredicateType(pt predicates.PredicateType) (string, error) { + switch pt { + case predicates.PredTypeEqual: + return "=", nil + default: + return "", merr.WrapErrParameterInvalid("valid predicate type", fmt.Sprintf("%d", pt)) + } +} diff --git a/internal/kv/etcd/util_test.go b/internal/kv/etcd/util_test.go new file mode 100644 index 0000000000..331f4845ae --- /dev/null +++ b/internal/kv/etcd/util_test.go @@ -0,0 +1,72 @@ +package etcdkv + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/kv/predicates" +) + +type EtcdKVUtilSuite struct { + suite.Suite +} + +func (s *EtcdKVUtilSuite) TestParsePredicateType() { + type testCase struct { + tag string + pt predicates.PredicateType + expectResult string + expectSucceed bool + } + + cases := []testCase{ + {tag: "equal", pt: predicates.PredTypeEqual, expectResult: "=", expectSucceed: true}, + {tag: "zero_value", pt: 0, expectResult: "", expectSucceed: false}, + } + + for _, tc := range cases { + s.Run(tc.tag, func() { + result, err := parsePredicateType(tc.pt) + if tc.expectSucceed { + s.NoError(err) + s.Equal(tc.expectResult, result) + } else { + s.Error(err) + } + }) + } +} + +func (s *EtcdKVUtilSuite) TestParsePredicates() { + type testCase struct { + tag string + input []predicates.Predicate + expectSucceed bool + } + + badPredicate := predicates.NewMockPredicate(s.T()) + badPredicate.EXPECT().Target().Return(0) + + cases := []testCase{ + {tag: "normal_value_equal", input: []predicates.Predicate{predicates.ValueEqual("a", "b")}, expectSucceed: true}, + {tag: "empty_input", input: nil, expectSucceed: true}, + {tag: "bad_predicates", input: []predicates.Predicate{badPredicate}, expectSucceed: false}, + } + + for _, tc := range cases { + s.Run(tc.tag, func() { + result, err := parsePredicates("", tc.input...) + if tc.expectSucceed { + s.NoError(err) + s.Equal(len(tc.input), len(result)) + } else { + s.Error(err) + } + }) + } +} + +func TestEtcdKVUtil(t *testing.T) { + suite.Run(t, new(EtcdKVUtilSuite)) +} diff --git a/internal/kv/kv.go b/internal/kv/kv.go index f96128f9ef..14091cdc1e 100644 --- a/internal/kv/kv.go +++ b/internal/kv/kv.go @@ -19,6 +19,7 @@ package kv import ( clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -57,8 +58,8 @@ type BaseKV interface { //go:generate mockery --name=TxnKV --with-expecter type TxnKV interface { BaseKV - MultiSaveAndRemove(saves map[string]string, removals []string) error - MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error + MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error + MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error } // MetaKv is TxnKV for metadata. It should save data with lease. diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 1607184c8a..5be581bb44 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -22,7 +22,9 @@ import ( "github.com/google/btree" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" ) // MemoryKV implements BaseKv interface and relies on underling btree.BTree. @@ -217,7 +219,10 @@ func (kv *MemoryKV) MultiRemove(keys []string) error { } // MultiSaveAndRemove saves and removes given key-value pairs in MemoryKV atomicly. -func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { +func (kv *MemoryKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + if len(preds) > 0 { + return merr.WrapErrServiceUnavailable("predicates not supported") + } kv.Lock() defer kv.Unlock() for key, value := range saves { @@ -283,7 +288,10 @@ func (kv *MemoryKV) Close() { } // MultiSaveAndRemoveWithPrefix saves key-value pairs in @saves, & remove key with prefix in @removals in MemoryKV atomically. -func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (kv *MemoryKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + if len(preds) > 0 { + return merr.WrapErrServiceUnavailable("predicates not supported") + } kv.Lock() defer kv.Unlock() diff --git a/internal/kv/mem/mem_kv_test.go b/internal/kv/mem/mem_kv_test.go index 79f1651c68..76e8896827 100644 --- a/internal/kv/mem/mem_kv_test.go +++ b/internal/kv/mem/mem_kv_test.go @@ -20,6 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/kv/predicates" + "github.com/milvus-io/milvus/pkg/util/merr" ) func TestMemoryKV_SaveAndLoadBytes(t *testing.T) { @@ -242,3 +245,16 @@ func TestHasPrefix(t *testing.T) { assert.NoError(t, err) assert.False(t, has) } + +func TestPredicates(t *testing.T) { + kv := NewMemoryKV() + + // predicates not supported for mem kv for now + err := kv.MultiSaveAndRemove(map[string]string{}, []string{}, predicates.ValueEqual("a", "b")) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrServiceUnavailable) + + err = kv.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{}, predicates.ValueEqual("a", "b")) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrServiceUnavailable) +} diff --git a/internal/kv/mocks/meta_kv.go b/internal/kv/mocks/meta_kv.go index 7d66db0b58..5a615ff525 100644 --- a/internal/kv/mocks/meta_kv.go +++ b/internal/kv/mocks/meta_kv.go @@ -2,7 +2,10 @@ package mocks -import mock "github.com/stretchr/testify/mock" +import ( + predicates "github.com/milvus-io/milvus/internal/kv/predicates" + mock "github.com/stretchr/testify/mock" +) // MetaKv is an autogenerated mock type for the MetaKv type type MetaKv struct { @@ -502,13 +505,20 @@ func (_c *MetaKv_MultiSave_Call) RunAndReturn(run func(map[string]string) error) return _c } -// MultiSaveAndRemove provides a mock function with given fields: saves, removals -func (_m *MetaKv) MultiSaveAndRemove(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds +func (_m *MetaKv) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -524,13 +534,21 @@ type MetaKv_MultiSaveAndRemove_Call struct { // MultiSaveAndRemove is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemove_Call { - return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} +// - preds ...predicates.Predicate +func (_e *MetaKv_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *MetaKv_MultiSaveAndRemove_Call { + return &MetaKv_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *MetaKv_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemove_Call { +func (_c *MetaKv_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *MetaKv_MultiSaveAndRemove_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -540,18 +558,25 @@ func (_c *MetaKv_MultiSaveAndRemove_Call) Return(_a0 error) *MetaKv_MultiSaveAnd return _c } -func (_c *MetaKv_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *MetaKv_MultiSaveAndRemove_Call { +func (_c *MetaKv_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *MetaKv_MultiSaveAndRemove_Call { _c.Call.Return(run) return _c } -// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals -func (_m *MetaKv) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds +func (_m *MetaKv) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -567,13 +592,21 @@ type MetaKv_MultiSaveAndRemoveWithPrefix_Call struct { // MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { - return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} +// - preds ...predicates.Predicate +func (_e *MetaKv_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { + return &MetaKv_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { +func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -583,7 +616,7 @@ func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *MetaKv_Mu return _c } -func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { +func (_c *MetaKv_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *MetaKv_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Return(run) return _c } diff --git a/internal/kv/mocks/txn_kv.go b/internal/kv/mocks/txn_kv.go index 08659bbcd2..25bbb438ff 100644 --- a/internal/kv/mocks/txn_kv.go +++ b/internal/kv/mocks/txn_kv.go @@ -2,7 +2,10 @@ package mocks -import mock "github.com/stretchr/testify/mock" +import ( + predicates "github.com/milvus-io/milvus/internal/kv/predicates" + mock "github.com/stretchr/testify/mock" +) // TxnKV is an autogenerated mock type for the TxnKV type type TxnKV struct { @@ -406,13 +409,20 @@ func (_c *TxnKV_MultiSave_Call) RunAndReturn(run func(map[string]string) error) return _c } -// MultiSaveAndRemove provides a mock function with given fields: saves, removals -func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds +func (_m *TxnKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -428,13 +438,21 @@ type TxnKV_MultiSaveAndRemove_Call struct { // MultiSaveAndRemove is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemove_Call { - return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} +// - preds ...predicates.Predicate +func (_e *TxnKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *TxnKV_MultiSaveAndRemove_Call { + return &TxnKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *TxnKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemove_Call { +func (_c *TxnKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *TxnKV_MultiSaveAndRemove_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -444,18 +462,25 @@ func (_c *TxnKV_MultiSaveAndRemove_Call) Return(_a0 error) *TxnKV_MultiSaveAndRe return _c } -func (_c *TxnKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *TxnKV_MultiSaveAndRemove_Call { +func (_c *TxnKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *TxnKV_MultiSaveAndRemove_Call { _c.Call.Return(run) return _c } -// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals -func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds +func (_m *TxnKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -471,13 +496,21 @@ type TxnKV_MultiSaveAndRemoveWithPrefix_Call struct { // MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { - return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} +// - preds ...predicates.Predicate +func (_e *TxnKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { + return &TxnKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { +func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -487,7 +520,7 @@ func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *TxnKV_Mult return _c } -func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { +func (_c *TxnKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *TxnKV_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Return(run) return _c } diff --git a/internal/kv/mocks/watch_kv.go b/internal/kv/mocks/watch_kv.go index 8c9c9b18f4..c49ff4a924 100644 --- a/internal/kv/mocks/watch_kv.go +++ b/internal/kv/mocks/watch_kv.go @@ -6,6 +6,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" mock "github.com/stretchr/testify/mock" + + predicates "github.com/milvus-io/milvus/internal/kv/predicates" ) // WatchKV is an autogenerated mock type for the WatchKV type @@ -506,13 +508,20 @@ func (_c *WatchKV_MultiSave_Call) RunAndReturn(run func(map[string]string) error return _c } -// MultiSaveAndRemove provides a mock function with given fields: saves, removals -func (_m *WatchKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemove provides a mock function with given fields: saves, removals, preds +func (_m *WatchKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -528,13 +537,21 @@ type WatchKV_MultiSaveAndRemove_Call struct { // MultiSaveAndRemove is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *WatchKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}) *WatchKV_MultiSaveAndRemove_Call { - return &WatchKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", saves, removals)} +// - preds ...predicates.Predicate +func (_e *WatchKV_Expecter) MultiSaveAndRemove(saves interface{}, removals interface{}, preds ...interface{}) *WatchKV_MultiSaveAndRemove_Call { + return &WatchKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *WatchKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string)) *WatchKV_MultiSaveAndRemove_Call { +func (_c *WatchKV_MultiSaveAndRemove_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *WatchKV_MultiSaveAndRemove_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -544,18 +561,25 @@ func (_c *WatchKV_MultiSaveAndRemove_Call) Return(_a0 error) *WatchKV_MultiSaveA return _c } -func (_c *WatchKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string) error) *WatchKV_MultiSaveAndRemove_Call { +func (_c *WatchKV_MultiSaveAndRemove_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *WatchKV_MultiSaveAndRemove_Call { _c.Call.Return(run) return _c } -// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals -func (_m *WatchKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { - ret := _m.Called(saves, removals) +// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: saves, removals, preds +func (_m *WatchKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(map[string]string, []string) error); ok { - r0 = rf(saves, removals) + if rf, ok := ret.Get(0).(func(map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(saves, removals, preds...) } else { r0 = ret.Error(0) } @@ -571,13 +595,21 @@ type WatchKV_MultiSaveAndRemoveWithPrefix_Call struct { // MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call // - saves map[string]string // - removals []string -func (_e *WatchKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { - return &WatchKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", saves, removals)} +// - preds ...predicates.Predicate +func (_e *WatchKV_Expecter) MultiSaveAndRemoveWithPrefix(saves interface{}, removals interface{}, preds ...interface{}) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { + return &WatchKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", + append([]interface{}{saves, removals}, preds...)...)} } -func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string)) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { +func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(saves map[string]string, removals []string, preds ...predicates.Predicate)) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(map[string]string), args[1].([]string)) + variadicArgs := make([]predicates.Predicate, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(map[string]string), args[1].([]string), variadicArgs...) }) return _c } @@ -587,7 +619,7 @@ func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *WatchKV_ return _c } -func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string) error) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { +func (_c *WatchKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(map[string]string, []string, ...predicates.Predicate) error) *WatchKV_MultiSaveAndRemoveWithPrefix_Call { _c.Call.Return(run) return _c } diff --git a/internal/kv/predicates/mock_predicate.go b/internal/kv/predicates/mock_predicate.go new file mode 100644 index 0000000000..1183a47881 --- /dev/null +++ b/internal/kv/predicates/mock_predicate.go @@ -0,0 +1,240 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package predicates + +import mock "github.com/stretchr/testify/mock" + +// MockPredicate is an autogenerated mock type for the Predicate type +type MockPredicate struct { + mock.Mock +} + +type MockPredicate_Expecter struct { + mock *mock.Mock +} + +func (_m *MockPredicate) EXPECT() *MockPredicate_Expecter { + return &MockPredicate_Expecter{mock: &_m.Mock} +} + +// IsTrue provides a mock function with given fields: _a0 +func (_m *MockPredicate) IsTrue(_a0 interface{}) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(interface{}) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockPredicate_IsTrue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsTrue' +type MockPredicate_IsTrue_Call struct { + *mock.Call +} + +// IsTrue is a helper method to define mock.On call +// - _a0 interface{} +func (_e *MockPredicate_Expecter) IsTrue(_a0 interface{}) *MockPredicate_IsTrue_Call { + return &MockPredicate_IsTrue_Call{Call: _e.mock.On("IsTrue", _a0)} +} + +func (_c *MockPredicate_IsTrue_Call) Run(run func(_a0 interface{})) *MockPredicate_IsTrue_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockPredicate_IsTrue_Call) Return(_a0 bool) *MockPredicate_IsTrue_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockPredicate_IsTrue_Call) RunAndReturn(run func(interface{}) bool) *MockPredicate_IsTrue_Call { + _c.Call.Return(run) + return _c +} + +// Key provides a mock function with given fields: +func (_m *MockPredicate) Key() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockPredicate_Key_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Key' +type MockPredicate_Key_Call struct { + *mock.Call +} + +// Key is a helper method to define mock.On call +func (_e *MockPredicate_Expecter) Key() *MockPredicate_Key_Call { + return &MockPredicate_Key_Call{Call: _e.mock.On("Key")} +} + +func (_c *MockPredicate_Key_Call) Run(run func()) *MockPredicate_Key_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockPredicate_Key_Call) Return(_a0 string) *MockPredicate_Key_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockPredicate_Key_Call) RunAndReturn(run func() string) *MockPredicate_Key_Call { + _c.Call.Return(run) + return _c +} + +// Target provides a mock function with given fields: +func (_m *MockPredicate) Target() PredicateTarget { + ret := _m.Called() + + var r0 PredicateTarget + if rf, ok := ret.Get(0).(func() PredicateTarget); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(PredicateTarget) + } + + return r0 +} + +// MockPredicate_Target_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Target' +type MockPredicate_Target_Call struct { + *mock.Call +} + +// Target is a helper method to define mock.On call +func (_e *MockPredicate_Expecter) Target() *MockPredicate_Target_Call { + return &MockPredicate_Target_Call{Call: _e.mock.On("Target")} +} + +func (_c *MockPredicate_Target_Call) Run(run func()) *MockPredicate_Target_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockPredicate_Target_Call) Return(_a0 PredicateTarget) *MockPredicate_Target_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockPredicate_Target_Call) RunAndReturn(run func() PredicateTarget) *MockPredicate_Target_Call { + _c.Call.Return(run) + return _c +} + +// TargetValue provides a mock function with given fields: +func (_m *MockPredicate) TargetValue() interface{} { + ret := _m.Called() + + var r0 interface{} + if rf, ok := ret.Get(0).(func() interface{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interface{}) + } + } + + return r0 +} + +// MockPredicate_TargetValue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TargetValue' +type MockPredicate_TargetValue_Call struct { + *mock.Call +} + +// TargetValue is a helper method to define mock.On call +func (_e *MockPredicate_Expecter) TargetValue() *MockPredicate_TargetValue_Call { + return &MockPredicate_TargetValue_Call{Call: _e.mock.On("TargetValue")} +} + +func (_c *MockPredicate_TargetValue_Call) Run(run func()) *MockPredicate_TargetValue_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockPredicate_TargetValue_Call) Return(_a0 interface{}) *MockPredicate_TargetValue_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockPredicate_TargetValue_Call) RunAndReturn(run func() interface{}) *MockPredicate_TargetValue_Call { + _c.Call.Return(run) + return _c +} + +// Type provides a mock function with given fields: +func (_m *MockPredicate) Type() PredicateType { + ret := _m.Called() + + var r0 PredicateType + if rf, ok := ret.Get(0).(func() PredicateType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(PredicateType) + } + + return r0 +} + +// MockPredicate_Type_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Type' +type MockPredicate_Type_Call struct { + *mock.Call +} + +// Type is a helper method to define mock.On call +func (_e *MockPredicate_Expecter) Type() *MockPredicate_Type_Call { + return &MockPredicate_Type_Call{Call: _e.mock.On("Type")} +} + +func (_c *MockPredicate_Type_Call) Run(run func()) *MockPredicate_Type_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockPredicate_Type_Call) Return(_a0 PredicateType) *MockPredicate_Type_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockPredicate_Type_Call) RunAndReturn(run func() PredicateType) *MockPredicate_Type_Call { + _c.Call.Return(run) + return _c +} + +// NewMockPredicate creates a new instance of MockPredicate. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockPredicate(t interface { + mock.TestingT + Cleanup(func()) +}) *MockPredicate { + mock := &MockPredicate{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/kv/predicates/predicate.go b/internal/kv/predicates/predicate.go new file mode 100644 index 0000000000..7f1a234888 --- /dev/null +++ b/internal/kv/predicates/predicate.go @@ -0,0 +1,73 @@ +package predicates + +// PredicateTarget is enum for Predicate target type. +type PredicateTarget int32 + +const ( + // PredTargetValue is predicate target for key-value perid + PredTargetValue PredicateTarget = iota + 1 +) + +type PredicateType int32 + +const ( + PredTypeEqual PredicateType = iota + 1 +) + +// Predicate provides interface for kv predicate. +type Predicate interface { + Target() PredicateTarget + Type() PredicateType + IsTrue(any) bool + Key() string + TargetValue() any +} + +type valuePredicate struct { + k, v string + pt PredicateType +} + +func (p *valuePredicate) Target() PredicateTarget { + return PredTargetValue +} + +func (p *valuePredicate) Type() PredicateType { + return p.pt +} + +func (p *valuePredicate) IsTrue(target any) bool { + switch v := target.(type) { + case string: + return predicateValue(p.pt, v, p.v) + case []byte: + return predicateValue(p.pt, string(v), p.v) + default: + return false + } +} + +func (p *valuePredicate) Key() string { + return p.k +} + +func (p *valuePredicate) TargetValue() any { + return p.v +} + +func predicateValue[T comparable](pt PredicateType, v1, v2 T) bool { + switch pt { + case PredTypeEqual: + return v1 == v2 + default: + return false + } +} + +func ValueEqual(k, v string) Predicate { + return &valuePredicate{ + k: k, + v: v, + pt: PredTypeEqual, + } +} diff --git a/internal/kv/predicates/predicate_test.go b/internal/kv/predicates/predicate_test.go new file mode 100644 index 0000000000..774cd56c62 --- /dev/null +++ b/internal/kv/predicates/predicate_test.go @@ -0,0 +1,33 @@ +package predicates + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type PredicateSuite struct { + suite.Suite +} + +func (s *PredicateSuite) TestValueEqual() { + p := ValueEqual("key", "value") + s.Equal("key", p.Key()) + s.Equal("value", p.TargetValue()) + s.Equal(PredTargetValue, p.Target()) + s.Equal(PredTypeEqual, p.Type()) + s.True(p.IsTrue("value")) + s.False(p.IsTrue("not_value")) + s.True(p.IsTrue([]byte("value"))) + s.False(p.IsTrue(1)) +} + +func (s *PredicateSuite) TestPredicateValue() { + s.True(predicateValue(PredTypeEqual, 1, 1)) + s.False(predicateValue(PredTypeEqual, 1, 2)) + s.False(predicateValue(0, 1, 1)) +} + +func TestPredicates(t *testing.T) { + suite.Run(t, new(PredicateSuite)) +} diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index 51eb162782..f885413891 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -23,6 +23,8 @@ import ( "github.com/tecbot/gorocksdb" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/kv/predicates" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -389,7 +391,10 @@ func (kv *RocksdbKV) MultiRemove(keys []string) error { } // MultiSaveAndRemove provides a transaction to execute a batch of operations -func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { +func (kv *RocksdbKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + if len(preds) > 0 { + return merr.WrapErrServiceUnavailable("predicates not supported") + } if kv.DB == nil { return errors.New("Rocksdb instance is nil when do MultiSaveAndRemove") } @@ -421,7 +426,10 @@ func (kv *RocksdbKV) DeleteRange(startKey, endKey string) error { } // MultiSaveAndRemoveWithPrefix is used to execute a batch operators with the same prefix -func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (kv *RocksdbKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { + if len(preds) > 0 { + return merr.WrapErrServiceUnavailable("predicates not supported") + } if kv.DB == nil { return errors.New("Rocksdb instance is nil when do MultiSaveAndRemove") } diff --git a/internal/kv/rocksdb/rocksdb_kv_test.go b/internal/kv/rocksdb/rocksdb_kv_test.go index 2b0dc2507a..b1d07010b8 100644 --- a/internal/kv/rocksdb/rocksdb_kv_test.go +++ b/internal/kv/rocksdb/rocksdb_kv_test.go @@ -23,8 +23,11 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/milvus-io/milvus/internal/kv/predicates" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" + "github.com/milvus-io/milvus/pkg/util/merr" ) func TestRocksdbKV(t *testing.T) { @@ -364,3 +367,20 @@ func TestHasPrefix(t *testing.T) { assert.NoError(t, err) assert.False(t, has) } + +func TestPredicates(t *testing.T) { + dir := t.TempDir() + db, err := rocksdbkv.NewRocksdbKV(dir) + + require.NoError(t, err) + defer db.Close() + defer db.RemoveWithPrefix("") + + err = db.MultiSaveAndRemove(map[string]string{}, []string{}, predicates.ValueEqual("a", "b")) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrServiceUnavailable) + + err = db.MultiSaveAndRemoveWithPrefix(map[string]string{}, []string{}, predicates.ValueEqual("a", "b")) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrServiceUnavailable) +} diff --git a/internal/kv/tikv/txn_tikv.go b/internal/kv/tikv/txn_tikv.go index 0c95702e38..7603aad714 100644 --- a/internal/kv/tikv/txn_tikv.go +++ b/internal/kv/tikv/txn_tikv.go @@ -33,9 +33,11 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + "github.com/milvus-io/milvus/internal/kv/predicates" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -426,72 +428,98 @@ func (kv *txnTiKV) RemoveWithPrefix(prefix string) error { } // MultiSaveAndRemove saves the key-value pairs and removes the keys in a transaction. -func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string) error { +func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string, preds ...predicates.Predicate) error { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - var logging_error error - defer logWarnOnFailure(&logging_error, "txnTiKV MultiSaveAndRemove error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals))) + var loggingErr error + defer logWarnOnFailure(&loggingErr, "txnTiKV MultiSaveAndRemove error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals))) txn, err := beginTxn(kv.txn) if err != nil { - logging_error = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemove") - return logging_error + loggingErr = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemove") + return loggingErr } // Defer a rollback only if the transaction hasn't been committed - defer rollbackOnFailure(&logging_error, txn) + defer rollbackOnFailure(&loggingErr, txn) + + for _, pred := range preds { + key := path.Join(kv.rootPath, pred.Key()) + val, err := txn.Get(ctx, []byte(key)) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("failed to read predicate target (%s:%v) for MultiSaveAndRemove", pred.Key(), pred.TargetValue())) + return loggingErr + } + if !pred.IsTrue(val) { + loggingErr = merr.WrapErrIoFailedReason("failed to meet predicate", fmt.Sprintf("key=%s, value=%v", pred.Key(), pred.TargetValue())) + return loggingErr + } + } for key, value := range saves { key = path.Join(kv.rootPath, key) // Check if value is empty or taking reserved EmptyValue byte_value, err := convertEmptyStringToByte(value) if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemove", key, value)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemove", key, value)) + return loggingErr } err = txn.Set([]byte(key), byte_value) if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemove", key, value)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemove", key, value)) + return loggingErr } } for _, key := range removals { key = path.Join(kv.rootPath, key) if err = txn.Delete([]byte(key)); err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemove", key)) + return loggingErr } } err = kv.executeTxn(txn, ctx) if err != nil { - logging_error = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove") - return logging_error + loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove") + return loggingErr } CheckElapseAndWarn(start, "Slow txnTiKV MultiSaveAndRemove() operation", zap.Any("saves", saves), zap.Strings("removals", removals)) return nil } // MultiSaveAndRemoveWithPrefix saves kv in @saves and removes the keys with given prefix in @removals. -func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error { +func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string, preds ...predicates.Predicate) error { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() - var logging_error error - defer logWarnOnFailure(&logging_error, "txnTiKV MultiSaveAndRemoveWithPrefix() error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals))) + var loggingErr error + defer logWarnOnFailure(&loggingErr, "txnTiKV MultiSaveAndRemoveWithPrefix() error", zap.Any("saves", saves), zap.Strings("removes", removals), zap.Int("saveLength", len(saves)), zap.Int("removeLength", len(removals))) txn, err := beginTxn(kv.txn) if err != nil { - logging_error = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemoveWithPrefix") - return logging_error + loggingErr = errors.Wrap(err, "Failed to create txn for MultiSaveAndRemoveWithPrefix") + return loggingErr } // Defer a rollback only if the transaction hasn't been committed - defer rollbackOnFailure(&logging_error, txn) + defer rollbackOnFailure(&loggingErr, txn) + + for _, pred := range preds { + key := path.Join(kv.rootPath, pred.Key()) + val, err := txn.Get(ctx, []byte(key)) + if err != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("failed to read predicate target (%s:%v) for MultiSaveAndRemove", pred.Key(), pred.TargetValue())) + return loggingErr + } + if !pred.IsTrue(val) { + loggingErr = merr.WrapErrIoFailedReason("failed to meet predicate", fmt.Sprintf("key=%s, value=%v", pred.Key(), pred.TargetValue())) + return loggingErr + } + } // Save key-value pairs for key, value := range saves { @@ -499,13 +527,13 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal // Check if value is empty or taking reserved EmptyValue byte_value, err := convertEmptyStringToByte(value) if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr } err = txn.Set([]byte(key), byte_value) if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value)) + return loggingErr } } // Remove keys with prefix @@ -518,31 +546,31 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal // Use Scan to iterate over keys in the prefix range iter, err := txn.Iter(startKey, endKey) if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during MultiSaveAndRemoveWithPrefix()", prefix)) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during MultiSaveAndRemoveWithPrefix()", prefix)) + return loggingErr } // Iterate over keys and delete them for iter.Valid() { key := iter.Key() err = txn.Delete(key) - if logging_error != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemoveWithPrefix", string(key))) - return logging_error + if loggingErr != nil { + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiSaveAndRemoveWithPrefix", string(key))) + return loggingErr } // Move the iterator to the next key err = iter.Next() if err != nil { - logging_error = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiSaveAndRemoveWithPrefix", string(key))) - return logging_error + loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for MultiSaveAndRemoveWithPrefix", string(key))) + return loggingErr } } } err = kv.executeTxn(txn, ctx) if err != nil { - logging_error = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix") - return logging_error + loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix") + return loggingErr } CheckElapseAndWarn(start, "Slow txnTiKV MultiSaveAndRemoveWithPrefix() operation", zap.Any("saves", saves), zap.Strings("removals", removals)) return nil diff --git a/internal/kv/tikv/txn_tikv_test.go b/internal/kv/tikv/txn_tikv_test.go index 55c995766a..c6b91fe045 100644 --- a/internal/kv/tikv/txn_tikv_test.go +++ b/internal/kv/tikv/txn_tikv_test.go @@ -29,6 +29,8 @@ import ( "github.com/tikv/client-go/v2/txnkv" "github.com/tikv/client-go/v2/txnkv/transaction" "golang.org/x/exp/maps" + + "github.com/milvus-io/milvus/internal/kv/predicates" ) func TestTiKVLoad(te *testing.T) { @@ -596,3 +598,45 @@ func TestTiKVUnimplemented(t *testing.T) { _, err = kv.CompareVersionAndSwap("k", 1, "target") assert.Error(t, err) } + +func TestTxnWithPredicates(t *testing.T) { + kv := NewTiKV(txnClient, "/") + err := kv.RemoveWithPrefix("") + require.NoError(t, err) + + prepareKV := map[string]string{ + "lease1": "1", + "lease2": "2", + } + + err = kv.MultiSave(prepareKV) + require.NoError(t, err) + + multiSaveAndRemovePredTests := []struct { + tag string + multiSave map[string]string + preds []predicates.Predicate + expectSuccess bool + }{ + {"predicate_ok", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "1")}, true}, + {"predicate_fail", map[string]string{"a": "b"}, []predicates.Predicate{predicates.ValueEqual("lease1", "2")}, false}, + } + + for _, test := range multiSaveAndRemovePredTests { + t.Run(test.tag, func(t *testing.T) { + err := kv.MultiSaveAndRemove(test.multiSave, nil, test.preds...) + t.Log(err) + if test.expectSuccess { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + err = kv.MultiSaveAndRemoveWithPrefix(test.multiSave, nil, test.preds...) + if test.expectSuccess { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index d70153b079..07f199fdfe 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -554,6 +554,14 @@ func WrapErrIoFailed(key string, msg ...string) error { return err } +func WrapErrIoFailedReason(reason string, msg ...string) error { + err := errors.Wrapf(ErrIoFailed, reason) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + // Parameter related func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error { err := errors.Wrapf(ErrParameterInvalid, "expected=%v, actual=%v", expected, actual)