diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node.go b/internal/flushcommon/pipeline/flow_graph_dd_node.go index a522f998b8..06e75ac9dc 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node.go @@ -282,6 +282,16 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { } else { logger.Info("handle manual flush message success") } + case commonpb.MsgType_AddCollectionField: + schemaMsg := msg.(*adaptor.SchemaChangeMessageBody) + logger := log.With( + zap.String("vchannel", ddn.Name()), + zap.Int32("msgType", int32(msg.Type())), + zap.Uint64("timetick", schemaMsg.SchemaChangeMessage.TimeTick()), + zap.Int64s("segmentIDs", schemaMsg.SchemaChangeMessage.Header().FlushedSegmentIds), + ) + logger.Info("receive schema change message") + ddn.msgHandler.HandleSchemaChange(ddn.ctx, schemaMsg.SchemaChangeMessage) } }