diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node.go b/internal/flushcommon/pipeline/flow_graph_dd_node.go index 1658d960b1..ae1e11d9ad 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node.go @@ -221,7 +221,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { continue } - log.Debug("DDNode receive delete messages", zap.String("channel", ddn.vChannelName), zap.Int64("numRows", dmsg.NumRows)) + log.Debug("DDNode receive delete messages", + zap.String("channel", ddn.vChannelName), + zap.Int64("numRows", dmsg.NumRows), + zap.Uint64("startPosTs", msMsg.StartPositions()[0].GetTimestamp()), + zap.Uint64("endPosTs", msMsg.EndPositions()[0].GetTimestamp()), + ) util.GetRateCollector().Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(dmsg.DeleteRequest))) metrics.DataNodeConsumeBytesCount. diff --git a/internal/flushcommon/pipeline/flow_graph_dd_node_test.go b/internal/flushcommon/pipeline/flow_graph_dd_node_test.go index 5030598854..b2d7ebcf10 100644 --- a/internal/flushcommon/pipeline/flow_graph_dd_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_dd_node_test.go @@ -324,7 +324,8 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { }, } tsMessages := []msgstream.TsMsg{dMsg} - var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) + pos := &msgpb.MsgPosition{Timestamp: 0} + var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, []*msgpb.MsgPosition{pos}, []*msgpb.MsgPosition{pos}) // Test rt := ddn.Operate([]Msg{msgStreamMsg})