fix: Fix primary-secondary replication switch blocking (#44898)

1. Fix primary-secondary replication switchover blocking by deleting the
replicate pchannel meta guarded by its modRevision (see the sketch below).
2. Stop the channel replicator (scanner) when the cluster role changes to
prevent continued message consumption and replication.
3. Close the Milvus client to prevent goroutine leaks.
4. Create the Milvus client only once per channel replicator.
5. Simplify CDC controller and resources.

issue: https://github.com/milvus-io/milvus/issues/44123
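The key mechanism behind item 1 is a compare-and-delete: the replicate pchannel meta key is removed only while its etcd ModRevision still equals the revision the replicator observed, so a stale asynchronous cleanup can never delete a key that a later switchover has already re-created. Below is a minimal, non-authoritative sketch of that guarded delete (it mirrors the RemoveReplicatePChannelWithRevision / removeWithCmps helpers added in this change; the key and revision values are hypothetical):

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// deleteIfUnchanged deletes key only if its ModRevision still equals observedRev.
// It returns false when the key has been rewritten (or already removed) since it
// was observed, in which case the caller must leave it for the new owner.
func deleteIfUnchanged(ctx context.Context, cli *clientv3.Client, key string, observedRev int64) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", observedRev)).
		Then(clientv3.OpDelete(key)).
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Hypothetical key and revision, for illustration only.
	ok, err := deleteIfUnchanged(context.Background(), cli, "replicate-pchannel/example-channel", 42)
	fmt.Println(ok, err)
}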

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Author: yihao.dai, 2025-10-22 10:12:04 +08:00 (committed by GitHub)
parent 3d11ba06ef
commit a0f6b31380
29 changed files with 694 additions and 1277 deletions

View File

@ -19,6 +19,3 @@ packages:
github.com/milvus-io/milvus/internal/cdc/replication/replicatemanager:
interfaces:
ChannelReplicator:
github.com/milvus-io/milvus/internal/cdc/controller:
interfaces:
Controller:

View File

@ -1,51 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/client/v2/milvusclient"
"github.com/milvus-io/milvus/pkg/v2/log"
)
type ClusterClient interface {
CreateMilvusClient(ctx context.Context, cluster *commonpb.MilvusCluster) (MilvusClient, error)
}
var _ ClusterClient = (*clusterClient)(nil)
type clusterClient struct{}
func NewClusterClient() ClusterClient {
return &clusterClient{}
}
func (c *clusterClient) CreateMilvusClient(ctx context.Context, cluster *commonpb.MilvusCluster) (MilvusClient, error) {
cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: cluster.GetConnectionParam().GetUri(),
APIKey: cluster.GetConnectionParam().GetToken(),
})
if err != nil {
log.Warn("failed to create milvus client", zap.Error(err))
return nil, err
}
return cli, nil
}

View File

@ -19,9 +19,12 @@ package cluster
import (
"context"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/client/v2/milvusclient"
)
type MilvusClient interface {
@ -32,3 +35,16 @@ type MilvusClient interface {
// Close closes the milvus client.
Close(ctx context.Context) error
}
type CreateMilvusClientFunc func(ctx context.Context, cluster *commonpb.MilvusCluster) (MilvusClient, error)
func NewMilvusClient(ctx context.Context, cluster *commonpb.MilvusCluster) (MilvusClient, error) {
cli, err := milvusclient.New(ctx, &milvusclient.ClientConfig{
Address: cluster.GetConnectionParam().GetUri(),
APIKey: cluster.GetConnectionParam().GetToken(),
})
if err != nil {
return nil, errors.Wrap(err, "failed to create milvus client")
}
return cli, nil
}

View File

@ -1,97 +0,0 @@
// Code generated by mockery v2.53.3. DO NOT EDIT.
package cluster
import (
context "context"
commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
mock "github.com/stretchr/testify/mock"
)
// MockClusterClient is an autogenerated mock type for the ClusterClient type
type MockClusterClient struct {
mock.Mock
}
type MockClusterClient_Expecter struct {
mock *mock.Mock
}
func (_m *MockClusterClient) EXPECT() *MockClusterClient_Expecter {
return &MockClusterClient_Expecter{mock: &_m.Mock}
}
// CreateMilvusClient provides a mock function with given fields: ctx, _a1
func (_m *MockClusterClient) CreateMilvusClient(ctx context.Context, _a1 *commonpb.MilvusCluster) (MilvusClient, error) {
ret := _m.Called(ctx, _a1)
if len(ret) == 0 {
panic("no return value specified for CreateMilvusClient")
}
var r0 MilvusClient
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *commonpb.MilvusCluster) (MilvusClient, error)); ok {
return rf(ctx, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *commonpb.MilvusCluster) MilvusClient); ok {
r0 = rf(ctx, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(MilvusClient)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *commonpb.MilvusCluster) error); ok {
r1 = rf(ctx, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockClusterClient_CreateMilvusClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateMilvusClient'
type MockClusterClient_CreateMilvusClient_Call struct {
*mock.Call
}
// CreateMilvusClient is a helper method to define mock.On call
// - ctx context.Context
// - _a1 *commonpb.MilvusCluster
func (_e *MockClusterClient_Expecter) CreateMilvusClient(ctx interface{}, _a1 interface{}) *MockClusterClient_CreateMilvusClient_Call {
return &MockClusterClient_CreateMilvusClient_Call{Call: _e.mock.On("CreateMilvusClient", ctx, _a1)}
}
func (_c *MockClusterClient_CreateMilvusClient_Call) Run(run func(ctx context.Context, _a1 *commonpb.MilvusCluster)) *MockClusterClient_CreateMilvusClient_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*commonpb.MilvusCluster))
})
return _c
}
func (_c *MockClusterClient_CreateMilvusClient_Call) Return(_a0 MilvusClient, _a1 error) *MockClusterClient_CreateMilvusClient_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockClusterClient_CreateMilvusClient_Call) RunAndReturn(run func(context.Context, *commonpb.MilvusCluster) (MilvusClient, error)) *MockClusterClient_CreateMilvusClient_Call {
_c.Call.Return(run)
return _c
}
// NewMockClusterClient creates a new instance of MockClusterClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockClusterClient(t interface {
mock.TestingT
Cleanup(func())
}) *MockClusterClient {
mock := &MockClusterClient{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -16,9 +16,159 @@
package controller
import (
"context"
"path"
"strings"
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// Controller controls and schedules the CDC process.
// It will periodically update the replications by the replicate configuration.
type Controller interface {
Start() error
Stop()
}
type controller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
prefix string
}
func NewController() *controller {
ctx, cancel := context.WithCancel(context.Background())
return &controller{
ctx: ctx,
cancel: cancel,
prefix: path.Join(
paramtable.Get().EtcdCfg.MetaRootPath.GetValue(),
streamingcoord.ReplicatePChannelMetaPrefix,
),
}
}
func (c *controller) Start() error {
c.startWatchLoop()
return nil
}
func (c *controller) recoverReplicatePChannelMeta(channels []*meta.ReplicateChannel) {
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
for _, channelMeta := range channels {
if !strings.Contains(channelMeta.Value.GetSourceChannelName(), currentClusterID) {
// current cluster is not source cluster, skip create replicator
continue
}
log.Info("recover replicate pchannel meta",
zap.String("key", channelMeta.Key),
zap.Int64("revision", channelMeta.ModRevision),
)
channel := &meta.ReplicateChannel{
Key: channelMeta.Key,
Value: channelMeta.Value,
ModRevision: channelMeta.ModRevision,
}
resource.Resource().ReplicateManagerClient().CreateReplicator(channel)
}
resource.Resource().ReplicateManagerClient().RemoveOutdatedReplicators(channels)
}
func (c *controller) watchEvents(revision int64) clientv3.WatchChan {
eventCh := resource.Resource().ETCD().Watch(
c.ctx,
c.prefix,
clientv3.WithPrefix(),
clientv3.WithPrevKV(),
clientv3.WithRev(revision),
)
log.Ctx(c.ctx).Info("succeed to watch replicate pchannel meta events",
zap.Int64("revision", revision), zap.String("prefix", c.prefix))
return eventCh
}
func (c *controller) startWatchLoop() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
m, err := meta.ListReplicatePChannels(c.ctx, resource.Resource().ETCD(), c.prefix)
if err != nil && c.ctx.Err() == nil {
log.Ctx(c.ctx).Warn("failed to list replicate pchannels", zap.Error(err))
continue
}
c.recoverReplicatePChannelMeta(m.Channels)
eventCh := c.watchEvents(m.Revision + 1)
err = c.watchLoop(eventCh)
if err == nil {
break
}
}
}()
}
func (c *controller) watchLoop(eventCh clientv3.WatchChan) error {
for {
select {
case <-c.ctx.Done():
return nil
case event, ok := <-eventCh:
if !ok {
panic("etcd event channel closed")
}
if err := event.Err(); err != nil {
log.Warn("etcd event error", zap.Error(err))
return err
}
for _, e := range event.Events {
switch e.Type {
case mvccpb.PUT:
log.Info("handle replicate pchannel PUT event",
zap.String("key", string(e.Kv.Key)),
zap.Int64("modRevision", e.Kv.ModRevision),
)
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
replicate := meta.MustParseReplicateChannelFromEvent(e)
if !strings.Contains(replicate.GetSourceChannelName(), currentClusterID) {
// current cluster is not source cluster, skip create replicator
continue
}
channel := &meta.ReplicateChannel{
Key: string(e.Kv.Key),
Value: replicate,
ModRevision: e.Kv.ModRevision,
}
resource.Resource().ReplicateManagerClient().CreateReplicator(channel)
case mvccpb.DELETE:
log.Info("handle replicate pchannel DELETE event",
zap.String("key", string(e.Kv.Key)),
zap.Int64("prevModRevision", e.PrevKv.ModRevision),
)
key := string(e.Kv.Key)
revision := e.PrevKv.ModRevision
resource.Resource().ReplicateManagerClient().RemoveReplicator(key, revision)
}
}
}
}
}
func (c *controller) Stop() {
log.Ctx(c.ctx).Info("stop CDC controller...")
c.cancel()
c.wg.Wait()
resource.Resource().ReplicateManagerClient().Close()
log.Ctx(c.ctx).Info("CDC controller stopped")
}

View File

@ -14,18 +14,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package controllerimpl
package controller
import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/cdc/replication"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)
@ -44,6 +47,7 @@ func TestController_StartAndStop_WithEvents(t *testing.T) {
putEvent := &clientv3.Event{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: []byte(streamingcoord.BuildReplicatePChannelMetaKey(replicateMeta)),
Value: metaBytes,
},
}
@ -51,6 +55,9 @@ func TestController_StartAndStop_WithEvents(t *testing.T) {
deleteEvent := &clientv3.Event{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: []byte(streamingcoord.BuildReplicatePChannelMetaKey(replicateMeta)),
},
PrevKv: &mvccpb.KeyValue{
Value: metaBytes,
},
}
@ -67,10 +74,10 @@ func TestController_StartAndStop_WithEvents(t *testing.T) {
}()
notifyCh := make(chan struct{}, 2)
mockReplicateManagerClient.EXPECT().CreateReplicator(mock.Anything).RunAndReturn(func(replicate *streamingpb.ReplicatePChannelMeta) {
mockReplicateManagerClient.EXPECT().CreateReplicator(mock.Anything).RunAndReturn(func(replicate *meta.ReplicateChannel) {
notifyCh <- struct{}{}
})
mockReplicateManagerClient.EXPECT().RemoveReplicator(mock.Anything).RunAndReturn(func(replicate *streamingpb.ReplicatePChannelMeta) {
mockReplicateManagerClient.EXPECT().RemoveReplicator(mock.Anything, mock.Anything).RunAndReturn(func(key string, modRevision int64) {
notifyCh <- struct{}{}
})
@ -81,9 +88,11 @@ func TestController_StartAndStop_WithEvents(t *testing.T) {
ctrl := NewController()
go ctrl.watchLoop(eventCh)
// Wait for events to be processed
<-notifyCh
<-notifyCh
// Wait for put event to be processed
select {
case <-notifyCh:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for put event")
}
ctrl.Stop()
}

View File

@ -1,152 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllerimpl
import (
"context"
"path"
"strings"
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type controller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
prefix string
}
func NewController() *controller {
ctx, cancel := context.WithCancel(context.Background())
return &controller{
ctx: ctx,
cancel: cancel,
prefix: path.Join(
paramtable.Get().EtcdCfg.MetaRootPath.GetValue(),
streamingcoord.ReplicatePChannelMetaPrefix,
),
}
}
func (c *controller) Start() error {
c.startWatchLoop()
return nil
}
func (c *controller) recoverReplicatePChannelMeta(replicatePChannels []*streamingpb.ReplicatePChannelMeta) {
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
for _, replicate := range replicatePChannels {
if !strings.Contains(replicate.GetSourceChannelName(), currentClusterID) {
// current cluster is not source cluster, skip create replicator
continue
}
log.Info("recover replicate pchannel meta",
zap.String("sourceChannel", replicate.GetSourceChannelName()),
zap.String("targetChannel", replicate.GetTargetChannelName()),
)
// Replicate manager ensures the idempotency of the creation.
resource.Resource().ReplicateManagerClient().CreateReplicator(replicate)
}
resource.Resource().ReplicateManagerClient().RemoveOutdatedReplicators(replicatePChannels)
}
func (c *controller) watchEvents(revision int64) clientv3.WatchChan {
eventCh := resource.Resource().ETCD().Watch(
c.ctx,
c.prefix,
clientv3.WithPrefix(),
clientv3.WithRev(revision),
)
log.Ctx(c.ctx).Info("succeed to watch replicate pchannel meta events",
zap.Int64("revision", revision), zap.String("prefix", c.prefix))
return eventCh
}
func (c *controller) startWatchLoop() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
channels, revision, err := ListReplicatePChannels(c.ctx, resource.Resource().ETCD(), c.prefix)
if err != nil && c.ctx.Err() == nil {
log.Ctx(c.ctx).Warn("failed to list replicate pchannels", zap.Error(err))
continue
}
c.recoverReplicatePChannelMeta(channels)
eventCh := c.watchEvents(revision)
err = c.watchLoop(eventCh)
if err == nil {
break
}
}
}()
}
func (c *controller) watchLoop(eventCh clientv3.WatchChan) error {
for {
select {
case <-c.ctx.Done():
return nil
case event, ok := <-eventCh:
if !ok {
panic("etcd event channel closed")
}
if err := event.Err(); err != nil {
log.Warn("etcd event error", zap.Error(err))
return err
}
for _, e := range event.Events {
replicate := MustParseReplicateChannelFromEvent(e)
log.Info("handle replicate pchannel event",
zap.String("sourceChannel", replicate.GetSourceChannelName()),
zap.String("targetChannel", replicate.GetTargetChannelName()),
zap.String("eventType", e.Type.String()),
)
switch e.Type {
case mvccpb.PUT:
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
if !strings.Contains(replicate.GetSourceChannelName(), currentClusterID) {
// current cluster is not source cluster, skip create replicator
continue
}
resource.Resource().ReplicateManagerClient().CreateReplicator(replicate)
case mvccpb.DELETE:
resource.Resource().ReplicateManagerClient().RemoveReplicator(replicate)
}
}
}
}
}
func (c *controller) Stop() {
log.Ctx(c.ctx).Info("stop CDC controller...")
c.cancel()
c.wg.Wait()
resource.Resource().ReplicateManagerClient().Close()
log.Ctx(c.ctx).Info("CDC controller stopped")
}

View File

@ -1,56 +0,0 @@
package controllerimpl
import (
"context"
"fmt"
"time"
"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)
func ListReplicatePChannels(ctx context.Context, etcdCli *clientv3.Client, prefix string) ([]*streamingpb.ReplicatePChannelMeta, int64, error) {
resp, err := get(ctx, etcdCli, prefix)
if err != nil {
return nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
}
channels := make([]*streamingpb.ReplicatePChannelMeta, 0, len(values))
for k, value := range values {
info := &streamingpb.ReplicatePChannelMeta{}
err = proto.Unmarshal([]byte(value), info)
if err != nil {
return nil, 0, errors.Wrapf(err, "unmarshal replicate pchannel meta %s failed", keys[k])
}
channels = append(channels, info)
}
return channels, resp.Header.Revision, nil
}
func MustParseReplicateChannelFromEvent(e *clientv3.Event) *streamingpb.ReplicatePChannelMeta {
meta := &streamingpb.ReplicatePChannelMeta{}
err := proto.Unmarshal(e.Kv.Value, meta)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal replicate pchannel meta: %v", err))
}
return meta
}
func get(ctx context.Context, etcdCli *clientv3.Client, prefix string) (*clientv3.GetResponse, error) {
ctx1, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
}
return etcdCli.Get(ctx1, prefix, opts...)
}

View File

@ -1,109 +0,0 @@
// Code generated by mockery v2.53.3. DO NOT EDIT.
package controller
import mock "github.com/stretchr/testify/mock"
// MockController is an autogenerated mock type for the Controller type
type MockController struct {
mock.Mock
}
type MockController_Expecter struct {
mock *mock.Mock
}
func (_m *MockController) EXPECT() *MockController_Expecter {
return &MockController_Expecter{mock: &_m.Mock}
}
// Start provides a mock function with no fields
func (_m *MockController) Start() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockController_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockController_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
func (_e *MockController_Expecter) Start() *MockController_Start_Call {
return &MockController_Start_Call{Call: _e.mock.On("Start")}
}
func (_c *MockController_Start_Call) Run(run func()) *MockController_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockController_Start_Call) Return(_a0 error) *MockController_Start_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockController_Start_Call) RunAndReturn(run func() error) *MockController_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with no fields
func (_m *MockController) Stop() {
_m.Called()
}
// MockController_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockController_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockController_Expecter) Stop() *MockController_Stop_Call {
return &MockController_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockController_Stop_Call) Run(run func()) *MockController_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockController_Stop_Call) Return() *MockController_Stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockController_Stop_Call) RunAndReturn(run func()) *MockController_Stop_Call {
_c.Run(run)
return _c
}
// NewMockController creates a new instance of MockController. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockController(t interface {
mock.TestingT
Cleanup(func())
}) *MockController {
mock := &MockController{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -0,0 +1,111 @@
package meta
import (
"context"
"fmt"
"time"
"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)
const requestTimeout = 30 * time.Second
type ReplicateChannel struct {
Key string
Value *streamingpb.ReplicatePChannelMeta
ModRevision int64
}
type ReplicateChannels struct {
Channels []*ReplicateChannel
Revision int64
}
// ListReplicatePChannels lists the replicate pchannels metadata from metastore.
func ListReplicatePChannels(ctx context.Context, etcdCli *clientv3.Client, prefix string) (*ReplicateChannels, error) {
resp, err := listByPrefix(ctx, etcdCli, prefix)
if err != nil {
return nil, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
revisions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
revisions = append(revisions, kv.ModRevision)
}
channels := make([]*ReplicateChannel, 0, len(values))
for k, value := range values {
meta := &streamingpb.ReplicatePChannelMeta{}
err = proto.Unmarshal([]byte(value), meta)
if err != nil {
return nil, errors.Wrapf(err, "unmarshal replicate pchannel meta %s failed", keys[k])
}
channel := &ReplicateChannel{
Key: keys[k],
Value: meta,
ModRevision: revisions[k],
}
channels = append(channels, channel)
}
return &ReplicateChannels{
Channels: channels,
Revision: resp.Header.Revision,
}, nil
}
// RemoveReplicatePChannelWithRevision removes the replicate pchannel from metastore.
// It removes the CDC replication task of the current cluster and should be called when a CDC replication task is finished.
// Because CDC removes the replicate pchannel meta asynchronously, there may be ordering conflicts
// during primary-secondary switches. For example:
// 1. Init: A -> B, Save key
// 2. Switch: B -> A, Delete key asynchronously
// 3. Switch again: A -> B, Save key
// Expected meta op order: [Save, Delete, Save]
// Actual meta op order may be: [Save, Save, Delete]
// To avoid such ordering conflicts, we need to pass the key's revision
// and only remove the KV when the revision matches.
func RemoveReplicatePChannelWithRevision(ctx context.Context, etcdCli *clientv3.Client, key string, revision int64) (bool, error) {
result, err := removeWithCmps(ctx, etcdCli, key, clientv3.Compare(clientv3.ModRevision(key), "=", revision))
return result, err
}
func MustParseReplicateChannelFromEvent(e *clientv3.Event) *streamingpb.ReplicatePChannelMeta {
meta := &streamingpb.ReplicatePChannelMeta{}
err := proto.Unmarshal(e.Kv.Value, meta)
if err != nil {
panic(fmt.Sprintf("failed to unmarshal replicate pchannel meta: %v", err))
}
return meta
}
// listByPrefix gets the key-value pairs with the given prefix.
func listByPrefix(ctx context.Context, etcdCli *clientv3.Client, prefix string) (*clientv3.GetResponse, error) {
ctx1, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
}
return etcdCli.Get(ctx1, prefix, opts...)
}
// removeWithCmps removes the key with given cmps.
func removeWithCmps(ctx context.Context, etcdCli *clientv3.Client, key string, cmps ...clientv3.Cmp) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
delOp := clientv3.OpDelete(key)
txn := etcdCli.Txn(ctx).If(cmps...).Then(delOp)
resp, err := txn.Commit()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}

View File

@ -3,7 +3,7 @@
package replication
import (
streamingpb "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
meta "github.com/milvus-io/milvus/internal/cdc/meta"
mock "github.com/stretchr/testify/mock"
)
@ -52,9 +52,9 @@ func (_c *MockReplicateManagerClient_Close_Call) RunAndReturn(run func()) *MockR
return _c
}
// CreateReplicator provides a mock function with given fields: replicateInfo
func (_m *MockReplicateManagerClient) CreateReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) {
_m.Called(replicateInfo)
// CreateReplicator provides a mock function with given fields: channel
func (_m *MockReplicateManagerClient) CreateReplicator(channel *meta.ReplicateChannel) {
_m.Called(channel)
}
// MockReplicateManagerClient_CreateReplicator_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateReplicator'
@ -63,14 +63,14 @@ type MockReplicateManagerClient_CreateReplicator_Call struct {
}
// CreateReplicator is a helper method to define mock.On call
// - replicateInfo *streamingpb.ReplicatePChannelMeta
func (_e *MockReplicateManagerClient_Expecter) CreateReplicator(replicateInfo interface{}) *MockReplicateManagerClient_CreateReplicator_Call {
return &MockReplicateManagerClient_CreateReplicator_Call{Call: _e.mock.On("CreateReplicator", replicateInfo)}
// - channel *meta.ReplicateChannel
func (_e *MockReplicateManagerClient_Expecter) CreateReplicator(channel interface{}) *MockReplicateManagerClient_CreateReplicator_Call {
return &MockReplicateManagerClient_CreateReplicator_Call{Call: _e.mock.On("CreateReplicator", channel)}
}
func (_c *MockReplicateManagerClient_CreateReplicator_Call) Run(run func(replicateInfo *streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_CreateReplicator_Call {
func (_c *MockReplicateManagerClient_CreateReplicator_Call) Run(run func(channel *meta.ReplicateChannel)) *MockReplicateManagerClient_CreateReplicator_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*streamingpb.ReplicatePChannelMeta))
run(args[0].(*meta.ReplicateChannel))
})
return _c
}
@ -80,14 +80,14 @@ func (_c *MockReplicateManagerClient_CreateReplicator_Call) Return() *MockReplic
return _c
}
func (_c *MockReplicateManagerClient_CreateReplicator_Call) RunAndReturn(run func(*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_CreateReplicator_Call {
func (_c *MockReplicateManagerClient_CreateReplicator_Call) RunAndReturn(run func(*meta.ReplicateChannel)) *MockReplicateManagerClient_CreateReplicator_Call {
_c.Run(run)
return _c
}
// RemoveOutdatedReplicators provides a mock function with given fields: aliveReplicates
func (_m *MockReplicateManagerClient) RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta) {
_m.Called(aliveReplicates)
// RemoveOutdatedReplicators provides a mock function with given fields: aliveChannels
func (_m *MockReplicateManagerClient) RemoveOutdatedReplicators(aliveChannels []*meta.ReplicateChannel) {
_m.Called(aliveChannels)
}
// MockReplicateManagerClient_RemoveOutdatedReplicators_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveOutdatedReplicators'
@ -96,14 +96,14 @@ type MockReplicateManagerClient_RemoveOutdatedReplicators_Call struct {
}
// RemoveOutdatedReplicators is a helper method to define mock.On call
// - aliveReplicates []*streamingpb.ReplicatePChannelMeta
func (_e *MockReplicateManagerClient_Expecter) RemoveOutdatedReplicators(aliveReplicates interface{}) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
return &MockReplicateManagerClient_RemoveOutdatedReplicators_Call{Call: _e.mock.On("RemoveOutdatedReplicators", aliveReplicates)}
// - aliveChannels []*meta.ReplicateChannel
func (_e *MockReplicateManagerClient_Expecter) RemoveOutdatedReplicators(aliveChannels interface{}) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
return &MockReplicateManagerClient_RemoveOutdatedReplicators_Call{Call: _e.mock.On("RemoveOutdatedReplicators", aliveChannels)}
}
func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) Run(run func(aliveReplicates []*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) Run(run func(aliveChannels []*meta.ReplicateChannel)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].([]*streamingpb.ReplicatePChannelMeta))
run(args[0].([]*meta.ReplicateChannel))
})
return _c
}
@ -113,14 +113,14 @@ func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) Return() *M
return _c
}
func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) RunAndReturn(run func([]*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
func (_c *MockReplicateManagerClient_RemoveOutdatedReplicators_Call) RunAndReturn(run func([]*meta.ReplicateChannel)) *MockReplicateManagerClient_RemoveOutdatedReplicators_Call {
_c.Run(run)
return _c
}
// RemoveReplicator provides a mock function with given fields: replicateInfo
func (_m *MockReplicateManagerClient) RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) {
_m.Called(replicateInfo)
// RemoveReplicator provides a mock function with given fields: key, modRevision
func (_m *MockReplicateManagerClient) RemoveReplicator(key string, modRevision int64) {
_m.Called(key, modRevision)
}
// MockReplicateManagerClient_RemoveReplicator_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveReplicator'
@ -129,14 +129,15 @@ type MockReplicateManagerClient_RemoveReplicator_Call struct {
}
// RemoveReplicator is a helper method to define mock.On call
// - replicateInfo *streamingpb.ReplicatePChannelMeta
func (_e *MockReplicateManagerClient_Expecter) RemoveReplicator(replicateInfo interface{}) *MockReplicateManagerClient_RemoveReplicator_Call {
return &MockReplicateManagerClient_RemoveReplicator_Call{Call: _e.mock.On("RemoveReplicator", replicateInfo)}
// - key string
// - modRevision int64
func (_e *MockReplicateManagerClient_Expecter) RemoveReplicator(key interface{}, modRevision interface{}) *MockReplicateManagerClient_RemoveReplicator_Call {
return &MockReplicateManagerClient_RemoveReplicator_Call{Call: _e.mock.On("RemoveReplicator", key, modRevision)}
}
func (_c *MockReplicateManagerClient_RemoveReplicator_Call) Run(run func(replicateInfo *streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveReplicator_Call {
func (_c *MockReplicateManagerClient_RemoveReplicator_Call) Run(run func(key string, modRevision int64)) *MockReplicateManagerClient_RemoveReplicator_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*streamingpb.ReplicatePChannelMeta))
run(args[0].(string), args[1].(int64))
})
return _c
}
@ -146,7 +147,7 @@ func (_c *MockReplicateManagerClient_RemoveReplicator_Call) Return() *MockReplic
return _c
}
func (_c *MockReplicateManagerClient_RemoveReplicator_Call) RunAndReturn(run func(*streamingpb.ReplicatePChannelMeta)) *MockReplicateManagerClient_RemoveReplicator_Call {
func (_c *MockReplicateManagerClient_RemoveReplicator_Call) RunAndReturn(run func(string, int64)) *MockReplicateManagerClient_RemoveReplicator_Call {
_c.Run(run)
return _c
}

View File

@ -16,18 +16,18 @@
package replication
import "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
import "github.com/milvus-io/milvus/internal/cdc/meta"
// ReplicateManagerClient is the client that manages the replicate configuration.
type ReplicateManagerClient interface {
// CreateReplicator creates a new replicator for the replicate pchannel.
CreateReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta)
CreateReplicator(channel *meta.ReplicateChannel)
// RemoveReplicator removes a replicator for the replicate pchannel.
RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta)
RemoveReplicator(key string, modRevision int64)
// RemoveOutdatedReplicators removes the outdated replicators.
RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta)
RemoveOutdatedReplicators(aliveChannels []*meta.ReplicateChannel)
// Close closes the replicate manager client.
Close()

View File

@ -25,112 +25,129 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/cdc/replication/replicatestream"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/cdc/util"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)
// Replicator is the client that replicates the message to the channel in the target cluster.
type Replicator interface {
// StartReplicate starts the replicate for the channel.
StartReplicate()
// StartReplication starts the replicate for the channel.
StartReplication()
// StopReplicate stops the replicate loop
// StopReplication stops the replicate loop
// and wait for the loop to exit.
StopReplicate()
// GetState returns the current state of the replicator.
GetState() typeutil.LifetimeState
StopReplication()
}
var _ Replicator = (*channelReplicator)(nil)
// channelReplicator is the implementation of ChannelReplicator.
type channelReplicator struct {
replicateInfo *streamingpb.ReplicatePChannelMeta
channel *meta.ReplicateChannel
createRscFunc replicatestream.CreateReplicateStreamClientFunc
createMcFunc cluster.CreateMilvusClientFunc
targetClient cluster.MilvusClient
streamClient replicatestream.ReplicateStreamClient
msgScanner streaming.Scanner
msgChan adaptor.ChanMessageHandler
ctx context.Context
cancel context.CancelFunc
lifetime *typeutil.Lifetime
asyncNotifier *syncutil.AsyncTaskNotifier[struct{}]
}
// NewChannelReplicator creates a new ChannelReplicator.
func NewChannelReplicator(replicateMeta *streamingpb.ReplicatePChannelMeta) Replicator {
ctx, cancel := context.WithCancel(context.Background())
func NewChannelReplicator(channel *meta.ReplicateChannel) Replicator {
createRscFunc := replicatestream.NewReplicateStreamClient
return &channelReplicator{
replicateInfo: replicateMeta,
channel: channel,
createRscFunc: createRscFunc,
ctx: ctx,
cancel: cancel,
lifetime: typeutil.NewLifetime(),
createMcFunc: cluster.NewMilvusClient,
asyncNotifier: syncutil.NewAsyncTaskNotifier[struct{}](),
}
}
func (r *channelReplicator) StartReplicate() {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
if !r.lifetime.Add(typeutil.LifetimeStateWorking) {
logger.Warn("replicate channel already started")
return
}
func (r *channelReplicator) StartReplication() {
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("modRevision", r.channel.ModRevision))
logger.Info("start replicate channel")
go func() {
defer r.lifetime.Done()
defer r.asyncNotifier.Finish(struct{}{})
INIT_LOOP:
for {
err := r.replicateLoop()
if err != nil {
logger.Warn("replicate channel failed", zap.Error(err))
time.Sleep(10 * time.Second)
continue
select {
case <-r.asyncNotifier.Context().Done():
return
default:
err := r.init()
if err != nil {
logger.Warn("initialize replicator failed", zap.Error(err))
continue
}
break INIT_LOOP
}
break
}
logger.Info("stop replicate channel")
r.startConsumeLoop()
}()
}
// replicateLoop starts the replicate loop.
func (r *channelReplicator) replicateLoop() error {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
cp, err := r.getReplicateCheckpoint()
if err != nil {
return err
func (r *channelReplicator) init() error {
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("modRevision", r.channel.ModRevision))
// init target client
if r.targetClient == nil {
dialCtx, dialCancel := context.WithTimeout(r.asyncNotifier.Context(), 30*time.Second)
defer dialCancel()
milvusClient, err := r.createMcFunc(dialCtx, r.channel.Value.GetTargetCluster())
if err != nil {
return err
}
r.targetClient = milvusClient
logger.Info("target client initialized")
}
ch := make(adaptor.ChanMessageHandler)
scanner := streaming.WAL().Read(r.ctx, streaming.ReadOption{
PChannel: r.replicateInfo.GetSourceChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(cp.MessageID),
DeliverFilters: []options.DeliverFilter{options.DeliverFilterTimeTickGT(cp.TimeTick)},
MessageHandler: ch,
})
defer scanner.Close()
// init msg scanner
if r.msgScanner == nil {
cp, err := r.getReplicateCheckpoint()
if err != nil {
return err
}
ch := make(adaptor.ChanMessageHandler)
scanner := streaming.WAL().Read(r.asyncNotifier.Context(), streaming.ReadOption{
PChannel: r.channel.Value.GetSourceChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(cp.MessageID),
DeliverFilters: []options.DeliverFilter{options.DeliverFilterTimeTickGT(cp.TimeTick)},
MessageHandler: ch,
})
r.msgScanner = scanner
r.msgChan = ch
logger.Info("scanner initialized", zap.Any("checkpoint", cp))
}
// init replicate stream client
if r.streamClient == nil {
r.streamClient = r.createRscFunc(r.asyncNotifier.Context(), r.targetClient, r.channel)
logger.Info("stream client initialized")
}
return nil
}
rsc := r.createRscFunc(r.ctx, r.replicateInfo)
defer rsc.Close()
logger.Info("start replicate channel loop", zap.Any("startFrom", cp))
// startConsumeLoop starts the replicate loop.
func (r *channelReplicator) startConsumeLoop() {
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("modRevision", r.channel.ModRevision))
logger.Info("start consume loop")
for {
select {
case <-r.ctx.Done():
logger.Info("replicate channel stopped")
return nil
case msg := <-ch:
err := rsc.Replicate(msg)
case <-r.asyncNotifier.Context().Done():
logger.Info("consume loop stopped")
return
case msg := <-r.msgChan:
err := r.streamClient.Replicate(msg)
if err != nil {
if !errors.Is(err, replicatestream.ErrReplicateIgnored) {
panic(fmt.Sprintf("replicate message failed due to unrecoverable error: %v", err))
@ -138,37 +155,35 @@ func (r *channelReplicator) replicateLoop() error {
continue
}
logger.Debug("replicate message success", log.FieldMessage(msg))
if msg.MessageType() == message.MessageTypeAlterReplicateConfig {
if util.IsReplicationRemovedByAlterReplicateConfigMessage(msg, r.channel.Value) {
logger.Info("replication removed, stop consume loop")
return
}
}
}
}
}
func (r *channelReplicator) getReplicateCheckpoint() (*utility.ReplicateCheckpoint, error) {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("modRevision", r.channel.ModRevision))
ctx, cancel := context.WithTimeout(r.ctx, 30*time.Second)
ctx, cancel := context.WithTimeout(r.asyncNotifier.Context(), 30*time.Second)
defer cancel()
milvusClient, err := resource.Resource().ClusterClient().CreateMilvusClient(ctx, r.replicateInfo.GetTargetCluster())
if err != nil {
return nil, err
}
defer milvusClient.Close(ctx)
sourceClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
req := &milvuspb.GetReplicateInfoRequest{
SourceClusterId: sourceClusterID,
TargetPchannel: r.replicateInfo.GetTargetChannelName(),
TargetPchannel: r.channel.Value.GetTargetChannelName(),
}
replicateInfo, err := milvusClient.GetReplicateInfo(ctx, req)
replicateInfo, err := r.targetClient.GetReplicateInfo(ctx, req)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to get replicate info")
}
checkpoint := replicateInfo.GetCheckpoint()
if checkpoint == nil || checkpoint.MessageId == nil {
initializedCheckpoint := utility.NewReplicateCheckpointFromProto(r.replicateInfo.InitializedCheckpoint)
initializedCheckpoint := utility.NewReplicateCheckpointFromProto(r.channel.Value.InitializedCheckpoint)
logger.Info("channel not found in replicate info, will start from the beginning",
zap.Stringer("messageID", initializedCheckpoint.MessageID),
zap.Uint64("timeTick", initializedCheckpoint.TimeTick),
@ -184,12 +199,16 @@ func (r *channelReplicator) getReplicateCheckpoint() (*utility.ReplicateCheckpoi
return cp, nil
}
func (r *channelReplicator) StopReplicate() {
r.lifetime.SetState(typeutil.LifetimeStateStopped)
r.cancel()
r.lifetime.Wait()
}
func (r *channelReplicator) GetState() typeutil.LifetimeState {
return r.lifetime.GetState()
func (r *channelReplicator) StopReplication() {
r.asyncNotifier.Cancel()
r.asyncNotifier.BlockUntilFinish() // wait for the start goroutine to finish
if r.targetClient != nil {
r.targetClient.Close(r.asyncNotifier.Context())
}
if r.streamClient != nil {
r.streamClient.Close()
}
if r.msgScanner != nil {
r.msgScanner.Close()
}
}

View File

@ -19,6 +19,7 @@ package replicatemanager
import (
"context"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
@ -27,13 +28,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/cdc/replication/replicatestream"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
pulsar2 "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func newMockPulsarMessageID() *commonpb.MessageID {
@ -56,13 +56,6 @@ func TestChannelReplicator_StartReplicateChannel(t *testing.T) {
}, nil)
mockMilvusClient.EXPECT().Close(mock.Anything).Return(nil)
mockClusterClient := cluster.NewMockClusterClient(t)
mockClusterClient.EXPECT().CreateMilvusClient(mock.Anything, mock.Anything).
Return(mockMilvusClient, nil)
resource.InitForTest(t,
resource.OptClusterClient(mockClusterClient),
)
scanner := mock_streaming.NewMockScanner(t)
scanner.EXPECT().Close().Return()
wal := mock_streaming.NewMockWALAccesser(t)
@ -72,23 +65,31 @@ func TestChannelReplicator_StartReplicateChannel(t *testing.T) {
rs := replicatestream.NewMockReplicateStreamClient(t)
rs.EXPECT().Close().Return()
cluster := &commonpb.MilvusCluster{ClusterId: "test-cluster"}
mc := &commonpb.MilvusCluster{ClusterId: "test-cluster"}
replicateInfo := &streamingpb.ReplicatePChannelMeta{
SourceChannelName: "test-source-channel",
TargetChannelName: "test-target-channel",
TargetCluster: cluster,
TargetCluster: mc,
}
replicator := NewChannelReplicator(replicateInfo)
replicator := NewChannelReplicator(&meta.ReplicateChannel{
Value: replicateInfo,
ModRevision: 0,
})
assert.NotNil(t, replicator)
replicator.(*channelReplicator).createRscFunc = func(ctx context.Context,
replicateInfo *streamingpb.ReplicatePChannelMeta,
c cluster.MilvusClient,
rm *meta.ReplicateChannel,
) replicatestream.ReplicateStreamClient {
return rs
}
assert.NotNil(t, replicator)
replicator.(*channelReplicator).createMcFunc = func(ctx context.Context,
cluster *commonpb.MilvusCluster,
) (cluster.MilvusClient, error) {
return mockMilvusClient, nil
}
replicator.StartReplicate()
replicator.StopReplicate()
state := replicator.GetState()
assert.Equal(t, typeutil.LifetimeStateStopped, state)
replicator.StartReplication()
time.Sleep(200 * time.Millisecond)
replicator.StopReplication()
}

View File

@ -18,93 +18,103 @@ package replicatemanager
import (
"context"
"fmt"
"strings"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// replicateManager is the implementation of ReplicateManagerClient.
type replicateManager struct {
ctx context.Context
mu sync.Mutex
// replicators is a map of replicate pchannel name to ChannelReplicator.
replicators map[string]Replicator
replicatorPChannels map[string]*streamingpb.ReplicatePChannelMeta
replicators map[string]Replicator
replicatorChannels map[string]*meta.ReplicateChannel
}
func NewReplicateManager() *replicateManager {
return &replicateManager{
ctx: context.Background(),
replicators: make(map[string]Replicator),
replicatorPChannels: make(map[string]*streamingpb.ReplicatePChannelMeta),
ctx: context.Background(),
replicators: make(map[string]Replicator),
replicatorChannels: make(map[string]*meta.ReplicateChannel),
}
}
func (r *replicateManager) CreateReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) {
logger := log.With(
zap.String("sourceChannel", replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", replicateInfo.GetTargetChannelName()),
)
func buildReplicatorKey(metaKey string, modRevision int64) string {
return fmt.Sprintf("%s/_v%d", metaKey, modRevision)
}
func (r *replicateManager) CreateReplicator(channel *meta.ReplicateChannel) {
r.mu.Lock()
defer r.mu.Unlock()
logger := log.With(zap.String("key", channel.Key), zap.Int64("modRevision", channel.ModRevision))
repKey := buildReplicatorKey(channel.Key, channel.ModRevision)
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
if !strings.Contains(replicateInfo.GetSourceChannelName(), currentClusterID) {
// should be checked by controller, here is a redundant check
if !strings.Contains(channel.Value.GetSourceChannelName(), currentClusterID) {
return
}
replicatorKey := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo)
_, ok := r.replicators[replicatorKey]
_, ok := r.replicators[repKey]
if ok {
logger.Debug("replicator already exists, skip create replicator")
return
}
replicator := NewChannelReplicator(replicateInfo)
replicator.StartReplicate()
r.replicators[replicatorKey] = replicator
r.replicatorPChannels[replicatorKey] = replicateInfo
replicator := NewChannelReplicator(channel)
replicator.StartReplication()
r.replicators[repKey] = replicator
r.replicatorChannels[repKey] = channel
logger.Info("created replicator for replicate pchannel")
}
func (r *replicateManager) RemoveReplicator(replicateInfo *streamingpb.ReplicatePChannelMeta) {
logger := log.With(
zap.String("sourceChannel", replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", replicateInfo.GetTargetChannelName()),
)
replicatorKey := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo)
replicator, ok := r.replicators[replicatorKey]
func (r *replicateManager) RemoveReplicator(key string, modRevision int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.removeReplicatorInternal(key, modRevision)
}
func (r *replicateManager) removeReplicatorInternal(key string, modRevision int64) {
logger := log.With(zap.String("key", key), zap.Int64("modRevision", modRevision))
repKey := buildReplicatorKey(key, modRevision)
replicator, ok := r.replicators[repKey]
if !ok {
logger.Info("replicator not found, skip remove")
return
}
replicator.StopReplicate()
delete(r.replicators, replicatorKey)
delete(r.replicatorPChannels, replicatorKey)
replicator.StopReplication()
delete(r.replicators, repKey)
delete(r.replicatorChannels, repKey)
logger.Info("removed replicator for replicate pchannel")
}
func (r *replicateManager) RemoveOutdatedReplicators(aliveReplicates []*streamingpb.ReplicatePChannelMeta) {
func (r *replicateManager) RemoveOutdatedReplicators(aliveChannels []*meta.ReplicateChannel) {
r.mu.Lock()
defer r.mu.Unlock()
alivesMap := make(map[string]struct{})
for _, replicate := range aliveReplicates {
alivesMap[streamingcoord.BuildReplicatePChannelMetaKey(replicate)] = struct{}{}
for _, channel := range aliveChannels {
repKey := buildReplicatorKey(channel.Key, channel.ModRevision)
alivesMap[repKey] = struct{}{}
}
for replicatorKey, replicator := range r.replicators {
if _, ok := alivesMap[replicatorKey]; !ok {
replicator.StopReplicate()
info := r.replicatorPChannels[replicatorKey]
delete(r.replicators, replicatorKey)
delete(r.replicatorPChannels, replicatorKey)
log.Info("removed replicator for replicate pchannel",
zap.String("sourceChannel", info.GetSourceChannelName()),
zap.String("targetChannel", info.GetTargetChannelName()))
for repKey := range r.replicators {
if _, ok := alivesMap[repKey]; !ok {
channel := r.replicatorChannels[repKey]
r.removeReplicatorInternal(channel.Key, channel.ModRevision)
}
}
}
func (r *replicateManager) Close() {
r.mu.Lock()
defer r.mu.Unlock()
for _, replicator := range r.replicators {
replicator.StopReplicate()
replicator.StopReplication()
}
r.replicators = make(map[string]Replicator)
r.replicatorChannels = make(map[string]*meta.ReplicateChannel)
}

View File

@ -24,8 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -39,13 +38,6 @@ func TestReplicateManager_CreateReplicator(t *testing.T) {
Return(nil, assert.AnError).Maybe()
mockMilvusClient.EXPECT().Close(mock.Anything).Return(nil).Maybe()
mockClusterClient := cluster.NewMockClusterClient(t)
mockClusterClient.EXPECT().CreateMilvusClient(mock.Anything, mock.Anything).
Return(mockMilvusClient, nil).Maybe()
resource.InitForTest(t,
resource.OptClusterClient(mockClusterClient),
)
manager := NewReplicateManager()
// Test creating first replicator
@ -56,13 +48,18 @@ func TestReplicateManager_CreateReplicator(t *testing.T) {
ClusterId: "test-cluster-1",
},
}
key := "test-replicate-key-1"
replicateMeta := &meta.ReplicateChannel{
Key: key,
Value: replicateInfo,
ModRevision: 0,
}
manager.CreateReplicator(replicateInfo)
manager.CreateReplicator(replicateMeta)
// Verify replicator was created
assert.Equal(t, 1, len(manager.replicators))
key := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo)
replicator, exists := manager.replicators[key]
replicator, exists := manager.replicators[buildReplicatorKey(key, 0)]
assert.True(t, exists)
assert.NotNil(t, replicator)
@ -74,18 +71,23 @@ func TestReplicateManager_CreateReplicator(t *testing.T) {
ClusterId: "test-cluster-2",
},
}
key2 := "test-replicate-key-2"
replicateMeta2 := &meta.ReplicateChannel{
Key: key2,
Value: replicateInfo2,
ModRevision: 0,
}
manager.CreateReplicator(replicateInfo2)
manager.CreateReplicator(replicateMeta2)
// Verify second replicator was created
assert.Equal(t, 2, len(manager.replicators))
key2 := streamingcoord.BuildReplicatePChannelMetaKey(replicateInfo2)
replicator2, exists := manager.replicators[key2]
replicator2, exists := manager.replicators[buildReplicatorKey(key2, 0)]
assert.True(t, exists)
assert.NotNil(t, replicator2)
// Verify first replicator still exists
replicator1, exists := manager.replicators[key]
replicator1, exists := manager.replicators[buildReplicatorKey(key, 0)]
assert.True(t, exists)
assert.NotNil(t, replicator1)
}

View File

@ -21,7 +21,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)
@ -36,8 +37,8 @@ type ReplicateStreamClient interface {
// return ErrReplicateIgnored if the message should not be replicated.
Replicate(msg message.ImmutableMessage) error
// Stop stops the replicate operation.
// Close closes the replicate stream client.
Close()
}
type CreateReplicateStreamClientFunc func(ctx context.Context, replicateInfo *streamingpb.ReplicatePChannelMeta) ReplicateStreamClient
type CreateReplicateStreamClientFunc func(ctx context.Context, c cluster.MilvusClient, rm *meta.ReplicateChannel) ReplicateStreamClient

View File

@ -22,17 +22,19 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/cdc/util"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
)
const (
@ -41,12 +43,14 @@ const (
pendingMessageQueueMaxSize = 128 * 1024 * 1024
)
var ErrReplicationRemoved = errors.New("replication removed")
// replicateStreamClient is the implementation of ReplicateStreamClient.
type replicateStreamClient struct {
replicateInfo *streamingpb.ReplicatePChannelMeta
clusterID string
targetClient cluster.MilvusClient
client milvuspb.MilvusService_CreateReplicateStreamClient
channel *meta.ReplicateChannel
pendingMessages MsgQueue
metrics ReplicateMetrics
@ -56,9 +60,9 @@ type replicateStreamClient struct {
}
// NewReplicateStreamClient creates a new ReplicateStreamClient.
func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.ReplicatePChannelMeta) ReplicateStreamClient {
func NewReplicateStreamClient(ctx context.Context, c cluster.MilvusClient, channel *meta.ReplicateChannel) ReplicateStreamClient {
ctx1, cancel := context.WithCancel(ctx)
ctx1 = contextutil.WithClusterID(ctx1, replicateInfo.GetTargetCluster().GetClusterId())
ctx1 = contextutil.WithClusterID(ctx1, channel.Value.GetTargetCluster().GetClusterId())
options := MsgQueueOptions{
Capacity: pendingMessageQueueLength,
@ -67,9 +71,10 @@ func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.Re
pendingMessages := NewMsgQueue(options)
rs := &replicateStreamClient{
clusterID: paramtable.Get().CommonCfg.ClusterPrefix.GetValue(),
replicateInfo: replicateInfo,
targetClient: c,
channel: channel,
pendingMessages: pendingMessages,
metrics: NewReplicateMetrics(replicateInfo),
metrics: NewReplicateMetrics(channel.Value),
ctx: ctx1,
cancel: cancel,
finishedCh: make(chan struct{}),
@ -81,68 +86,79 @@ func NewReplicateStreamClient(ctx context.Context, replicateInfo *streamingpb.Re
}
func (r *replicateStreamClient) startInternal() {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
defer func() {
r.metrics.OnClose()
logger.Info("replicate stream client closed")
close(r.finishedCh)
logger.Info("replicate stream client closed")
}()
backoff := backoff.NewExponentialBackOff()
backoff.InitialInterval = 100 * time.Millisecond
backoff.MaxInterval = 10 * time.Second
backoff.MaxElapsedTime = 0
backoff.Reset()
for {
// Create a local context for this connection that can be canceled
// when we need to stop the send/recv loops
connCtx, connCancel := context.WithCancel(r.ctx)
milvusClient, err := resource.Resource().ClusterClient().CreateMilvusClient(connCtx, r.replicateInfo.GetTargetCluster())
if err != nil {
logger.Warn("create milvus client failed, retry...", zap.Error(err))
time.Sleep(backoff.NextBackOff())
continue
}
client, err := milvusClient.CreateReplicateStream(connCtx)
if err != nil {
logger.Warn("create milvus replicate stream failed, retry...", zap.Error(err))
time.Sleep(backoff.NextBackOff())
continue
}
logger.Info("replicate stream client service started")
r.metrics.OnConnect()
// reset client and pending messages
r.client = client
r.pendingMessages.SeekToHead()
sendCh := r.startSendLoop(connCtx)
recvCh := r.startRecvLoop(connCtx)
select {
case <-r.ctx.Done():
case <-sendCh:
case <-recvCh:
}
connCancel() // Cancel the connection context
<-sendCh
<-recvCh // wait for send/recv loops to exit
if r.ctx.Err() != nil {
logger.Info("close replicate stream client by ctx done")
restart := r.startReplicating(backoff)
if !restart {
return
} else {
logger.Warn("restart replicate stream client")
r.metrics.OnDisconnect()
time.Sleep(backoff.NextBackOff())
}
time.Sleep(backoff.NextBackOff())
}
}
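// startReplicating establishes a single replicate stream connection and runs the send/recv loops
// until the connection fails, the context is done, or the replication is removed; it reports
// whether the caller should back off and reconnect (true) or stop for good (false).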
func (r *replicateStreamClient) startReplicating(backoff backoff.BackOff) (needRestart bool) {
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
if r.ctx.Err() != nil {
logger.Info("close replicate stream client due to ctx done")
return false
}
// Create a local context for this connection that can be canceled
// when we need to stop the send/recv loops
connCtx, connCancel := context.WithCancel(r.ctx)
defer connCancel()
client, err := r.targetClient.CreateReplicateStream(connCtx)
if err != nil {
logger.Warn("create milvus replicate stream failed, retry...", zap.Error(err))
return true
}
defer client.CloseSend()
logger.Info("replicate stream client service started")
r.metrics.OnConnect()
backoff.Reset()
// reset client and pending messages
r.client = client
r.pendingMessages.SeekToHead()
sendCh := r.startSendLoop(connCtx)
recvCh := r.startRecvLoop(connCtx)
var chErr error
select {
case <-r.ctx.Done():
case chErr = <-sendCh:
case chErr = <-recvCh:
}
connCancel() // Cancel the connection context
<-sendCh
<-recvCh // wait for send/recv loops to exit
if r.ctx.Err() != nil {
logger.Info("close replicate stream client due to ctx done")
return false
} else if errors.Is(chErr, ErrReplicationRemoved) {
logger.Info("close replicate stream client due to replication removed")
return false
} else {
logger.Warn("restart replicate stream client due to unexpected error", zap.Error(chErr))
r.metrics.OnDisconnect()
return true
}
}
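As an aside, the send/recv channels above follow a common Go shutdown pattern. A self-contained sketch (not Milvus code; all names are illustrative) of why draining both channels after the select prevents goroutine leaks:

// Illustrative sketch: each loop sends exactly one terminal error and then
// closes its channel, so the parent can select on whichever finishes first,
// cancel the shared context, and safely drain both channels to guarantee
// neither goroutine outlives the call.
func runLoops(ctx context.Context, send, recv func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	start := func(f func(context.Context) error) <-chan error {
		ch := make(chan error)
		go func() {
			ch <- f(ctx) // blocks until the parent reads it below
			close(ch)
		}()
		return ch
	}
	sendCh, recvCh := start(send), start(recv)

	var firstErr error
	select {
	case <-ctx.Done():
	case firstErr = <-sendCh:
	case firstErr = <-recvCh:
	}
	cancel()
	// Draining is always safe: a loop that already reported has closed its
	// channel (the receive returns immediately), and the other loop will
	// report once the canceled context unblocks it.
	<-sendCh
	<-recvCh
	return firstErr
}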
@ -166,36 +182,34 @@ func (r *replicateStreamClient) Replicate(msg message.ImmutableMessage) error {
}
}
func (r *replicateStreamClient) startSendLoop(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
func (r *replicateStreamClient) startSendLoop(ctx context.Context) <-chan error {
ch := make(chan error)
go func() {
_ = r.sendLoop(ctx)
err := r.sendLoop(ctx)
ch <- err
close(ch)
}()
return ch
}
func (r *replicateStreamClient) startRecvLoop(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
func (r *replicateStreamClient) startRecvLoop(ctx context.Context) <-chan error {
ch := make(chan error)
go func() {
_ = r.recvLoop(ctx)
err := r.recvLoop(ctx)
ch <- err
close(ch)
}()
return ch
}
func (r *replicateStreamClient) sendLoop(ctx context.Context) (err error) {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
defer func() {
if err != nil {
logger.Warn("send loop closed by unexpected error", zap.Error(err))
} else {
logger.Info("send loop closed")
}
r.client.CloseSend()
}()
for {
select {
@ -243,10 +257,7 @@ func (r *replicateStreamClient) sendLoop(ctx context.Context) (err error) {
func (r *replicateStreamClient) sendMessage(msg message.ImmutableMessage) (err error) {
defer func() {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
if err != nil {
logger.Warn("send message failed", zap.Error(err), log.FieldMessage(msg))
} else {
@ -271,15 +282,12 @@ func (r *replicateStreamClient) sendMessage(msg message.ImmutableMessage) (err e
}
func (r *replicateStreamClient) recvLoop(ctx context.Context) (err error) {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
defer func() {
if err != nil {
if err != nil && !errors.Is(err, ErrReplicationRemoved) {
logger.Warn("recv loop closed by unexpected error", zap.Error(err))
} else {
logger.Info("recv loop closed")
logger.Info("recv loop closed", zap.Error(err))
}
}()
for {
@ -296,40 +304,46 @@ func (r *replicateStreamClient) recvLoop(ctx context.Context) (err error) {
if lastConfirmedMessageInfo != nil {
messages := r.pendingMessages.CleanupConfirmedMessages(lastConfirmedMessageInfo.GetConfirmedTimeTick())
for _, msg := range messages {
r.metrics.OnConfirmed(msg)
if msg.MessageType() == message.MessageTypeAlterReplicateConfig {
roleChanged := r.handleAlterReplicateConfigMessage(msg)
if roleChanged {
// Role changed, return and stop replicate.
return nil
replicationRemoved := r.handleAlterReplicateConfigMessage(msg)
if replicationRemoved {
// Replication removed, return and stop replicating.
return ErrReplicationRemoved
}
}
r.metrics.OnConfirmed(msg)
}
}
}
}
}
func (r *replicateStreamClient) handleAlterReplicateConfigMessage(msg message.ImmutableMessage) (roleChanged bool) {
logger := log.With(
zap.String("sourceChannel", r.replicateInfo.GetSourceChannelName()),
zap.String("targetChannel", r.replicateInfo.GetTargetChannelName()),
)
func (r *replicateStreamClient) handleAlterReplicateConfigMessage(msg message.ImmutableMessage) (replicationRemoved bool) {
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("revision", r.channel.ModRevision))
logger.Info("handle AlterReplicateConfigMessage", log.FieldMessage(msg))
prcMsg := message.MustAsImmutableAlterReplicateConfigMessageV2(msg)
replicateConfig := prcMsg.Header().ReplicateConfiguration
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
currentCluster := replicateutil.MustNewConfigHelper(currentClusterID, replicateConfig).GetCurrentCluster()
_, err := currentCluster.GetTargetChannel(r.replicateInfo.GetSourceChannelName(),
r.replicateInfo.GetTargetCluster().GetClusterId())
if err != nil {
replicationRemoved = util.IsReplicationRemovedByAlterReplicateConfigMessage(msg, r.channel.Value)
if replicationRemoved {
// The target channel cannot be found, which means the `current->target` topology edge was removed,
// so we need to remove the replicate pchannel and stop replicating.
err := resource.Resource().ReplicationCatalog().RemoveReplicatePChannel(r.ctx, r.replicateInfo)
etcdCli := resource.Resource().ETCD()
ok, err := meta.RemoveReplicatePChannelWithRevision(r.ctx, etcdCli, r.channel.Key, r.channel.ModRevision)
if err != nil {
panic(fmt.Sprintf("failed to remove replicate pchannel: %v", err))
logger.Warn("failed to remove replicate pchannel", zap.Error(err))
// When performing the delete operation on etcd, the context may be canceled by the delete event
// handled in the cdc controller, causing a `context.Canceled` error to be returned.
// Since the delete event is only generated after the delete operation has been committed in etcd,
// the delete is guaranteed to have succeeded on the server side,
// so the context-canceled error can be safely ignored here.
if !errors.Is(err, context.Canceled) {
panic(fmt.Sprintf("failed to remove replicate pchannel: %v", err))
}
}
if ok {
logger.Info("handle AlterReplicateConfigMessage done, replicate pchannel removed")
} else {
logger.Info("handle AlterReplicateConfigMessage done, revision not match, replicate pchannel not removed")
}
logger.Info("handle AlterReplicateConfigMessage done, replicate pchannel removed")
return true
}
logger.Info("target channel found, skip handle AlterReplicateConfigMessage")

View File

@ -30,7 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/cdc/meta"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
mock_message "github.com/milvus-io/milvus/pkg/v2/mocks/streaming/util/mock_message"
@ -55,23 +55,22 @@ func TestReplicateStreamClient_Replicate(t *testing.T) {
mockMilvusClient := cluster.NewMockMilvusClient(t)
mockMilvusClient.EXPECT().CreateReplicateStream(mock.Anything).Return(mockStreamClient, nil)
mockClusterClient := cluster.NewMockClusterClient(t)
mockClusterClient.EXPECT().CreateMilvusClient(mock.Anything, mock.Anything).
Return(mockMilvusClient, nil)
resource.InitForTest(t,
resource.OptClusterClient(mockClusterClient),
)
wal := mock_streaming.NewMockWALAccesser(t)
streaming.SetWALForTest(wal)
replicateInfo := &streamingpb.ReplicatePChannelMeta{
repMeta := &streamingpb.ReplicatePChannelMeta{
SourceChannelName: "test-source-channel",
TargetChannelName: "test-target-channel",
TargetCluster: targetCluster,
}
replicateClient := NewReplicateStreamClient(ctx, replicateInfo)
key := "test-replicate-key"
channel := &meta.ReplicateChannel{
Key: key,
Value: repMeta,
ModRevision: 0,
}
replicateClient := NewReplicateStreamClient(ctx, mockMilvusClient, channel)
defer replicateClient.Close()
// Test Replicate method
const msgCount = pendingMessageQueueLength * 10
@ -104,7 +103,6 @@ func TestReplicateStreamClient_Replicate(t *testing.T) {
assert.Eventually(t, func() bool {
return replicateClient.(*replicateStreamClient).pendingMessages.Len() == 0
}, time.Second, 100*time.Millisecond)
replicateClient.Close()
}
func TestReplicateStreamClient_Replicate_ContextCanceled(t *testing.T) {
@ -123,23 +121,21 @@ func TestReplicateStreamClient_Replicate_ContextCanceled(t *testing.T) {
mockMilvusClient.EXPECT().CreateReplicateStream(mock.Anything).Return(mockStreamClient, nil).Maybe()
mockMilvusClient.EXPECT().Close(mock.Anything).Return(nil).Maybe()
mockClusterClient := cluster.NewMockClusterClient(t)
mockClusterClient.EXPECT().CreateMilvusClient(mock.Anything, mock.Anything).
Return(mockMilvusClient, nil).Maybe()
resource.InitForTest(t,
resource.OptClusterClient(mockClusterClient),
)
wal := mock_streaming.NewMockWALAccesser(t)
streaming.SetWALForTest(wal)
replicateInfo := &streamingpb.ReplicatePChannelMeta{
key := "test-replicate-key"
repMeta := &streamingpb.ReplicatePChannelMeta{
SourceChannelName: "test-source-channel",
TargetChannelName: "test-target-channel",
TargetCluster: targetCluster,
}
client := NewReplicateStreamClient(ctx, replicateInfo)
channel := &meta.ReplicateChannel{
Key: key,
Value: repMeta,
ModRevision: 0,
}
client := NewReplicateStreamClient(ctx, mockMilvusClient, channel)
defer client.Close()
// Cancel context
@ -174,24 +170,23 @@ func TestReplicateStreamClient_Reconnect(t *testing.T) {
return mockStreamClient, nil
}).Times(reconnectTimes) // expect to be called reconnectTimes times
mockClusterClient := cluster.NewMockClusterClient(t)
mockClusterClient.EXPECT().CreateMilvusClient(mock.Anything, mock.Anything).
Return(mockMilvusClient, nil)
resource.InitForTest(t,
resource.OptClusterClient(mockClusterClient),
)
wal := mock_streaming.NewMockWALAccesser(t)
streaming.SetWALForTest(wal)
// Create client which will start internal retry loop
replicateInfo := &streamingpb.ReplicatePChannelMeta{
key := "test-replicate-key"
repMeta := &streamingpb.ReplicatePChannelMeta{
SourceChannelName: "test-source-channel",
TargetChannelName: "test-target-channel",
TargetCluster: targetCluster,
}
replicateClient := NewReplicateStreamClient(ctx, replicateInfo)
channel := &meta.ReplicateChannel{
Key: key,
Value: repMeta,
ModRevision: 0,
}
replicateClient := NewReplicateStreamClient(ctx, mockMilvusClient, channel)
defer replicateClient.Close()
// Replicate after reconnected
const msgCount = 100
@ -223,7 +218,6 @@ func TestReplicateStreamClient_Reconnect(t *testing.T) {
assert.Eventually(t, func() bool {
return replicateClient.(*replicateStreamClient).pendingMessages.Len() == 0
}, time.Second, 100*time.Millisecond)
replicateClient.Close()
}
// mockReplicateStreamClient implements the milvuspb.MilvusService_CreateReplicateStreamClient interface

View File

@ -21,12 +21,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/controller"
"github.com/milvus-io/milvus/internal/cdc/replication"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/pkg/v2/kv"
)
var r *resourceImpl // singleton resource instance
@ -34,13 +29,6 @@ var r *resourceImpl // singleton resource instance
// optResourceInit is the option to initialize the resource.
type optResourceInit func(r *resourceImpl)
// OptMetaKV provides the meta kv to the resource.
func OptMetaKV(metaKV kv.MetaKv) optResourceInit {
return func(r *resourceImpl) {
r.metaKV = metaKV
}
}
// OptETCD provides the etcd client to the resource.
func OptETCD(etcd *clientv3.Client) optResourceInit {
return func(r *resourceImpl) {
@ -55,27 +43,6 @@ func OptReplicateManagerClient(replicateManagerClient replication.ReplicateManag
}
}
// OptReplicationCatalog provides the replication catalog to the resource.
func OptReplicationCatalog(catalog metastore.ReplicationCatalog) optResourceInit {
return func(r *resourceImpl) {
r.catalog = catalog
}
}
// OptClusterClient provides the cluster client to the resource.
func OptClusterClient(clusterClient cluster.ClusterClient) optResourceInit {
return func(r *resourceImpl) {
r.clusterClient = clusterClient
}
}
// OptController provides the controller to the resource.
func OptController(controller controller.Controller) optResourceInit {
return func(r *resourceImpl) {
r.controller = controller
}
}
// Init finishes all initialization of resources.
func Init(opts ...optResourceInit) {
newR := &resourceImpl{}
@ -83,15 +50,8 @@ func Init(opts ...optResourceInit) {
opt(newR)
}
newR.catalog = streamingcoord.NewReplicationCatalog(newR.MetaKV())
newR.clusterClient = cluster.NewClusterClient()
assertNotNil(newR.MetaKV())
assertNotNil(newR.ETCD())
assertNotNil(newR.ReplicationCatalog())
assertNotNil(newR.ClusterClient())
assertNotNil(newR.ReplicateManagerClient())
assertNotNil(newR.Controller())
r = newR
}
@ -106,17 +66,8 @@ func Resource() *resourceImpl {
// resourceImpl is a basic resource dependency for streamingnode server.
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
metaKV kv.MetaKv
etcdClient *clientv3.Client
catalog metastore.ReplicationCatalog
clusterClient cluster.ClusterClient
replicateManagerClient replication.ReplicateManagerClient
controller controller.Controller
}
// MetaKV returns the meta kv.
func (r *resourceImpl) MetaKV() kv.MetaKv {
return r.metaKV
}
// ETCD returns the etcd client.
@ -124,26 +75,11 @@ func (r *resourceImpl) ETCD() *clientv3.Client {
return r.etcdClient
}
// ReplicationCatalog returns the replication catalog.
func (r *resourceImpl) ReplicationCatalog() metastore.ReplicationCatalog {
return r.catalog
}
// ClusterClient returns the cluster client.
func (r *resourceImpl) ClusterClient() cluster.ClusterClient {
return r.clusterClient
}
// ReplicateManagerClient returns the replicate manager client.
func (r *resourceImpl) ReplicateManagerClient() replication.ReplicateManagerClient {
return r.replicateManagerClient
}
// Controller returns the controller.
func (r *resourceImpl) Controller() controller.Controller {
return r.controller
}
// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
iv := reflect.ValueOf(v)

View File

@ -8,11 +8,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/cdc/cluster"
"github.com/milvus-io/milvus/internal/cdc/controller"
"github.com/milvus-io/milvus/internal/cdc/replication"
"github.com/milvus-io/milvus/internal/mocks/mock_metastore"
"github.com/milvus-io/milvus/pkg/v2/mocks/mock_kv"
)
// InitForTest initializes the singleton of resources for test.
@ -21,22 +17,10 @@ func InitForTest(t *testing.T, opts ...optResourceInit) {
for _, opt := range opts {
opt(r)
}
if r.metaKV == nil {
r.metaKV = mock_kv.NewMockMetaKv(t)
}
if r.etcdClient == nil {
r.etcdClient = &clientv3.Client{}
}
if r.catalog == nil {
r.catalog = mock_metastore.NewMockReplicationCatalog(t)
}
if r.clusterClient == nil {
r.clusterClient = cluster.NewMockClusterClient(t)
}
if r.replicateManagerClient == nil {
r.replicateManagerClient = replication.NewMockReplicateManagerClient(t)
}
if r.controller == nil {
r.controller = controller.NewMockController(t)
}
}

View File

@ -21,30 +21,26 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/cdc/resource"
"github.com/milvus-io/milvus/internal/cdc/controller"
"github.com/milvus-io/milvus/pkg/v2/log"
)
type CDCServer struct {
ctx context.Context
ctx context.Context
controller controller.Controller
}
// NewCDCServer will return a CDCServer.
func NewCDCServer(ctx context.Context) *CDCServer {
return &CDCServer{
ctx: ctx,
ctx: ctx,
controller: controller.NewController(),
}
}
// Init initializes the CDCServer.
func (svr *CDCServer) Init() error {
log.Ctx(svr.ctx).Info("CDCServer init successfully")
return nil
}
// Start starts CDCServer.
func (svr *CDCServer) Start() error {
err := resource.Resource().Controller().Start()
err := svr.controller.Start()
if err != nil {
log.Ctx(svr.ctx).Error("start CDC controller failed", zap.Error(err))
return err
@ -55,7 +51,7 @@ func (svr *CDCServer) Start() error {
// Stop stops CDCServer.
func (svr *CDCServer) Stop() error {
resource.Resource().Controller().Stop()
svr.controller.Stop()
log.Ctx(svr.ctx).Info("CDCServer stop successfully")
return nil
}
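The controller type now owned directly by CDCServer (instead of being fetched from the resource singleton) is not shown in this diff; from its use here, the assumed interface is roughly:

// Sketch only; the real internal/cdc/controller interface may expose more.
type Controller interface {
	// Start launches the replication control loop, e.g. watching replicate
	// pchannel metas and managing channel replicators.
	Start() error
	// Stop tears the control loop down.
	Stop()
}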

internal/cdc/util/util.go Normal file (23 additions)
View File

@ -0,0 +1,23 @@
package util
import (
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
)
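// IsReplicationRemovedByAlterReplicateConfigMessage reports whether the given AlterReplicateConfig
// message removes the `current->target` replication edge described by replicateInfo, i.e. whether
// the replication task for this channel should be stopped.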
func IsReplicationRemovedByAlterReplicateConfigMessage(msg message.ImmutableMessage, replicateInfo *streamingpb.ReplicatePChannelMeta) (replicationRemoved bool) {
prcMsg := message.MustAsImmutableAlterReplicateConfigMessageV2(msg)
replicateConfig := prcMsg.Header().ReplicateConfiguration
currentClusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
currentCluster := replicateutil.MustNewConfigHelper(currentClusterID, replicateConfig).GetCurrentCluster()
_, err := currentCluster.GetTargetChannel(replicateInfo.GetSourceChannelName(),
replicateInfo.GetTargetCluster().GetClusterId())
if err != nil {
// The target channel cannot be found, which means the `current->target` topology edge was removed
// and therefore this replication has been removed.
return true
}
return false
}

View File

@ -19,25 +19,18 @@ package cdc
import (
"context"
"sync"
"time"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/cdc"
"github.com/milvus-io/milvus/internal/cdc/controller/controllerimpl"
"github.com/milvus-io/milvus/internal/cdc/replication/replicatemanager"
"github.com/milvus-io/milvus/internal/cdc/resource"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/componentutil"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -46,11 +39,8 @@ type Server struct {
ctx context.Context
cancel context.CancelFunc
watchKV kv.WatchKV
cdcServer *cdc.CDCServer
etcdCli *clientv3.Client
tikvCli *txnkv.Client
etcdCli *clientv3.Client
componentState *componentutil.ComponentStateService
stopOnce sync.Once
@ -108,13 +98,6 @@ func (s *Server) stop() {
}
}
// Stop tikv
if s.tikvCli != nil {
if err := s.tikvCli.Close(); err != nil {
log.Warn("cdc stop tikv client failed", zap.Error(err))
}
}
log.Info("cdc stop done")
}
@ -137,17 +120,11 @@ func (s *Server) init() (err error) {
// Create etcd client.
s.etcdCli, _ = kvfactory.GetEtcdAndPath()
if err := s.initMeta(); err != nil {
return err
}
// Create CDC service.
s.cdcServer = cdc.NewCDCServer(s.ctx)
resource.Init(
resource.OptMetaKV(s.watchKV),
resource.OptETCD(s.etcdCli),
resource.OptReplicateManagerClient(replicatemanager.NewReplicateManager()),
resource.OptController(controllerimpl.NewController()),
)
return nil
}
@ -167,20 +144,3 @@ func (s *Server) start() (err error) {
s.componentState.OnInitialized(0)
return nil
}
func (s *Server) initMeta() error {
params := paramtable.Get()
metaType := params.MetaStoreCfg.MetaStoreType.GetValue()
log := log.Ctx(s.ctx)
log.Info("cdc connecting to metadata store", zap.String("metaType", metaType))
metaRootPath := ""
if metaType == util.MetaStoreTypeTiKV {
// TODO: sheep, support watch operation for tikv
panic("tikv is not supported for cdc")
} else if metaType == util.MetaStoreTypeEtcd {
metaRootPath = params.EtcdCfg.MetaRootPath.GetValue()
s.watchKV = etcdkv.NewEtcdKV(s.etcdCli, metaRootPath,
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
}
return nil
}

View File

@ -205,25 +205,10 @@ type QueryCoordCatalog interface {
GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error)
}
// ReplicationCatalog is the interface for replication catalog
// it's used by CDC component.
type ReplicationCatalog interface {
// RemoveReplicatePChannel removes the replicate pchannel from metastore.
// Remove the task of CDC replication task of current cluster, should be called when a CDC replication task is finished.
RemoveReplicatePChannel(ctx context.Context, meta *streamingpb.ReplicatePChannelMeta) error
// ListReplicatePChannels lists all replicate pchannels from metastore.
// every ReplicatePChannelMeta is a task of CDC replication task of current cluster which is a source cluster in replication topology.
// the task is written by streaming coord, SaveReplicateConfiguration operation.
ListReplicatePChannels(ctx context.Context) ([]*streamingpb.ReplicatePChannelMeta, error)
}
// StreamingCoordCataLog is the interface for streamingcoord catalog
// All write operations of the catalog are reliable; an error is returned only if the ctx is canceled,
// otherwise it retries until success.
type StreamingCoordCataLog interface {
ReplicationCatalog
// GetCChannel get the control channel from metastore.
GetCChannel(ctx context.Context) (*streamingpb.CChannelMeta, error)

View File

@ -41,12 +41,6 @@ func NewCataLog(metaKV kv.MetaKv) metastore.StreamingCoordCataLog {
}
}
func NewReplicationCatalog(metaKV kv.MetaKv) metastore.ReplicationCatalog {
return &catalog{
metaKV: metaKV,
}
}
// catalog is a kv based catalog.
type catalog struct {
metaKV kv.MetaKv
@ -219,28 +213,6 @@ func (c *catalog) GetReplicateConfiguration(ctx context.Context) (*streamingpb.R
return config, nil
}
func (c *catalog) RemoveReplicatePChannel(ctx context.Context, task *streamingpb.ReplicatePChannelMeta) error {
key := buildReplicatePChannelPath(task.GetTargetCluster().GetClusterId(), task.GetSourceChannelName())
return c.metaKV.Remove(ctx, key)
}
func (c *catalog) ListReplicatePChannels(ctx context.Context) ([]*streamingpb.ReplicatePChannelMeta, error) {
keys, values, err := c.metaKV.LoadWithPrefix(ctx, ReplicatePChannelMetaPrefix)
if err != nil {
return nil, err
}
infos := make([]*streamingpb.ReplicatePChannelMeta, 0, len(values))
for k, value := range values {
info := &streamingpb.ReplicatePChannelMeta{}
err = proto.Unmarshal([]byte(value), info)
if err != nil {
return nil, errors.Wrapf(err, "unmarshal replicate pchannel meta %s failed", keys[k])
}
infos = append(infos, info)
}
return infos, nil
}
func BuildReplicatePChannelMetaKey(meta *streamingpb.ReplicatePChannelMeta) string {
targetClusterID := meta.GetTargetCluster().GetClusterId()
sourceChannelName := meta.GetSourceChannelName()

View File

@ -2,7 +2,6 @@ package streamingcoord
import (
"context"
"sort"
"strings"
"testing"
@ -13,7 +12,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/v2/mocks/mock_kv"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
func TestCatalog(t *testing.T) {
@ -137,27 +135,12 @@ func TestCatalog_ReplicationCatalog(t *testing.T) {
kv.EXPECT().Load(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (string, error) {
return kvStorage[s], nil
})
kv.EXPECT().LoadWithPrefix(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) ([]string, []string, error) {
keys := make([]string, 0, len(kvStorage))
vals := make([]string, 0, len(kvStorage))
for k, v := range kvStorage {
if strings.HasPrefix(k, s) {
keys = append(keys, k)
vals = append(vals, v)
}
}
return keys, vals, nil
})
kv.EXPECT().MultiSave(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, kvs map[string]string) error {
for k, v := range kvs {
kvStorage[k] = v
}
return nil
})
kv.EXPECT().Remove(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, key string) error {
delete(kvStorage, key)
return nil
})
catalog := NewCataLog(kv)
@ -217,38 +200,4 @@ func TestCatalog_ReplicationCatalog(t *testing.T) {
},
})
assert.NoError(t, err)
infos, err := catalog.ListReplicatePChannels(context.Background())
assert.NoError(t, err)
assert.Len(t, infos, 2)
sort.Slice(infos, func(i, j int) bool {
return infos[i].GetTargetChannelName() < infos[j].GetTargetChannelName()
})
assert.Equal(t, infos[0].GetSourceChannelName(), "source-channel-1")
assert.Equal(t, infos[0].GetTargetChannelName(), "target-channel-1")
assert.Equal(t, infos[0].GetTargetCluster().GetClusterId(), "target-cluster")
assert.Equal(t, infos[1].GetSourceChannelName(), "source-channel-2")
assert.Equal(t, infos[1].GetTargetChannelName(), "target-channel-2")
assert.Equal(t, infos[1].GetTargetCluster().GetClusterId(), "target-cluster")
err = catalog.RemoveReplicatePChannel(context.Background(), &streamingpb.ReplicatePChannelMeta{
SourceChannelName: "source-channel-1",
TargetChannelName: "target-channel-1",
TargetCluster: &commonpb.MilvusCluster{ClusterId: "target-cluster"},
})
assert.NoError(t, err)
infos, err = catalog.ListReplicatePChannels(context.Background())
assert.NoError(t, err)
assert.Len(t, infos, 1)
assert.Equal(t, infos[0].GetSourceChannelName(), "source-channel-2")
assert.Equal(t, infos[0].GetTargetChannelName(), "target-channel-2")
assert.Equal(t, infos[0].GetTargetCluster().GetClusterId(), "target-cluster")
kv.EXPECT().Load(mock.Anything, mock.Anything).Unset()
kv.EXPECT().Load(mock.Anything, mock.Anything).Return("", merr.ErrIoKeyNotFound)
cfg, err = catalog.GetReplicateConfiguration(context.Background())
assert.NoError(t, err)
assert.Nil(t, cfg)
}

View File

@ -1,143 +0,0 @@
// Code generated by mockery v2.53.3. DO NOT EDIT.
package mock_metastore
import (
context "context"
mock "github.com/stretchr/testify/mock"
streamingpb "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
)
// MockReplicationCatalog is an autogenerated mock type for the ReplicationCatalog type
type MockReplicationCatalog struct {
mock.Mock
}
type MockReplicationCatalog_Expecter struct {
mock *mock.Mock
}
func (_m *MockReplicationCatalog) EXPECT() *MockReplicationCatalog_Expecter {
return &MockReplicationCatalog_Expecter{mock: &_m.Mock}
}
// ListReplicatePChannels provides a mock function with given fields: ctx
func (_m *MockReplicationCatalog) ListReplicatePChannels(ctx context.Context) ([]*streamingpb.ReplicatePChannelMeta, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for ListReplicatePChannels")
}
var r0 []*streamingpb.ReplicatePChannelMeta
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*streamingpb.ReplicatePChannelMeta, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*streamingpb.ReplicatePChannelMeta); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*streamingpb.ReplicatePChannelMeta)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockReplicationCatalog_ListReplicatePChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListReplicatePChannels'
type MockReplicationCatalog_ListReplicatePChannels_Call struct {
*mock.Call
}
// ListReplicatePChannels is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockReplicationCatalog_Expecter) ListReplicatePChannels(ctx interface{}) *MockReplicationCatalog_ListReplicatePChannels_Call {
return &MockReplicationCatalog_ListReplicatePChannels_Call{Call: _e.mock.On("ListReplicatePChannels", ctx)}
}
func (_c *MockReplicationCatalog_ListReplicatePChannels_Call) Run(run func(ctx context.Context)) *MockReplicationCatalog_ListReplicatePChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockReplicationCatalog_ListReplicatePChannels_Call) Return(_a0 []*streamingpb.ReplicatePChannelMeta, _a1 error) *MockReplicationCatalog_ListReplicatePChannels_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockReplicationCatalog_ListReplicatePChannels_Call) RunAndReturn(run func(context.Context) ([]*streamingpb.ReplicatePChannelMeta, error)) *MockReplicationCatalog_ListReplicatePChannels_Call {
_c.Call.Return(run)
return _c
}
// RemoveReplicatePChannel provides a mock function with given fields: ctx, meta
func (_m *MockReplicationCatalog) RemoveReplicatePChannel(ctx context.Context, meta *streamingpb.ReplicatePChannelMeta) error {
ret := _m.Called(ctx, meta)
if len(ret) == 0 {
panic("no return value specified for RemoveReplicatePChannel")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *streamingpb.ReplicatePChannelMeta) error); ok {
r0 = rf(ctx, meta)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockReplicationCatalog_RemoveReplicatePChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveReplicatePChannel'
type MockReplicationCatalog_RemoveReplicatePChannel_Call struct {
*mock.Call
}
// RemoveReplicatePChannel is a helper method to define mock.On call
// - ctx context.Context
// - meta *streamingpb.ReplicatePChannelMeta
func (_e *MockReplicationCatalog_Expecter) RemoveReplicatePChannel(ctx interface{}, meta interface{}) *MockReplicationCatalog_RemoveReplicatePChannel_Call {
return &MockReplicationCatalog_RemoveReplicatePChannel_Call{Call: _e.mock.On("RemoveReplicatePChannel", ctx, meta)}
}
func (_c *MockReplicationCatalog_RemoveReplicatePChannel_Call) Run(run func(ctx context.Context, meta *streamingpb.ReplicatePChannelMeta)) *MockReplicationCatalog_RemoveReplicatePChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*streamingpb.ReplicatePChannelMeta))
})
return _c
}
func (_c *MockReplicationCatalog_RemoveReplicatePChannel_Call) Return(_a0 error) *MockReplicationCatalog_RemoveReplicatePChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockReplicationCatalog_RemoveReplicatePChannel_Call) RunAndReturn(run func(context.Context, *streamingpb.ReplicatePChannelMeta) error) *MockReplicationCatalog_RemoveReplicatePChannel_Call {
_c.Call.Return(run)
return _c
}
// NewMockReplicationCatalog creates a new instance of MockReplicationCatalog. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockReplicationCatalog(t interface {
mock.TestingT
Cleanup(func())
}) *MockReplicationCatalog {
mock := &MockReplicationCatalog{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -313,111 +313,6 @@ func (_c *MockStreamingCoordCataLog_ListPChannel_Call) RunAndReturn(run func(con
return _c
}
// ListReplicatePChannels provides a mock function with given fields: ctx
func (_m *MockStreamingCoordCataLog) ListReplicatePChannels(ctx context.Context) ([]*streamingpb.ReplicatePChannelMeta, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for ListReplicatePChannels")
}
var r0 []*streamingpb.ReplicatePChannelMeta
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*streamingpb.ReplicatePChannelMeta, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*streamingpb.ReplicatePChannelMeta); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*streamingpb.ReplicatePChannelMeta)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockStreamingCoordCataLog_ListReplicatePChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListReplicatePChannels'
type MockStreamingCoordCataLog_ListReplicatePChannels_Call struct {
*mock.Call
}
// ListReplicatePChannels is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockStreamingCoordCataLog_Expecter) ListReplicatePChannels(ctx interface{}) *MockStreamingCoordCataLog_ListReplicatePChannels_Call {
return &MockStreamingCoordCataLog_ListReplicatePChannels_Call{Call: _e.mock.On("ListReplicatePChannels", ctx)}
}
func (_c *MockStreamingCoordCataLog_ListReplicatePChannels_Call) Run(run func(ctx context.Context)) *MockStreamingCoordCataLog_ListReplicatePChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockStreamingCoordCataLog_ListReplicatePChannels_Call) Return(_a0 []*streamingpb.ReplicatePChannelMeta, _a1 error) *MockStreamingCoordCataLog_ListReplicatePChannels_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockStreamingCoordCataLog_ListReplicatePChannels_Call) RunAndReturn(run func(context.Context) ([]*streamingpb.ReplicatePChannelMeta, error)) *MockStreamingCoordCataLog_ListReplicatePChannels_Call {
_c.Call.Return(run)
return _c
}
// RemoveReplicatePChannel provides a mock function with given fields: ctx, meta
func (_m *MockStreamingCoordCataLog) RemoveReplicatePChannel(ctx context.Context, meta *streamingpb.ReplicatePChannelMeta) error {
ret := _m.Called(ctx, meta)
if len(ret) == 0 {
panic("no return value specified for RemoveReplicatePChannel")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *streamingpb.ReplicatePChannelMeta) error); ok {
r0 = rf(ctx, meta)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockStreamingCoordCataLog_RemoveReplicatePChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveReplicatePChannel'
type MockStreamingCoordCataLog_RemoveReplicatePChannel_Call struct {
*mock.Call
}
// RemoveReplicatePChannel is a helper method to define mock.On call
// - ctx context.Context
// - meta *streamingpb.ReplicatePChannelMeta
func (_e *MockStreamingCoordCataLog_Expecter) RemoveReplicatePChannel(ctx interface{}, meta interface{}) *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call {
return &MockStreamingCoordCataLog_RemoveReplicatePChannel_Call{Call: _e.mock.On("RemoveReplicatePChannel", ctx, meta)}
}
func (_c *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call) Run(run func(ctx context.Context, meta *streamingpb.ReplicatePChannelMeta)) *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*streamingpb.ReplicatePChannelMeta))
})
return _c
}
func (_c *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call) Return(_a0 error) *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call) RunAndReturn(run func(context.Context, *streamingpb.ReplicatePChannelMeta) error) *MockStreamingCoordCataLog_RemoveReplicatePChannel_Call {
_c.Call.Return(run)
return _c
}
// SaveBroadcastTask provides a mock function with given fields: ctx, broadcastID, task
func (_m *MockStreamingCoordCataLog) SaveBroadcastTask(ctx context.Context, broadcastID uint64, task *streamingpb.BroadcastTask) error {
ret := _m.Called(ctx, broadcastID, task)