Zhen Ye c171280f63
enhance: support replicate message in wal. (#44456)
issue: #44123

- support replicate message in wal of milvus.
- support CDC-replicate recovery from wal.
- fix some CDC replicator bugs
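
A minimal sketch of how an append path might drive the manager, for review
context. The interceptor wiring, the appendToWAL helper, and the acker's
Ack method name are illustrative assumptions, not APIs introduced by this
change:

	rm, err := RecoverReplicateManager(&ReplicateManagerRecoverParam{
		ChannelInfo:            channelInfo, // hypothetical: the pchannel served by this wal
		CurrentClusterID:       clusterID,   // hypothetical: id of the local cluster
		InitialRecoverSnapshot: snapshot,    // hypothetical: snapshot loaded by the recovery module
	})
	if err != nil {
		return err
	}
	acker, err := rm.BeginReplicateMessage(ctx, msg)
	switch {
	case errors.Is(err, ErrNotHandledByReplicateManager):
		// not managed by replication: append through the normal path.
	case err != nil:
		return err // e.g. an ignore operation or a replicate violation
	default:
		appendErr := appendToWAL(ctx, msg) // hypothetical append call
		acker.Ack(appendErr)               // assumed method name; releases the manager and pushes the checkpoint on success
	}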

Signed-off-by: chyezh <chyezh@outlook.com>
2025-09-22 17:06:11 +08:00


package replicates

import (
	"context"
	"sync"

	"github.com/cockroachdb/errors"
	"google.golang.org/protobuf/encoding/protojson"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
	"github.com/milvus-io/milvus/pkg/v2/util/replicateutil"
)

// ErrNotHandledByReplicateManager is a special error to indicate that the message should not be handled by the replicate manager.
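// Callers should detect it with errors.Is and fall back to the normal
// (non-replicate) append path, e.g.:
//
//	if _, err := rm.BeginReplicateMessage(ctx, msg); errors.Is(err, ErrNotHandledByReplicateManager) {
//		// append through the non-replicate path.
//	}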
var ErrNotHandledByReplicateManager = errors.New("not handled by replicate manager")

// ReplicateManagerRecoverParam is the parameter for recovering the replicate manager.
type ReplicateManagerRecoverParam struct {
	ChannelInfo            types.PChannelInfo         // the pchannel that the replicate manager serves.
	CurrentClusterID       string                     // the cluster id of the current (local) cluster.
	InitialRecoverSnapshot *recovery.RecoverySnapshot // the initial recover snapshot of the replicate manager.
}

// RecoverReplicateManager recovers the replicate manager from the initial recover snapshot.
// If the wal is in replicating mode, the secondary replicate state is recovered as well.
func RecoverReplicateManager(param *ReplicateManagerRecoverParam) (ReplicateManager, error) {
	replicateConfigHelper, err := replicateutil.NewConfigHelper(param.CurrentClusterID, param.InitialRecoverSnapshot.Checkpoint.ReplicateConfig)
	if err != nil {
		return nil, newReplicateViolationErrorForConfig(param.InitialRecoverSnapshot.Checkpoint.ReplicateConfig, err)
	}
	rm := &replicatesManagerImpl{
		mu:                    sync.Mutex{},
		currentClusterID:      param.CurrentClusterID,
		pchannel:              param.ChannelInfo,
		replicateConfigHelper: replicateConfigHelper,
	}
	if !rm.isPrimaryRole() {
		// if the current cluster is not the primary role,
		// recover the secondary state for it.
		if rm.secondaryState, err = recoverSecondaryState(param); err != nil {
			return nil, err
		}
	}
	return rm, nil
}

// replicatesManagerImpl is the implementation of the replicates manager.
type replicatesManagerImpl struct {
	mu                    sync.Mutex
	pchannel              types.PChannelInfo
	currentClusterID      string
	replicateConfigHelper *replicateutil.ConfigHelper
	secondaryState        *secondaryState // non-nil iff the current cluster is not the primary role.
}

// SwitchReplicateMode switches the replicates manager between replicating mode and non-replicating mode.
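// The role transitions handled below reduce to:
//
//	primary   -> primary:   nothing to do
//	primary   -> secondary: build a fresh secondaryState
//	secondary -> secondary: keep the state unless the source cluster changed
//	secondary -> primary:   drop the secondaryState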
func (impl *replicatesManagerImpl) SwitchReplicateMode(_ context.Context, msg message.MutableAlterReplicateConfigMessageV2) error {
	impl.mu.Lock()
	defer impl.mu.Unlock()
	newCfg := msg.Header().ReplicateConfiguration
	newGraph, err := replicateutil.NewConfigHelper(impl.currentClusterID, newCfg)
	if err != nil {
		return newReplicateViolationErrorForConfig(newCfg, err)
	}
	incomingCurrentClusterConfig := newGraph.GetCurrentCluster()
	switch incomingCurrentClusterConfig.Role() {
	case replicateutil.RolePrimary:
		// drop the replicating state if the current cluster is switched to primary.
		impl.secondaryState = nil
	case replicateutil.RoleSecondary:
		if impl.isPrimaryRole() || impl.secondaryState.SourceClusterID() != incomingCurrentClusterConfig.SourceCluster().GetClusterId() {
			// Only update the replicating state when the current cluster switches from primary to secondary,
			// or when the source cluster changes.
			impl.secondaryState = newSecondaryState(
				incomingCurrentClusterConfig.SourceCluster().GetClusterId(),
				incomingCurrentClusterConfig.MustGetSourceChannel(impl.pchannel.Name),
			)
		}
	}
	impl.replicateConfigHelper = newGraph
	return nil
}

func (impl *replicatesManagerImpl) BeginReplicateMessage(ctx context.Context, msg message.MutableMessage) (g ReplicateAcker, err error) {
	rh := msg.ReplicateHeader()
	// some message types, such as timetick, create segment, and flush, are generated by the wal itself.
	// they should never be handled by the replicates manager.
	if msg.MessageType().IsSelfControlled() {
		if rh != nil {
			return nil, status.NewIgnoreOperation("wal self-controlled message cannot be replicated")
		}
		return nil, ErrNotHandledByReplicateManager
	}
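	// Locking contract: on success the mutex stays held until the returned
	// ReplicateAcker is invoked, serializing replicate appends on this
	// pchannel; on error the deferred unlock below releases it immediately.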
	impl.mu.Lock()
	defer func() {
		if err != nil {
			impl.mu.Unlock()
		}
	}()
	switch impl.getRole() {
	case replicateutil.RolePrimary:
		if rh != nil {
			return nil, status.NewReplicateViolation("replicate message cannot be received in primary role")
		}
		return nil, ErrNotHandledByReplicateManager
	case replicateutil.RoleSecondary:
		if rh == nil {
			return nil, status.NewReplicateViolation("non-replicate message cannot be received in secondary role")
		}
		return impl.beginReplicateMessage(ctx, msg)
	default:
		panic("unreachable: invalid role")
	}
}

// GetReplicateCheckpoint gets the replicate checkpoint.
func (impl *replicatesManagerImpl) GetReplicateCheckpoint() (*utility.ReplicateCheckpoint, error) {
	impl.mu.Lock()
	defer impl.mu.Unlock()
	if impl.isPrimaryRole() {
		return nil, status.NewReplicateViolation("wal is not a secondary cluster in replicating topology")
	}
	return impl.secondaryState.GetCheckpoint(), nil
}

// beginReplicateMessage begins the replicate message operation.
func (impl *replicatesManagerImpl) beginReplicateMessage(ctx context.Context, msg message.MutableMessage) (ReplicateAcker, error) {
	rh := msg.ReplicateHeader()
	if rh.ClusterID != impl.secondaryState.SourceClusterID() {
		return nil, status.NewReplicateViolation("cluster id mismatch, current: %s, expected: %s", rh.ClusterID, impl.secondaryState.SourceClusterID())
	}
	// if the incoming message's time tick is not greater than the checkpoint's time tick,
	// the message has already been written to the wal, so it can be ignored.
	// txn messages share the same time tick, so txn bodies are only filtered with <;
	// duplicates at the same tick are handled by the txnHelper.
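	// For example, with a checkpoint time tick of 100: a non-txn message at
	// tick 100 is ignored (already written), a txn body message at tick 100
	// is still accepted (the txnHelper deduplicates it), and any message at
	// tick 99 is ignored.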
	isTxnBody := msg.TxnContext() != nil && msg.MessageType() != message.MessageTypeBeginTxn
	if (isTxnBody && rh.TimeTick < impl.secondaryState.GetCheckpoint().TimeTick) || (!isTxnBody && rh.TimeTick <= impl.secondaryState.GetCheckpoint().TimeTick) {
		return nil, status.NewIgnoreOperation("message is too old, message_id: %s, time_tick: %d, txn: %t, current time tick: %d",
			rh.MessageID, rh.TimeTick, isTxnBody, impl.secondaryState.GetCheckpoint().TimeTick)
	}
	if msg.TxnContext() != nil {
		return impl.startReplicateTxnMessage(ctx, msg, rh)
	}
	return impl.startReplicateNonTxnMessage(ctx, msg, rh)
}

// startReplicateTxnMessage starts the replicate txn message operation.
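// The replicated txn mirrors the lifecycle of the txn on the source cluster:
//
//	BeginTxn  -> StartBegin, then BeginDone once the append succeeds
//	txn body  -> AddNewMessage, then AddNewMessageDone once the append succeeds
//	CommitTxn -> StartCommit, then CommitDone plus a checkpoint push
//
// RollbackTxn is never expected here; as the panic below states, a rollback
// should never be replicated while the wal is in replicating mode.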
func (impl *replicatesManagerImpl) startReplicateTxnMessage(_ context.Context, msg message.MutableMessage, rh *message.ReplicateHeader) (ReplicateAcker, error) {
	txn := msg.TxnContext()
	switch msg.MessageType() {
	case message.MessageTypeBeginTxn:
		if err := impl.secondaryState.StartBegin(txn, rh); err != nil {
			return nil, err
		}
		return replicateAckerImpl(func(err error) {
			if err == nil {
				impl.secondaryState.BeginDone(txn)
			}
			impl.mu.Unlock()
		}), nil
	case message.MessageTypeCommitTxn:
		if err := impl.secondaryState.StartCommit(txn); err != nil {
			return nil, err
		}
		// only update the checkpoint when the txn is committed.
		return replicateAckerImpl(func(err error) {
			if err == nil {
				impl.secondaryState.CommitDone(txn)
				impl.secondaryState.PushForwardCheckpoint(rh.TimeTick, rh.LastConfirmedMessageID)
			}
			impl.mu.Unlock()
		}), nil
	case message.MessageTypeRollbackTxn:
		panic("unreachable: rollback txn message should never be replicated when wal is on replicating mode")
	default:
		if err := impl.secondaryState.AddNewMessage(txn, rh); err != nil {
			return nil, err
		}
		return replicateAckerImpl(func(err error) {
			if err == nil {
				impl.secondaryState.AddNewMessageDone(rh)
			}
			impl.mu.Unlock()
		}), nil
	}
}

// startReplicateNonTxnMessage starts the replicate non-txn message operation.
func (impl *replicatesManagerImpl) startReplicateNonTxnMessage(_ context.Context, _ message.MutableMessage, rh *message.ReplicateHeader) (ReplicateAcker, error) {
	if impl.secondaryState.CurrentTxn() != nil {
		return nil, status.NewReplicateViolation(
			"txn is in progress, so the incoming message must be txn message, current txn: %d",
			impl.secondaryState.CurrentTxn().TxnID,
		)
	}
	return replicateAckerImpl(func(err error) {
		if err == nil {
			impl.secondaryState.PushForwardCheckpoint(rh.TimeTick, rh.LastConfirmedMessageID)
		}
		impl.mu.Unlock()
	}), nil
}

// Role returns the role of the current cluster in the replicate topology.
func (impl *replicatesManagerImpl) Role() replicateutil.Role {
	impl.mu.Lock()
	defer impl.mu.Unlock()
	return impl.getRole()
}

// getRole returns the role of the current cluster in the replicate topology.
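// A wal without any replicate configuration behaves as a standalone primary.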
func (impl *replicatesManagerImpl) getRole() replicateutil.Role {
	if impl.replicateConfigHelper == nil {
		return replicateutil.RolePrimary
	}
	return impl.replicateConfigHelper.MustGetCluster(impl.currentClusterID).Role()
}

// isPrimaryRole checks if the current cluster is the primary role.
func (impl *replicatesManagerImpl) isPrimaryRole() bool {
	return impl.getRole() == replicateutil.RolePrimary
}

// newReplicateViolationErrorForConfig creates a new replicate violation error for the given configuration and error.
func newReplicateViolationErrorForConfig(cfg *commonpb.ReplicateConfiguration, err error) error {
	bytes, _ := protojson.Marshal(cfg)
	return status.NewReplicateViolation("when creating replicate graph, %s, %s", string(bytes), err.Error())
}