enhance: Log start position of delete msgs (#40315)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-03-10 14:58:05 +08:00 committed by GitHub
parent 345538d10a
commit 6f70e6d1e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 2 deletions

View File

@ -221,7 +221,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
continue continue
} }
log.Debug("DDNode receive delete messages", zap.String("channel", ddn.vChannelName), zap.Int64("numRows", dmsg.NumRows)) log.Debug("DDNode receive delete messages",
zap.String("channel", ddn.vChannelName),
zap.Int64("numRows", dmsg.NumRows),
zap.Uint64("startPosTs", msMsg.StartPositions()[0].GetTimestamp()),
zap.Uint64("endPosTs", msMsg.EndPositions()[0].GetTimestamp()),
)
util.GetRateCollector().Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(dmsg.DeleteRequest))) util.GetRateCollector().Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(dmsg.DeleteRequest)))
metrics.DataNodeConsumeBytesCount. metrics.DataNodeConsumeBytesCount.

View File

@ -324,7 +324,8 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
}, },
} }
tsMessages := []msgstream.TsMsg{dMsg} tsMessages := []msgstream.TsMsg{dMsg}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) pos := &msgpb.MsgPosition{Timestamp: 0}
var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, []*msgpb.MsgPosition{pos}, []*msgpb.MsgPosition{pos})
// Test // Test
rt := ddn.Operate([]Msg{msgStreamMsg}) rt := ddn.Operate([]Msg{msgStreamMsg})