Zhen Ye 2aa48bf4ca
fix: wrong execution order of DDL/DCL on secondary (#44886)
issue: #44697, #44696

- The DDL execution order on the secondary now stays the same as the
order of the control channel timetick.
- Filter the control channel operations in the shard manager of the
streamingnode to avoid using the wrong vchannel when creating a segment.
- Fix the immutable txn message losing its replicate header.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2025-10-21 22:38:05 +08:00

195 lines
6.2 KiB
Go

package utility
import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)
// NewTxnBuffer builds an empty TxnBuffer.
// The given logger and metrics are stored as-is and used by every handler of
// the buffer; the builder map starts out empty and the byte counter at zero.
func NewTxnBuffer(logger *log.MLogger, metrics *metricsutil.ScannerMetrics) *TxnBuffer {
	buf := new(TxnBuffer)
	buf.logger = logger
	buf.metrics = metrics
	buf.builders = make(map[message.TxnID]*message.ImmutableTxnMessageBuilder)
	return buf
}
// TxnBuffer is a buffer for txn messages.
// It keeps one message builder per txn that has begun but has not yet been
// committed, rolled back or expired, and tracks an estimate of the bytes held.
type TxnBuffer struct {
// logger receives the warn/debug/dpanic records emitted by the handlers.
logger *log.MLogger
// builders maps a txn id to the builder accumulating that txn's messages.
builders map[message.TxnID]*message.ImmutableTxnMessageBuilder
// metrics is notified about auto-commit, committed, rollbacked, expired and failed txns.
metrics *metricsutil.ScannerMetrics
// bytes is the running size estimate: increased by EstimateSize of every buffered
// begin/body message and decreased by the builder's EstimateSize when a builder is removed.
bytes int
}
// Bytes reports the buffer's current size estimate, i.e. the counter that grows
// when begin/body messages are buffered and shrinks when a builder is removed.
func (b *TxnBuffer) Bytes() int {
	size := b.bytes
	return size
}
// GetUncommittedMessageBuilder exposes the builders of all txns still held by
// the buffer, keyed by txn id.
// The returned map is the buffer's own map, not a copy.
func (b *TxnBuffer) GetUncommittedMessageBuilder() map[message.TxnID]*message.ImmutableTxnMessageBuilder {
	pending := b.builders
	return pending
}
// HandleImmutableMessages feeds a batch of messages through the buffer and
// returns the ones that can be consumed now.
//
// Callers must pass msgs in ascending timetick order, with every timetick less
// than or equal to ts.
//
// Messages without a txn context pass straight through. Txn messages are held
// back: begin opens a builder, body messages are appended to it, rollback
// drops it, and commit turns it into a single txn message that is emitted in
// the position of the commit message. After the batch is processed, every txn
// whose expiry timetick is not greater than ts is dropped.
func (b *TxnBuffer) HandleImmutableMessages(msgs []message.ImmutableMessage, ts uint64) []message.ImmutableMessage {
	ready := make([]message.ImmutableMessage, 0, len(msgs))
	for _, m := range msgs {
		if m.TxnContext() == nil {
			// Outside of any txn: nothing to wait for.
			b.metrics.ObserveAutoCommitTxn()
			ready = append(ready, m)
			continue
		}

		switch m.MessageType() {
		case message.MessageTypeBeginTxn:
			b.handleBeginTxn(m)
		case message.MessageTypeRollbackTxn:
			b.handleRollbackTxn(m)
		case message.MessageTypeCommitTxn:
			committed := b.handleCommitTxn(m)
			if committed == nil {
				// Nothing to emit for this commit.
				continue
			}
			ready = append(ready, committed)
		default:
			b.handleTxnBodyMessage(m)
		}
	}
	b.clearExpiredTxn(ts)
	return ready
}
// handleBeginTxn opens a new builder for the txn started by msg.
//
// msg must carry a non-nil txn context (HandleImmutableMessages checks this
// before dispatching). If msg cannot be converted to a begin txn message, the
// failure is reported through DPanic and the message is dropped. If a builder
// for the same txn id already exists, it is rolled back first and replaced.
func (b *TxnBuffer) handleBeginTxn(msg message.ImmutableMessage) {
	beginMsg, err := message.AsImmutableBeginTxnMessageV2(msg)
	if err != nil {
		// The converted value must not be touched when the conversion failed
		// (it is presumably nil, which would panic here), so the log fields
		// are taken from the original message instead.
		b.logger.DPanic(
			"failed to convert message to begin txn message, it's a critical error",
			zap.Int64("txnID", int64(msg.TxnContext().TxnID)),
			zap.Any("messageID", msg.MessageID()),
			zap.Error(err))
		return
	}

	txnID := beginMsg.TxnContext().TxnID
	if _, ok := b.builders[txnID]; ok {
		// Because the wal on secondary node may replicate the same txn message, so we need to reset the txn from buffer to avoid
		// the txn body repeated.
		b.logger.Warn(
			"txn id already exist, rollback the txn from buffer",
			zap.Int64("txnID", int64(txnID)),
			zap.Any("messageID", beginMsg.MessageID()),
		)
		b.rollbackTxn(txnID)
	}
	b.builders[txnID] = message.NewImmutableTxnMessageBuilder(beginMsg)
	b.bytes += beginMsg.EstimateSize()
}
// handleCommitTxn finishes the txn referenced by msg and returns the built txn
// message.
//
// msg must carry a non-nil txn context (HandleImmutableMessages checks this
// before dispatching). It returns nil when:
//   - msg cannot be converted to a commit txn message (reported through DPanic),
//   - no builder exists for the txn id (logged and ignored),
//   - the builder fails to build the txn message (logged, counted as error txn).
//
// Whenever a builder exists, it is removed from the buffer and its estimated
// size is subtracted from the byte counter, whether or not the build succeeds.
func (b *TxnBuffer) handleCommitTxn(msg message.ImmutableMessage) message.ImmutableMessage {
	commitMsg, err := message.AsImmutableCommitTxnMessageV2(msg)
	if err != nil {
		// The converted value must not be touched when the conversion failed
		// (it is presumably nil, which would panic here), so the log fields
		// are taken from the original message instead.
		b.logger.DPanic(
			"failed to convert message to commit txn message, it's a critical error",
			zap.Int64("txnID", int64(msg.TxnContext().TxnID)),
			zap.Any("messageID", msg.MessageID()),
			zap.Error(err))
		return nil
	}

	txnID := commitMsg.TxnContext().TxnID
	builder, ok := b.builders[txnID]
	if !ok {
		b.logger.Warn(
			"txn id not exist, it may be a repeated committed message, so ignore it",
			zap.Int64("txnID", int64(txnID)),
			zap.Any("messageID", commitMsg.MessageID()),
		)
		return nil
	}

	// build the txn message and remove it from buffer.
	// The size is read before Build so the amount subtracted is the estimate
	// of what was buffered.
	b.bytes -= builder.EstimateSize()
	txnMsg, err := builder.Build(commitMsg)
	delete(b.builders, txnID)
	if err != nil {
		b.metrics.ObserveErrorTxn()
		b.logger.Warn(
			"failed to build txn message, it's a critical error, some data is lost",
			zap.Int64("txnID", int64(txnID)),
			zap.Any("messageID", commitMsg.MessageID()),
			zap.Error(err))
		return nil
	}
	b.logger.Debug(
		"the txn is committed",
		zap.Int64("txnID", int64(txnID)),
		zap.Any("messageID", commitMsg.MessageID()),
	)
	b.metrics.ObserveTxn(message.TxnStateCommitted)
	return txnMsg
}
// handleRollbackTxn drops the txn referenced by msg from the buffer.
//
// msg must carry a non-nil txn context (HandleImmutableMessages checks this
// before dispatching). If msg cannot be converted to a rollback txn message,
// the failure is reported through DPanic and nothing is dropped. Rolling back
// a txn id that is not buffered is a no-op apart from the debug log.
func (b *TxnBuffer) handleRollbackTxn(msg message.ImmutableMessage) {
	rollbackMsg, err := message.AsImmutableRollbackTxnMessageV2(msg)
	if err != nil {
		// The converted value must not be touched when the conversion failed
		// (it is presumably nil, which would panic here), so the log fields
		// are taken from the original message instead.
		b.logger.DPanic(
			"failed to convert message to rollback txn message, it's a critical error",
			zap.Int64("txnID", int64(msg.TxnContext().TxnID)),
			zap.Any("messageID", msg.MessageID()),
			zap.Error(err))
		return
	}

	txnID := rollbackMsg.TxnContext().TxnID
	b.logger.Debug(
		"the txn is rollback, so drop the txn from buffer",
		zap.Int64("txnID", int64(txnID)),
		zap.Any("messageID", rollbackMsg.MessageID()),
	)
	b.rollbackTxn(txnID)
}
// rollbackTxn removes the builder registered under txnID, if there is one,
// subtracts its estimated size from the byte counter and records the txn as
// rollbacked in the metrics. Unknown txn ids are ignored.
func (b *TxnBuffer) rollbackTxn(txnID message.TxnID) {
	dropped, found := b.builders[txnID]
	if !found {
		return
	}
	// Forget the txn; its buffered messages are simply discarded.
	delete(b.builders, txnID)
	b.bytes -= dropped.EstimateSize()
	b.metrics.ObserveTxn(message.TxnStateRollbacked)
}
// handleTxnBodyMessage appends a txn body message to the builder of its txn
// and adds the message's estimated size to the byte counter.
// A body message whose txn id has no builder in the buffer is logged and
// ignored.
func (b *TxnBuffer) handleTxnBodyMessage(msg message.ImmutableMessage) {
	if pending, found := b.builders[msg.TxnContext().TxnID]; found {
		pending.Add(msg)
		b.bytes += msg.EstimateSize()
		return
	}

	// No open txn for this message: nothing to attach it to.
	b.logger.Warn(
		"txn id not exist, so ignore the body message",
		zap.Int64("txnID", int64(msg.TxnContext().TxnID)),
		zap.Any("messageID", msg.MessageID()),
	)
}
// clearExpiredTxn drops every buffered txn whose expiry timetick is less than
// or equal to ts. For each dropped txn the builder's estimated size is
// subtracted from the byte counter and an expired txn is recorded in the
// metrics; a debug record is written only when debug logging is enabled.
func (b *TxnBuffer) clearExpiredTxn(ts uint64) {
	for id, pending := range b.builders {
		if pending.ExpiredTimeTick() > ts {
			// Still alive at ts.
			continue
		}

		delete(b.builders, id)
		b.bytes -= pending.EstimateSize()
		b.metrics.ObserveExpiredTxn()

		// Skip building the log fields when debug output is off.
		if !b.logger.Level().Enabled(zap.DebugLevel) {
			continue
		}
		b.logger.Debug(
			"the txn is expired, so drop the txn from buffer",
			zap.Int64("txnID", int64(id)),
			zap.Uint64("expiredTimeTick", pending.ExpiredTimeTick()),
			zap.Uint64("currentTimeTick", ts),
		)
	}
}