From 4f5409e1fedd74e352cf05bee29333e752b0d18e Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 13 Jun 2025 17:43:07 +0800 Subject: [PATCH] fix: panic when schema change (#42727) issue: #42723 Signed-off-by: chyezh --- pkg/mq/msgdispatcher/dispatcher.go | 2 +- pkg/streaming/util/message/adaptor/ts_msg_newer.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 6d4918722c..8cc7b4fda0 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -145,6 +145,7 @@ func NewDispatcher( pchannel: pchannel, targets: typeutil.NewConcurrentMap[string, *target](), stream: stream, + includeSkipWhenSplit: includeSkipWhenSplit, } metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc() @@ -271,7 +272,6 @@ func (d *Dispatcher) work() { log.Debug("skip msg info", zap.String("vchannel", vchannel), zap.String("msgType", msg.Type().String()), - zap.Int64("msgID", msg.ID()), zap.Uint64("msgBeginTs", msg.BeginTs()), zap.Uint64("msgEndTs", msg.EndTs()), zap.Uint64("packBeginTs", p.BeginTs), diff --git a/pkg/streaming/util/message/adaptor/ts_msg_newer.go b/pkg/streaming/util/message/adaptor/ts_msg_newer.go index 5d168a552c..eee2e23977 100644 --- a/pkg/streaming/util/message/adaptor/ts_msg_newer.go +++ b/pkg/streaming/util/message/adaptor/ts_msg_newer.go @@ -127,6 +127,10 @@ type SchemaChangeMessageBody struct { SchemaChangeMessage message.ImmutableSchemaChangeMessageV2 } +func (s *SchemaChangeMessageBody) ID() msgstream.UniqueID { + return 0 +} + func NewSchemaChangeMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error) { schChgMsg, err := message.AsImmutableCollectionSchemaChangeV2(msg) if err != nil {