From 5ad8a29c0b74382f5361cd89a664317612076839 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 15 Oct 2025 10:15:59 +0800 Subject: [PATCH] enhance: Speed up CDC scheduling (#44564) Make CDC watch etcd replicate pchannel meta instead of listing them periodically. issue: https://github.com/milvus-io/milvus/issues/44123 --------- Signed-off-by: bigsheeper --- internal/cdc/controller/controller.go | 2 +- .../controllerimpl/controller_impl.go | 145 ++- .../controllerimpl/controller_impl_test.go | 98 +- .../controllerimpl/replicate_meta_util.go | 56 + internal/cdc/controller/mock_controller.go | 25 +- .../mock_replicate_manager_client.go | 57 +- .../replication/replicate_manager_client.go | 7 +- .../replicatemanager/replicate_manager.go | 37 +- internal/cdc/resource/resource.go | 16 + internal/cdc/resource/test_utility.go | 5 + internal/cdc/server.go | 8 +- internal/distributed/cdc/service.go | 20 +- pkg/.mockery_pkg.yaml | 1 + pkg/mocks/mock_kv/mock_WatchKV.go | 1034 +++++++++++++++++ 14 files changed, 1378 insertions(+), 133 deletions(-) create mode 100644 internal/cdc/controller/controllerimpl/replicate_meta_util.go create mode 100644 pkg/mocks/mock_kv/mock_WatchKV.go diff --git a/internal/cdc/controller/controller.go b/internal/cdc/controller/controller.go index e3c14a2834..13316438cd 100644 --- a/internal/cdc/controller/controller.go +++ b/internal/cdc/controller/controller.go @@ -19,6 +19,6 @@ package controller // Controller controls and schedules the CDC process. // It will periodically update the replications by the replicate configuration. type Controller interface { - Start() + Start() error Stop() } diff --git a/internal/cdc/controller/controllerimpl/controller_impl.go b/internal/cdc/controller/controllerimpl/controller_impl.go index 75abd9003c..383bdd5470 100644 --- a/internal/cdc/controller/controllerimpl/controller_impl.go +++ b/internal/cdc/controller/controllerimpl/controller_impl.go @@ -18,70 +18,135 @@ package controllerimpl import ( "context" + "path" + "strings" "sync" - "time" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/cdc/resource" + "github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -const checkInterval = 10 * time.Second - type controller struct { - ctx context.Context - wg sync.WaitGroup - stopOnce sync.Once - stopChan chan struct{} + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + prefix string } func NewController() *controller { + ctx, cancel := context.WithCancel(context.Background()) return &controller{ - ctx: context.Background(), - stopChan: make(chan struct{}), + ctx: ctx, + cancel: cancel, + prefix: path.Join( + paramtable.Get().EtcdCfg.MetaRootPath.GetValue(), + streamingcoord.ReplicatePChannelMetaPrefix, + ), } } -func (c *controller) Start() { - log.Ctx(c.ctx).Info("CDC controller started") +func (c *controller) Start() error { + c.startWatchLoop() + return nil +} + +func (c *controller) recoverReplicatePChannelMeta(replicatePChannels []*streamingpb.ReplicatePChannelMeta) { + currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue() + for _, replicate := range replicatePChannels { + if !strings.Contains(replicate.GetSourceChannelName(), currentClusterID) { + // current cluster is not source cluster, skip create replicator + continue + } + log.Info("recover replicate pchannel meta", + zap.String("sourceChannel", replicate.GetSourceChannelName()), + zap.String("targetChannel", replicate.GetTargetChannelName()), + ) + // Replicate manager ensures the idempotency of the creation. + resource.Resource().ReplicateManagerClient().CreateReplicator(replicate) + } + resource.Resource().ReplicateManagerClient().RemoveOutdatedReplicators(replicatePChannels) +} + +func (c *controller) watchEvents(revision int64) clientv3.WatchChan { + eventCh := resource.Resource().ETCD().Watch( + c.ctx, + c.prefix, + clientv3.WithPrefix(), + clientv3.WithRev(revision), + ) + log.Ctx(c.ctx).Info("succeed to watch replicate pchannel meta events", + zap.Int64("revision", revision), zap.String("prefix", c.prefix)) + return eventCh +} + +func (c *controller) startWatchLoop() { c.wg.Add(1) go func() { defer c.wg.Done() - timer := time.NewTicker(checkInterval) - defer timer.Stop() for { - select { - case <-c.stopChan: - return - case <-timer.C: - c.run() + channels, revision, err := ListReplicatePChannels(c.ctx, resource.Resource().ETCD(), c.prefix) + if err != nil && c.ctx.Err() == nil { + log.Ctx(c.ctx).Warn("failed to list replicate pchannels", zap.Error(err)) + continue + } + c.recoverReplicatePChannelMeta(channels) + eventCh := c.watchEvents(revision) + err = c.watchLoop(eventCh) + if err == nil { + break } } }() } +func (c *controller) watchLoop(eventCh clientv3.WatchChan) error { + for { + select { + case <-c.ctx.Done(): + return nil + case event, ok := <-eventCh: + if !ok { + panic("etcd event channel closed") + } + if err := event.Err(); err != nil { + log.Warn("etcd event error", zap.Error(err)) + return err + } + for _, e := range event.Events { + replicate := MustParseReplicateChannelFromEvent(e) + log.Info("handle replicate pchannel event", + zap.String("sourceChannel", replicate.GetSourceChannelName()), + zap.String("targetChannel", replicate.GetTargetChannelName()), + zap.String("eventType", e.Type.String()), + ) + switch e.Type { + case mvccpb.PUT: + currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue() + if !strings.Contains(replicate.GetSourceChannelName(), currentClusterID) { + // current cluster is not source cluster, skip create replicator + continue + } + resource.Resource().ReplicateManagerClient().CreateReplicator(replicate) + case mvccpb.DELETE: + resource.Resource().ReplicateManagerClient().RemoveReplicator(replicate) + } + } + } + } +} + func (c *controller) Stop() { - c.stopOnce.Do(func() { - log.Ctx(c.ctx).Info("CDC controller stopping...") - close(c.stopChan) - c.wg.Wait() - resource.Resource().ReplicateManagerClient().Close() - log.Ctx(c.ctx).Info("CDC controller stopped") - }) -} - -func (c *controller) run() { - targetReplicatePChannels, err := resource.Resource().ReplicationCatalog().ListReplicatePChannels(c.ctx) - if err != nil { - log.Ctx(c.ctx).Error("failed to get replicate pchannels", zap.Error(err)) - return - } - // create replicators for all replicate pchannels - for _, replicatePChannel := range targetReplicatePChannels { - resource.Resource().ReplicateManagerClient().CreateReplicator(replicatePChannel) - } - - // remove out of target replicators - resource.Resource().ReplicateManagerClient().RemoveOutOfTargetReplicators(targetReplicatePChannels) + log.Ctx(c.ctx).Info("stop CDC controller...") + c.cancel() + c.wg.Wait() + resource.Resource().ReplicateManagerClient().Close() + log.Ctx(c.ctx).Info("CDC controller stopped") } diff --git a/internal/cdc/controller/controllerimpl/controller_impl_test.go b/internal/cdc/controller/controllerimpl/controller_impl_test.go index 7945f2c7cc..702186af61 100644 --- a/internal/cdc/controller/controllerimpl/controller_impl_test.go +++ b/internal/cdc/controller/controllerimpl/controller_impl_test.go @@ -19,69 +19,71 @@ package controllerimpl import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/cdc/replication" "github.com/milvus-io/milvus/internal/cdc/resource" - "github.com/milvus-io/milvus/internal/mocks/mock_metastore" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" ) -func TestController_StartAndStop(t *testing.T) { - mockReplicateManagerClient := replication.NewMockReplicateManagerClient(t) - mockReplicateManagerClient.EXPECT().Close().Return() - resource.InitForTest(t, - resource.OptReplicateManagerClient(mockReplicateManagerClient), - ) - - ctrl := NewController() - assert.NotPanics(t, func() { - ctrl.Start() - }) - assert.NotPanics(t, func() { - ctrl.Stop() - }) -} - -func TestController_Run(t *testing.T) { +func TestController_StartAndStop_WithEvents(t *testing.T) { mockReplicateManagerClient := replication.NewMockReplicateManagerClient(t) mockReplicateManagerClient.EXPECT().Close().Return() - replicatePChannels := []*streamingpb.ReplicatePChannelMeta{ - { - SourceChannelName: "test-source-channel-1", - TargetChannelName: "test-target-channel-1", + // Create test data + replicateMeta := &streamingpb.ReplicatePChannelMeta{ + SourceChannelName: "by-dev-test-source-channel", + TargetChannelName: "by-dev-test-target-channel", + } + metaBytes, _ := proto.Marshal(replicateMeta) + + // Create mock events + putEvent := &clientv3.Event{ + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Value: metaBytes, }, } - mockReplicationCatalog := mock_metastore.NewMockReplicationCatalog(t) - mockReplicationCatalog.EXPECT().ListReplicatePChannels(mock.Anything).Return(replicatePChannels, nil) - mockReplicateManagerClient.EXPECT().CreateReplicator(replicatePChannels[0]).Return() - mockReplicateManagerClient.EXPECT().RemoveOutOfTargetReplicators(replicatePChannels).Return() + + deleteEvent := &clientv3.Event{ + Type: mvccpb.DELETE, + Kv: &mvccpb.KeyValue{ + Value: metaBytes, + }, + } + + eventCh := make(chan clientv3.WatchResponse, 2) + // Send events + go func() { + eventCh <- clientv3.WatchResponse{ + Events: []*clientv3.Event{putEvent}, + } + eventCh <- clientv3.WatchResponse{ + Events: []*clientv3.Event{deleteEvent}, + } + }() + + notifyCh := make(chan struct{}, 2) + mockReplicateManagerClient.EXPECT().CreateReplicator(mock.Anything).RunAndReturn(func(replicate *streamingpb.ReplicatePChannelMeta) { + notifyCh <- struct{}{} + }) + mockReplicateManagerClient.EXPECT().RemoveReplicator(mock.Anything).RunAndReturn(func(replicate *streamingpb.ReplicatePChannelMeta) { + notifyCh <- struct{}{} + }) + resource.InitForTest(t, resource.OptReplicateManagerClient(mockReplicateManagerClient), - resource.OptReplicationCatalog(mockReplicationCatalog), ) ctrl := NewController() - ctrl.Start() - defer ctrl.Stop() - ctrl.run() -} - -func TestController_RunError(t *testing.T) { - mockReplicateManagerClient := replication.NewMockReplicateManagerClient(t) - mockReplicateManagerClient.EXPECT().Close().Return() - - mockReplicationCatalog := mock_metastore.NewMockReplicationCatalog(t) - mockReplicationCatalog.EXPECT().ListReplicatePChannels(mock.Anything).Return(nil, assert.AnError) - resource.InitForTest(t, - resource.OptReplicateManagerClient(mockReplicateManagerClient), - resource.OptReplicationCatalog(mockReplicationCatalog), - ) - - ctrl := NewController() - ctrl.Start() - defer ctrl.Stop() - ctrl.run() + go ctrl.watchLoop(eventCh) + + // Wait for events to be processed + <-notifyCh + <-notifyCh + + ctrl.Stop() } diff --git a/internal/cdc/controller/controllerimpl/replicate_meta_util.go b/internal/cdc/controller/controllerimpl/replicate_meta_util.go new file mode 100644 index 0000000000..61f2151a6d --- /dev/null +++ b/internal/cdc/controller/controllerimpl/replicate_meta_util.go @@ -0,0 +1,56 @@ +package controllerimpl + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/errors" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" +) + +func ListReplicatePChannels(ctx context.Context, etcdCli *clientv3.Client, prefix string) ([]*streamingpb.ReplicatePChannelMeta, int64, error) { + resp, err := get(ctx, etcdCli, prefix) + if err != nil { + return nil, 0, err + } + keys := make([]string, 0, resp.Count) + values := make([]string, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, string(kv.Value)) + } + channels := make([]*streamingpb.ReplicatePChannelMeta, 0, len(values)) + for k, value := range values { + info := &streamingpb.ReplicatePChannelMeta{} + err = proto.Unmarshal([]byte(value), info) + if err != nil { + return nil, 0, errors.Wrapf(err, "unmarshal replicate pchannel meta %s failed", keys[k]) + } + channels = append(channels, info) + } + return channels, resp.Header.Revision, nil +} + +func MustParseReplicateChannelFromEvent(e *clientv3.Event) *streamingpb.ReplicatePChannelMeta { + meta := &streamingpb.ReplicatePChannelMeta{} + err := proto.Unmarshal(e.Kv.Value, meta) + if err != nil { + panic(fmt.Sprintf("failed to unmarshal replicate pchannel meta: %v", err)) + } + return meta +} + +func get(ctx context.Context, etcdCli *clientv3.Client, prefix string) (*clientv3.GetResponse, error) { + ctx1, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + opts := []clientv3.OpOption{ + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + } + return etcdCli.Get(ctx1, prefix, opts...) +} diff --git a/internal/cdc/controller/mock_controller.go b/internal/cdc/controller/mock_controller.go index d7f2ab4403..509dbcc995 100644 --- a/internal/cdc/controller/mock_controller.go +++ b/internal/cdc/controller/mock_controller.go @@ -18,8 +18,21 @@ func (_m *MockController) EXPECT() *MockController_Expecter { } // Start provides a mock function with no fields -func (_m *MockController) Start() { - _m.Called() +func (_m *MockController) Start() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 } // MockController_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' @@ -39,13 +52,13 @@ func (_c *MockController_Start_Call) Run(run func()) *MockController_Start_Call return _c } -func (_c *MockController_Start_Call) Return() *MockController_Start_Call { - _c.Call.Return() +func (_c *MockController_Start_Call) Return(_a0 error) *MockController_Start_Call { + _c.Call.Return(_a0) return _c } -func (_c *MockController_Start_Call) RunAndReturn(run func()) *MockController_Start_Call { - _c.Run(run) +func (_c *MockController_Start_Call) RunAndReturn(run func() error) *MockController_Start_Call { + _c.Call.Return(run) return _c } diff --git a/internal/cdc/replication/mock_replicate_manager_client.go b/internal/cdc/replication/mock_replicate_manager_client.go index 88053815ff..9a1768b672 100644 --- a/internal/cdc/replication/mock_replicate_manager_client.go +++ b/internal/cdc/replication/mock_replicate_manager_client.go @@ -85,35 +85,68 @@ func (_c *MockReplicateManagerClient_CreateReplicator_Call) RunAndReturn(run fun return _c } -// RemoveOutOfTargetReplicators provides a mock function with given fields: targetReplicatePChannels -func (_m *MockReplicateManagerClient) RemoveOutOfTargetReplicators(targetReplicatePChannels []*streamingpb.ReplicatePChannelMeta) { - _m.Called(targetReplicatePChannels) +// RemoveOutdatedReplicators provides a mock function with given fields: aliveReplicates +func (_m *MockReplicateManagerClient) RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta) { + _m.Called(aliveReplicates) } -// MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveOutOfTargetReplicators' -type MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call struct { +// MockReplicateManagerClient_RemoveOutdatedReplicators_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveOutdatedReplicators' +type MockReplicateManagerClient_RemoveOutdatedReplicators_Call struct { *mock.Call } -// RemoveOutOfTargetReplicators is a helper method to define mock.On call -// - targetReplicatePChannels []*streamingpb.ReplicatePChannelMeta -func (_e *MockReplicateManagerClient_Expecter) RemoveOutOfTargetReplicators(targetReplicatePChannels interface{}) *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call { - return &MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call{Call: _e.mock.On("RemoveOutOfTargetReplicators", targetReplicatePChannels)} +// RemoveOutdatedReplicators is a helper method to define mock.On call +// - aliveReplicates []*streamingpb.ReplicatePChannelMeta +func (_e *MockReplicateManagerClient_Expecter) RemoveOutdatedReplicators(aliveReplicates interface{}) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call { + return &MockReplicateManagerClient_RemoveOutdatedReplicators_Call{Call: _e.mock.On("RemoveOutdatedReplicators", aliveReplicates)} } -func (_c *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call) Run(run func(targetReplicatePChannels []*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call { +func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) Run(run func(aliveReplicates []*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].([]*streamingpb.ReplicatePChannelMeta)) }) return _c } -func (_c *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call) Return() *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call { +func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) Return() *MockReplicateManagerClient_RemoveOutdatedReplicators_Call { _c.Call.Return() return _c } -func (_c *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call) RunAndReturn(run func([]*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutOfTargetReplicators_Call { +func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) RunAndReturn(run func([]*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call { + _c.Run(run) + return _c +} + +// RemoveReplicator provides a mock function with given fields: replicateInfo +func (_m *MockReplicateManagerClient) RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) { + _m.Called(replicateInfo) +} + +// MockReplicateManagerClient_RemoveReplicator_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveReplicator' +type MockReplicateManagerClient_RemoveReplicator_Call struct { + *mock.Call +} + +// RemoveReplicator is a helper method to define mock.On call +// - replicateInfo *streamingpb.ReplicatePChannelMeta +func (_e *MockReplicateManagerClient_Expecter) RemoveReplicator(replicateInfo interface{}) *MockReplicateManagerClient_RemoveReplicator_Call { + return &MockReplicateManagerClient_RemoveReplicator_Call{Call: _e.mock.On("RemoveReplicator", replicateInfo)} +} + +func (_c *MockReplicateManagerClient_RemoveReplicator_Call) Run(run func(replicateInfo *streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveReplicator_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*streamingpb.ReplicatePChannelMeta)) + }) + return _c +} + +func (_c *MockReplicateManagerClient_RemoveReplicator_Call) Return() *MockReplicateManagerClient_RemoveReplicator_Call { + _c.Call.Return() + return _c +} + +func (_c *MockReplicateManagerClient_RemoveReplicator_Call) RunAndReturn(run func(*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveReplicator_Call { _c.Run(run) return _c } diff --git a/internal/cdc/replication/replicate_manager_client.go b/internal/cdc/replication/replicate_manager_client.go index 6b40f64e31..2941a005c8 100644 --- a/internal/cdc/replication/replicate_manager_client.go +++ b/internal/cdc/replication/replicate_manager_client.go @@ -23,8 +23,11 @@ type ReplicateManagerClient interface { // CreateReplicator creates a new replicator for the replicate pchannel. CreateReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) - // RemoveOutOfTargetReplicators removes replicators that are not in the target replicate pchannels. - RemoveOutOfTargetReplicators(targetReplicatePChannels []*streamingpb.ReplicatePChannelMeta) + // RemoveReplicator removes a replicator for the replicate pchannel. + RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) + + // RemoveOutdatedReplicators removes the outdated replicators. + RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta) // Close closes the replicate manager client. Close() diff --git a/internal/cdc/replication/replicatemanager/replicate_manager.go b/internal/cdc/replication/replicatemanager/replicate_manager.go index d10de54eac..2262beca8d 100644 --- a/internal/cdc/replication/replicatemanager/replicate_manager.go +++ b/internal/cdc/replication/replicatemanager/replicate_manager.go @@ -20,7 +20,6 @@ import ( "context" "strings" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord" @@ -53,7 +52,7 @@ func (r *replicateManager) CreateReplicator(replicateInfo *streamingpb.Replicate ) currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue() if !strings.Contains(replicateInfo.GetSourceChannelName(), currentClusterID) { - // current cluster is not source cluster, skip create replicator + // should be checked by controller, here is a redundant check return } replicatorKey := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo) @@ -69,17 +68,37 @@ func (r *replicateManager) CreateReplicator(replicateInfo *streamingpb.Replicate logger.Info("created replicator for replicate pchannel") } -func (r *replicateManager) RemoveOutOfTargetReplicators(targetReplicatePChannels []*streamingpb.ReplicatePChannelMeta) { - targets := lo.KeyBy(targetReplicatePChannels, streamingcoord.BuildReplicatePChannelMetaKey) +func (r *replicateManager) RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) { + logger := log.With( + zap.String("sourceChannel", replicateInfo.GetSourceChannelName()), + zap.String("targetChannel", replicateInfo.GetTargetChannelName()), + ) + replicatorKey := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo) + replicator, ok := r.replicators[replicatorKey] + if !ok { + logger.Info("replicator not found, skip remove") + return + } + replicator.StopReplicate() + delete(r.replicators, replicatorKey) + delete(r.replicatorPChannels, replicatorKey) + logger.Info("removed replicator for replicate pchannel") +} + +func (r *replicateManager) RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta) { + alivesMap := make(map[string]struct{}) + for _, replicate := range aliveReplicates { + alivesMap[streamingcoord.BuildReplicatePChannelMetaKey(replicate)] = struct{}{} + } for replicatorKey, replicator := range r.replicators { - if pchannelMeta, ok := targets[replicatorKey]; !ok { + if _, ok := alivesMap[replicatorKey]; !ok { replicator.StopReplicate() + info := r.replicatorPChannels[replicatorKey] delete(r.replicators, replicatorKey) delete(r.replicatorPChannels, replicatorKey) - log.Info("removed replicator due to out of target", - zap.String("sourceChannel", pchannelMeta.GetSourceChannelName()), - zap.String("targetChannel", pchannelMeta.GetTargetChannelName()), - ) + log.Info("removed replicator for replicate pchannel", + zap.String("sourceChannel", info.GetSourceChannelName()), + zap.String("targetChannel", info.GetTargetChannelName())) } } } diff --git a/internal/cdc/resource/resource.go b/internal/cdc/resource/resource.go index 8b323482ed..81ac4d2563 100644 --- a/internal/cdc/resource/resource.go +++ b/internal/cdc/resource/resource.go @@ -19,6 +19,8 @@ package resource import ( "reflect" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/milvus/internal/cdc/cluster" "github.com/milvus-io/milvus/internal/cdc/controller" "github.com/milvus-io/milvus/internal/cdc/replication" @@ -39,6 +41,13 @@ func OptMetaKV(metaKV kv.MetaKv) optResourceInit { } } +// OptETCD provides the etcd client to the resource. +func OptETCD(etcd *clientv3.Client) optResourceInit { + return func(r *resourceImpl) { + r.etcdClient = etcd + } +} + // OptReplicateManagerClient provides the replicate manager client to the resource. func OptReplicateManagerClient(replicateManagerClient replication.ReplicateManagerClient) optResourceInit { return func(r *resourceImpl) { @@ -78,6 +87,7 @@ func Init(opts ...optResourceInit) { newR.clusterClient = cluster.NewClusterClient() assertNotNil(newR.MetaKV()) + assertNotNil(newR.ETCD()) assertNotNil(newR.ReplicationCatalog()) assertNotNil(newR.ClusterClient()) assertNotNil(newR.ReplicateManagerClient()) @@ -97,6 +107,7 @@ func Resource() *resourceImpl { // All utility on it is concurrent-safe and singleton. type resourceImpl struct { metaKV kv.MetaKv + etcdClient *clientv3.Client catalog metastore.ReplicationCatalog clusterClient cluster.ClusterClient replicateManagerClient replication.ReplicateManagerClient @@ -108,6 +119,11 @@ func (r *resourceImpl) MetaKV() kv.MetaKv { return r.metaKV } +// ETCD returns the etcd client. +func (r *resourceImpl) ETCD() *clientv3.Client { + return r.etcdClient +} + // ReplicationCatalog returns the replication catalog. func (r *resourceImpl) ReplicationCatalog() metastore.ReplicationCatalog { return r.catalog diff --git a/internal/cdc/resource/test_utility.go b/internal/cdc/resource/test_utility.go index 6afeba097a..f3e2a225d8 100644 --- a/internal/cdc/resource/test_utility.go +++ b/internal/cdc/resource/test_utility.go @@ -6,6 +6,8 @@ package resource import ( "testing" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/milvus-io/milvus/internal/cdc/cluster" "github.com/milvus-io/milvus/internal/cdc/controller" "github.com/milvus-io/milvus/internal/cdc/replication" @@ -22,6 +24,9 @@ func InitForTest(t *testing.T, opts ...optResourceInit) { if r.metaKV == nil { r.metaKV = mock_kv.NewMockMetaKv(t) } + if r.etcdClient == nil { + r.etcdClient = &clientv3.Client{} + } if r.catalog == nil { r.catalog = mock_metastore.NewMockReplicationCatalog(t) } diff --git a/internal/cdc/server.go b/internal/cdc/server.go index 315c944a03..9dfba2e1d1 100644 --- a/internal/cdc/server.go +++ b/internal/cdc/server.go @@ -19,6 +19,8 @@ package cdc import ( "context" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/cdc/resource" "github.com/milvus-io/milvus/pkg/v2/log" ) @@ -42,7 +44,11 @@ func (svr *CDCServer) Init() error { // Start starts CDCServer. func (svr *CDCServer) Start() error { - resource.Resource().Controller().Start() + err := resource.Resource().Controller().Start() + if err != nil { + log.Ctx(svr.ctx).Error("start CDC controller failed", zap.Error(err)) + return err + } log.Ctx(svr.ctx).Info("CDCServer start successfully") return nil } diff --git a/internal/distributed/cdc/service.go b/internal/distributed/cdc/service.go index 7fa773b798..40bad895af 100644 --- a/internal/distributed/cdc/service.go +++ b/internal/distributed/cdc/service.go @@ -32,14 +32,12 @@ import ( "github.com/milvus-io/milvus/internal/cdc/replication/replicatemanager" "github.com/milvus-io/milvus/internal/cdc/resource" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - tikvkv "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/util/componentutil" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/tikv" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -48,7 +46,7 @@ type Server struct { ctx context.Context cancel context.CancelFunc - metaKV kv.MetaKv + watchKV kv.WatchKV cdcServer *cdc.CDCServer etcdCli *clientv3.Client @@ -146,7 +144,8 @@ func (s *Server) init() (err error) { // Create CDC service. s.cdcServer = cdc.NewCDCServer(s.ctx) resource.Init( - resource.OptMetaKV(s.metaKV), + resource.OptMetaKV(s.watchKV), + resource.OptETCD(s.etcdCli), resource.OptReplicateManagerClient(replicatemanager.NewReplicateManager()), resource.OptController(controllerimpl.NewController()), ) @@ -176,18 +175,11 @@ func (s *Server) initMeta() error { log.Info("cdc connecting to metadata store", zap.String("metaType", metaType)) metaRootPath := "" if metaType == util.MetaStoreTypeTiKV { - var err error - s.tikvCli, err = tikv.GetTiKVClient(¶mtable.Get().TiKVCfg) - if err != nil { - log.Warn("cdc init tikv client failed", zap.Error(err)) - return err - } - metaRootPath = params.TiKVCfg.MetaRootPath.GetValue() - s.metaKV = tikvkv.NewTiKV(s.tikvCli, metaRootPath, - tikvkv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))) + // TODO: sheep, support watch operation for tikv + panic("tikv is not supported for cdc") } else if metaType == util.MetaStoreTypeEtcd { metaRootPath = params.EtcdCfg.MetaRootPath.GetValue() - s.metaKV = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath, + s.watchKV = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath, etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))) } return nil diff --git a/pkg/.mockery_pkg.yaml b/pkg/.mockery_pkg.yaml index 8ccf26678e..eb41bffb2d 100644 --- a/pkg/.mockery_pkg.yaml +++ b/pkg/.mockery_pkg.yaml @@ -8,6 +8,7 @@ packages: github.com/milvus-io/milvus/pkg/v2/kv: interfaces: MetaKv: + WatchKV: github.com/milvus-io/milvus/pkg/v2/streaming/util/message: interfaces: MessageID: diff --git a/pkg/mocks/mock_kv/mock_WatchKV.go b/pkg/mocks/mock_kv/mock_WatchKV.go new file mode 100644 index 0000000000..bab311bde0 --- /dev/null +++ b/pkg/mocks/mock_kv/mock_WatchKV.go @@ -0,0 +1,1034 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. + +package mock_kv + +import ( + context "context" + + clientv3 "go.etcd.io/etcd/client/v3" + + mock "github.com/stretchr/testify/mock" + + predicates "github.com/milvus-io/milvus/pkg/v2/kv/predicates" +) + +// MockWatchKV is an autogenerated mock type for the WatchKV type +type MockWatchKV struct { + mock.Mock +} + +type MockWatchKV_Expecter struct { + mock *mock.Mock +} + +func (_m *MockWatchKV) EXPECT() *MockWatchKV_Expecter { + return &MockWatchKV_Expecter{mock: &_m.Mock} +} + +// Close provides a mock function with no fields +func (_m *MockWatchKV) Close() { + _m.Called() +} + +// MockWatchKV_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockWatchKV_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockWatchKV_Expecter) Close() *MockWatchKV_Close_Call { + return &MockWatchKV_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockWatchKV_Close_Call) Run(run func()) *MockWatchKV_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWatchKV_Close_Call) Return() *MockWatchKV_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockWatchKV_Close_Call) RunAndReturn(run func()) *MockWatchKV_Close_Call { + _c.Run(run) + return _c +} + +// CompareVersionAndSwap provides a mock function with given fields: ctx, key, version, target +func (_m *MockWatchKV) CompareVersionAndSwap(ctx context.Context, key string, version int64, target string) (bool, error) { + ret := _m.Called(ctx, key, version, target) + + if len(ret) == 0 { + panic("no return value specified for CompareVersionAndSwap") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, int64, string) (bool, error)); ok { + return rf(ctx, key, version, target) + } + if rf, ok := ret.Get(0).(func(context.Context, string, int64, string) bool); ok { + r0 = rf(ctx, key, version, target) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, int64, string) error); ok { + r1 = rf(ctx, key, version, target) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWatchKV_CompareVersionAndSwap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CompareVersionAndSwap' +type MockWatchKV_CompareVersionAndSwap_Call struct { + *mock.Call +} + +// CompareVersionAndSwap is a helper method to define mock.On call +// - ctx context.Context +// - key string +// - version int64 +// - target string +func (_e *MockWatchKV_Expecter) CompareVersionAndSwap(ctx interface{}, key interface{}, version interface{}, target interface{}) *MockWatchKV_CompareVersionAndSwap_Call { + return &MockWatchKV_CompareVersionAndSwap_Call{Call: _e.mock.On("CompareVersionAndSwap", ctx, key, version, target)} +} + +func (_c *MockWatchKV_CompareVersionAndSwap_Call) Run(run func(ctx context.Context, key string, version int64, target string)) *MockWatchKV_CompareVersionAndSwap_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int64), args[3].(string)) + }) + return _c +} + +func (_c *MockWatchKV_CompareVersionAndSwap_Call) Return(_a0 bool, _a1 error) *MockWatchKV_CompareVersionAndSwap_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWatchKV_CompareVersionAndSwap_Call) RunAndReturn(run func(context.Context, string, int64, string) (bool, error)) *MockWatchKV_CompareVersionAndSwap_Call { + _c.Call.Return(run) + return _c +} + +// GetPath provides a mock function with given fields: key +func (_m *MockWatchKV) GetPath(key string) string { + ret := _m.Called(key) + + if len(ret) == 0 { + panic("no return value specified for GetPath") + } + + var r0 string + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(key) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockWatchKV_GetPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPath' +type MockWatchKV_GetPath_Call struct { + *mock.Call +} + +// GetPath is a helper method to define mock.On call +// - key string +func (_e *MockWatchKV_Expecter) GetPath(key interface{}) *MockWatchKV_GetPath_Call { + return &MockWatchKV_GetPath_Call{Call: _e.mock.On("GetPath", key)} +} + +func (_c *MockWatchKV_GetPath_Call) Run(run func(key string)) *MockWatchKV_GetPath_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockWatchKV_GetPath_Call) Return(_a0 string) *MockWatchKV_GetPath_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_GetPath_Call) RunAndReturn(run func(string) string) *MockWatchKV_GetPath_Call { + _c.Call.Return(run) + return _c +} + +// Has provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) Has(ctx context.Context, key string) (bool, error) { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for Has") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, key) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, key) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWatchKV_Has_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Has' +type MockWatchKV_Has_Call struct { + *mock.Call +} + +// Has is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) Has(ctx interface{}, key interface{}) *MockWatchKV_Has_Call { + return &MockWatchKV_Has_Call{Call: _e.mock.On("Has", ctx, key)} +} + +func (_c *MockWatchKV_Has_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_Has_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_Has_Call) Return(_a0 bool, _a1 error) *MockWatchKV_Has_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWatchKV_Has_Call) RunAndReturn(run func(context.Context, string) (bool, error)) *MockWatchKV_Has_Call { + _c.Call.Return(run) + return _c +} + +// HasPrefix provides a mock function with given fields: ctx, prefix +func (_m *MockWatchKV) HasPrefix(ctx context.Context, prefix string) (bool, error) { + ret := _m.Called(ctx, prefix) + + if len(ret) == 0 { + panic("no return value specified for HasPrefix") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, error)); ok { + return rf(ctx, prefix) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, prefix) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, prefix) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWatchKV_HasPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'HasPrefix' +type MockWatchKV_HasPrefix_Call struct { + *mock.Call +} + +// HasPrefix is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +func (_e *MockWatchKV_Expecter) HasPrefix(ctx interface{}, prefix interface{}) *MockWatchKV_HasPrefix_Call { + return &MockWatchKV_HasPrefix_Call{Call: _e.mock.On("HasPrefix", ctx, prefix)} +} + +func (_c *MockWatchKV_HasPrefix_Call) Run(run func(ctx context.Context, prefix string)) *MockWatchKV_HasPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_HasPrefix_Call) Return(_a0 bool, _a1 error) *MockWatchKV_HasPrefix_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWatchKV_HasPrefix_Call) RunAndReturn(run func(context.Context, string) (bool, error)) *MockWatchKV_HasPrefix_Call { + _c.Call.Return(run) + return _c +} + +// Load provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) Load(ctx context.Context, key string) (string, error) { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for Load") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { + return rf(ctx, key) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, key) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, key) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWatchKV_Load_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Load' +type MockWatchKV_Load_Call struct { + *mock.Call +} + +// Load is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) Load(ctx interface{}, key interface{}) *MockWatchKV_Load_Call { + return &MockWatchKV_Load_Call{Call: _e.mock.On("Load", ctx, key)} +} + +func (_c *MockWatchKV_Load_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_Load_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_Load_Call) Return(_a0 string, _a1 error) *MockWatchKV_Load_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWatchKV_Load_Call) RunAndReturn(run func(context.Context, string) (string, error)) *MockWatchKV_Load_Call { + _c.Call.Return(run) + return _c +} + +// LoadWithPrefix provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) LoadWithPrefix(ctx context.Context, key string) ([]string, []string, error) { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for LoadWithPrefix") + } + + var r0 []string + var r1 []string + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) ([]string, []string, error)); ok { + return rf(ctx, key) + } + if rf, ok := ret.Get(0).(func(context.Context, string) []string); ok { + r0 = rf(ctx, key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) []string); ok { + r1 = rf(ctx, key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]string) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, key) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockWatchKV_LoadWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadWithPrefix' +type MockWatchKV_LoadWithPrefix_Call struct { + *mock.Call +} + +// LoadWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) LoadWithPrefix(ctx interface{}, key interface{}) *MockWatchKV_LoadWithPrefix_Call { + return &MockWatchKV_LoadWithPrefix_Call{Call: _e.mock.On("LoadWithPrefix", ctx, key)} +} + +func (_c *MockWatchKV_LoadWithPrefix_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_LoadWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_LoadWithPrefix_Call) Return(_a0 []string, _a1 []string, _a2 error) *MockWatchKV_LoadWithPrefix_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockWatchKV_LoadWithPrefix_Call) RunAndReturn(run func(context.Context, string) ([]string, []string, error)) *MockWatchKV_LoadWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// MultiLoad provides a mock function with given fields: ctx, keys +func (_m *MockWatchKV) MultiLoad(ctx context.Context, keys []string) ([]string, error) { + ret := _m.Called(ctx, keys) + + if len(ret) == 0 { + panic("no return value specified for MultiLoad") + } + + var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []string) ([]string, error)); ok { + return rf(ctx, keys) + } + if rf, ok := ret.Get(0).(func(context.Context, []string) []string); ok { + r0 = rf(ctx, keys) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok { + r1 = rf(ctx, keys) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockWatchKV_MultiLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiLoad' +type MockWatchKV_MultiLoad_Call struct { + *mock.Call +} + +// MultiLoad is a helper method to define mock.On call +// - ctx context.Context +// - keys []string +func (_e *MockWatchKV_Expecter) MultiLoad(ctx interface{}, keys interface{}) *MockWatchKV_MultiLoad_Call { + return &MockWatchKV_MultiLoad_Call{Call: _e.mock.On("MultiLoad", ctx, keys)} +} + +func (_c *MockWatchKV_MultiLoad_Call) Run(run func(ctx context.Context, keys []string)) *MockWatchKV_MultiLoad_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]string)) + }) + return _c +} + +func (_c *MockWatchKV_MultiLoad_Call) Return(_a0 []string, _a1 error) *MockWatchKV_MultiLoad_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockWatchKV_MultiLoad_Call) RunAndReturn(run func(context.Context, []string) ([]string, error)) *MockWatchKV_MultiLoad_Call { + _c.Call.Return(run) + return _c +} + +// MultiRemove provides a mock function with given fields: ctx, keys +func (_m *MockWatchKV) MultiRemove(ctx context.Context, keys []string) error { + ret := _m.Called(ctx, keys) + + if len(ret) == 0 { + panic("no return value specified for MultiRemove") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []string) error); ok { + r0 = rf(ctx, keys) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_MultiRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiRemove' +type MockWatchKV_MultiRemove_Call struct { + *mock.Call +} + +// MultiRemove is a helper method to define mock.On call +// - ctx context.Context +// - keys []string +func (_e *MockWatchKV_Expecter) MultiRemove(ctx interface{}, keys interface{}) *MockWatchKV_MultiRemove_Call { + return &MockWatchKV_MultiRemove_Call{Call: _e.mock.On("MultiRemove", ctx, keys)} +} + +func (_c *MockWatchKV_MultiRemove_Call) Run(run func(ctx context.Context, keys []string)) *MockWatchKV_MultiRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]string)) + }) + return _c +} + +func (_c *MockWatchKV_MultiRemove_Call) Return(_a0 error) *MockWatchKV_MultiRemove_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_MultiRemove_Call) RunAndReturn(run func(context.Context, []string) error) *MockWatchKV_MultiRemove_Call { + _c.Call.Return(run) + return _c +} + +// MultiSave provides a mock function with given fields: ctx, kvs +func (_m *MockWatchKV) MultiSave(ctx context.Context, kvs map[string]string) error { + ret := _m.Called(ctx, kvs) + + if len(ret) == 0 { + panic("no return value specified for MultiSave") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[string]string) error); ok { + r0 = rf(ctx, kvs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_MultiSave_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSave' +type MockWatchKV_MultiSave_Call struct { + *mock.Call +} + +// MultiSave is a helper method to define mock.On call +// - ctx context.Context +// - kvs map[string]string +func (_e *MockWatchKV_Expecter) MultiSave(ctx interface{}, kvs interface{}) *MockWatchKV_MultiSave_Call { + return &MockWatchKV_MultiSave_Call{Call: _e.mock.On("MultiSave", ctx, kvs)} +} + +func (_c *MockWatchKV_MultiSave_Call) Run(run func(ctx context.Context, kvs map[string]string)) *MockWatchKV_MultiSave_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[string]string)) + }) + return _c +} + +func (_c *MockWatchKV_MultiSave_Call) Return(_a0 error) *MockWatchKV_MultiSave_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_MultiSave_Call) RunAndReturn(run func(context.Context, map[string]string) error) *MockWatchKV_MultiSave_Call { + _c.Call.Return(run) + return _c +} + +// MultiSaveAndRemove provides a mock function with given fields: ctx, saves, removals, preds +func (_m *MockWatchKV) MultiSaveAndRemove(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for MultiSaveAndRemove") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(ctx, saves, removals, preds...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_MultiSaveAndRemove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemove' +type MockWatchKV_MultiSaveAndRemove_Call struct { + *mock.Call +} + +// MultiSaveAndRemove is a helper method to define mock.On call +// - ctx context.Context +// - saves map[string]string +// - removals []string +// - preds ...predicates.Predicate +func (_e *MockWatchKV_Expecter) MultiSaveAndRemove(ctx interface{}, saves interface{}, removals interface{}, preds ...interface{}) *MockWatchKV_MultiSaveAndRemove_Call { + return &MockWatchKV_MultiSaveAndRemove_Call{Call: _e.mock.On("MultiSaveAndRemove", + append([]interface{}{ctx, saves, removals}, preds...)...)} +} + +func (_c *MockWatchKV_MultiSaveAndRemove_Call) Run(run func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate)) *MockWatchKV_MultiSaveAndRemove_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]predicates.Predicate, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(context.Context), args[1].(map[string]string), args[2].([]string), variadicArgs...) + }) + return _c +} + +func (_c *MockWatchKV_MultiSaveAndRemove_Call) Return(_a0 error) *MockWatchKV_MultiSaveAndRemove_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_MultiSaveAndRemove_Call) RunAndReturn(run func(context.Context, map[string]string, []string, ...predicates.Predicate) error) *MockWatchKV_MultiSaveAndRemove_Call { + _c.Call.Return(run) + return _c +} + +// MultiSaveAndRemoveWithPrefix provides a mock function with given fields: ctx, saves, removals, preds +func (_m *MockWatchKV) MultiSaveAndRemoveWithPrefix(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate) error { + _va := make([]interface{}, len(preds)) + for _i := range preds { + _va[_i] = preds[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, saves, removals) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for MultiSaveAndRemoveWithPrefix") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[string]string, []string, ...predicates.Predicate) error); ok { + r0 = rf(ctx, saves, removals, preds...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_MultiSaveAndRemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MultiSaveAndRemoveWithPrefix' +type MockWatchKV_MultiSaveAndRemoveWithPrefix_Call struct { + *mock.Call +} + +// MultiSaveAndRemoveWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - saves map[string]string +// - removals []string +// - preds ...predicates.Predicate +func (_e *MockWatchKV_Expecter) MultiSaveAndRemoveWithPrefix(ctx interface{}, saves interface{}, removals interface{}, preds ...interface{}) *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call { + return &MockWatchKV_MultiSaveAndRemoveWithPrefix_Call{Call: _e.mock.On("MultiSaveAndRemoveWithPrefix", + append([]interface{}{ctx, saves, removals}, preds...)...)} +} + +func (_c *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call) Run(run func(ctx context.Context, saves map[string]string, removals []string, preds ...predicates.Predicate)) *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]predicates.Predicate, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(predicates.Predicate) + } + } + run(args[0].(context.Context), args[1].(map[string]string), args[2].([]string), variadicArgs...) + }) + return _c +} + +func (_c *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call) Return(_a0 error) *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call) RunAndReturn(run func(context.Context, map[string]string, []string, ...predicates.Predicate) error) *MockWatchKV_MultiSaveAndRemoveWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// Remove provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) Remove(ctx context.Context, key string) error { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for Remove") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove' +type MockWatchKV_Remove_Call struct { + *mock.Call +} + +// Remove is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) Remove(ctx interface{}, key interface{}) *MockWatchKV_Remove_Call { + return &MockWatchKV_Remove_Call{Call: _e.mock.On("Remove", ctx, key)} +} + +func (_c *MockWatchKV_Remove_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_Remove_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_Remove_Call) Return(_a0 error) *MockWatchKV_Remove_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_Remove_Call) RunAndReturn(run func(context.Context, string) error) *MockWatchKV_Remove_Call { + _c.Call.Return(run) + return _c +} + +// RemoveWithPrefix provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) RemoveWithPrefix(ctx context.Context, key string) error { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for RemoveWithPrefix") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_RemoveWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveWithPrefix' +type MockWatchKV_RemoveWithPrefix_Call struct { + *mock.Call +} + +// RemoveWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) RemoveWithPrefix(ctx interface{}, key interface{}) *MockWatchKV_RemoveWithPrefix_Call { + return &MockWatchKV_RemoveWithPrefix_Call{Call: _e.mock.On("RemoveWithPrefix", ctx, key)} +} + +func (_c *MockWatchKV_RemoveWithPrefix_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_RemoveWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_RemoveWithPrefix_Call) Return(_a0 error) *MockWatchKV_RemoveWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_RemoveWithPrefix_Call) RunAndReturn(run func(context.Context, string) error) *MockWatchKV_RemoveWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// Save provides a mock function with given fields: ctx, key, value +func (_m *MockWatchKV) Save(ctx context.Context, key string, value string) error { + ret := _m.Called(ctx, key, value) + + if len(ret) == 0 { + panic("no return value specified for Save") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, key, value) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_Save_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Save' +type MockWatchKV_Save_Call struct { + *mock.Call +} + +// Save is a helper method to define mock.On call +// - ctx context.Context +// - key string +// - value string +func (_e *MockWatchKV_Expecter) Save(ctx interface{}, key interface{}, value interface{}) *MockWatchKV_Save_Call { + return &MockWatchKV_Save_Call{Call: _e.mock.On("Save", ctx, key, value)} +} + +func (_c *MockWatchKV_Save_Call) Run(run func(ctx context.Context, key string, value string)) *MockWatchKV_Save_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *MockWatchKV_Save_Call) Return(_a0 error) *MockWatchKV_Save_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_Save_Call) RunAndReturn(run func(context.Context, string, string) error) *MockWatchKV_Save_Call { + _c.Call.Return(run) + return _c +} + +// WalkWithPrefix provides a mock function with given fields: ctx, prefix, paginationSize, fn +func (_m *MockWatchKV) WalkWithPrefix(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error) error { + ret := _m.Called(ctx, prefix, paginationSize, fn) + + if len(ret) == 0 { + panic("no return value specified for WalkWithPrefix") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, int, func([]byte, []byte) error) error); ok { + r0 = rf(ctx, prefix, paginationSize, fn) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockWatchKV_WalkWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WalkWithPrefix' +type MockWatchKV_WalkWithPrefix_Call struct { + *mock.Call +} + +// WalkWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +// - paginationSize int +// - fn func([]byte , []byte) error +func (_e *MockWatchKV_Expecter) WalkWithPrefix(ctx interface{}, prefix interface{}, paginationSize interface{}, fn interface{}) *MockWatchKV_WalkWithPrefix_Call { + return &MockWatchKV_WalkWithPrefix_Call{Call: _e.mock.On("WalkWithPrefix", ctx, prefix, paginationSize, fn)} +} + +func (_c *MockWatchKV_WalkWithPrefix_Call) Run(run func(ctx context.Context, prefix string, paginationSize int, fn func([]byte, []byte) error)) *MockWatchKV_WalkWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int), args[3].(func([]byte, []byte) error)) + }) + return _c +} + +func (_c *MockWatchKV_WalkWithPrefix_Call) Return(_a0 error) *MockWatchKV_WalkWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_WalkWithPrefix_Call) RunAndReturn(run func(context.Context, string, int, func([]byte, []byte) error) error) *MockWatchKV_WalkWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// Watch provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) Watch(ctx context.Context, key string) clientv3.WatchChan { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for Watch") + } + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(context.Context, string) clientv3.WatchChan); ok { + r0 = rf(ctx, key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MockWatchKV_Watch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Watch' +type MockWatchKV_Watch_Call struct { + *mock.Call +} + +// Watch is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) Watch(ctx interface{}, key interface{}) *MockWatchKV_Watch_Call { + return &MockWatchKV_Watch_Call{Call: _e.mock.On("Watch", ctx, key)} +} + +func (_c *MockWatchKV_Watch_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_Watch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_Watch_Call) Return(_a0 clientv3.WatchChan) *MockWatchKV_Watch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_Watch_Call) RunAndReturn(run func(context.Context, string) clientv3.WatchChan) *MockWatchKV_Watch_Call { + _c.Call.Return(run) + return _c +} + +// WatchWithPrefix provides a mock function with given fields: ctx, key +func (_m *MockWatchKV) WatchWithPrefix(ctx context.Context, key string) clientv3.WatchChan { + ret := _m.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for WatchWithPrefix") + } + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(context.Context, string) clientv3.WatchChan); ok { + r0 = rf(ctx, key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MockWatchKV_WatchWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchWithPrefix' +type MockWatchKV_WatchWithPrefix_Call struct { + *mock.Call +} + +// WatchWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockWatchKV_Expecter) WatchWithPrefix(ctx interface{}, key interface{}) *MockWatchKV_WatchWithPrefix_Call { + return &MockWatchKV_WatchWithPrefix_Call{Call: _e.mock.On("WatchWithPrefix", ctx, key)} +} + +func (_c *MockWatchKV_WatchWithPrefix_Call) Run(run func(ctx context.Context, key string)) *MockWatchKV_WatchWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockWatchKV_WatchWithPrefix_Call) Return(_a0 clientv3.WatchChan) *MockWatchKV_WatchWithPrefix_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_WatchWithPrefix_Call) RunAndReturn(run func(context.Context, string) clientv3.WatchChan) *MockWatchKV_WatchWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// WatchWithRevision provides a mock function with given fields: ctx, key, revision +func (_m *MockWatchKV) WatchWithRevision(ctx context.Context, key string, revision int64) clientv3.WatchChan { + ret := _m.Called(ctx, key, revision) + + if len(ret) == 0 { + panic("no return value specified for WatchWithRevision") + } + + var r0 clientv3.WatchChan + if rf, ok := ret.Get(0).(func(context.Context, string, int64) clientv3.WatchChan); ok { + r0 = rf(ctx, key, revision) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(clientv3.WatchChan) + } + } + + return r0 +} + +// MockWatchKV_WatchWithRevision_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WatchWithRevision' +type MockWatchKV_WatchWithRevision_Call struct { + *mock.Call +} + +// WatchWithRevision is a helper method to define mock.On call +// - ctx context.Context +// - key string +// - revision int64 +func (_e *MockWatchKV_Expecter) WatchWithRevision(ctx interface{}, key interface{}, revision interface{}) *MockWatchKV_WatchWithRevision_Call { + return &MockWatchKV_WatchWithRevision_Call{Call: _e.mock.On("WatchWithRevision", ctx, key, revision)} +} + +func (_c *MockWatchKV_WatchWithRevision_Call) Run(run func(ctx context.Context, key string, revision int64)) *MockWatchKV_WatchWithRevision_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(int64)) + }) + return _c +} + +func (_c *MockWatchKV_WatchWithRevision_Call) Return(_a0 clientv3.WatchChan) *MockWatchKV_WatchWithRevision_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWatchKV_WatchWithRevision_Call) RunAndReturn(run func(context.Context, string, int64) clientv3.WatchChan) *MockWatchKV_WatchWithRevision_Call { + _c.Call.Return(run) + return _c +} + +// NewMockWatchKV creates a new instance of MockWatchKV. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWatchKV(t interface { + mock.TestingT + Cleanup(func()) +}) *MockWatchKV { + mock := &MockWatchKV{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}