diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index 6d4918722c..8cc7b4fda0 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -145,6 +145,7 @@ func NewDispatcher( pchannel: pchannel, targets: typeutil.NewConcurrentMap[string, *target](), stream: stream, + includeSkipWhenSplit: includeSkipWhenSplit, } metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc() @@ -271,7 +272,6 @@ func (d *Dispatcher) work() { log.Debug("skip msg info", zap.String("vchannel", vchannel), zap.String("msgType", msg.Type().String()), - zap.Int64("msgID", msg.ID()), zap.Uint64("msgBeginTs", msg.BeginTs()), zap.Uint64("msgEndTs", msg.EndTs()), zap.Uint64("packBeginTs", p.BeginTs), diff --git a/pkg/streaming/util/message/adaptor/ts_msg_newer.go b/pkg/streaming/util/message/adaptor/ts_msg_newer.go index 5d168a552c..eee2e23977 100644 --- a/pkg/streaming/util/message/adaptor/ts_msg_newer.go +++ b/pkg/streaming/util/message/adaptor/ts_msg_newer.go @@ -127,6 +127,10 @@ type SchemaChangeMessageBody struct { SchemaChangeMessage message.ImmutableSchemaChangeMessageV2 } +func (s *SchemaChangeMessageBody) ID() msgstream.UniqueID { + return 0 +} + func NewSchemaChangeMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error) { schChgMsg, err := message.AsImmutableCollectionSchemaChangeV2(msg) if err != nil {