milvus/pkg/streaming/util/message/marshal_log_object.go
Zhen Ye dfbb02a5f7
enhance: make streaming message as a log field for easier coding (#41545)
issue: #41544

- implement message can be logged as a field by zap.
- fix too many slow log for woodpecker.

Signed-off-by: chyezh <chyezh@outlook.com>
2025-04-28 14:38:42 +08:00

118 lines
4.1 KiB
Go

package message
import (
"reflect"
"strconv"
"strings"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
)
// MarshalLogObject encodes the message into zap log object.
func (m *messageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if m == nil {
return nil
}
enc.AddString("type", m.MessageType().String())
enc.AddString("vchannel", m.VChannel())
if m.properties.Exist(messageTimeTick) {
enc.AddUint64("timetick", m.TimeTick())
}
if txn := m.TxnContext(); txn != nil {
enc.AddInt64("txnID", int64(txn.TxnID))
}
if broadcast := m.BroadcastHeader(); broadcast != nil {
enc.AddInt64("broadcastID", int64(broadcast.BroadcastID))
}
enc.AddInt("size", len(m.payload))
marshalSpecializedHeader(m.MessageType(), m.properties[messageHeader], enc)
return nil
}
// MarshalLogObject encodes the immutable message into zap log object.
func (m *immutableMessageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if m == nil {
return nil
}
enc.AddString("type", m.MessageType().String())
enc.AddString("vchannel", m.VChannel())
enc.AddUint64("timetick", m.TimeTick())
enc.AddString("messageID", m.MessageID().String())
enc.AddString("lastConfirmed", m.LastConfirmedMessageID().String())
if txn := m.TxnContext(); txn != nil {
enc.AddInt64("txnID", int64(txn.TxnID))
}
if broadcast := m.BroadcastHeader(); broadcast != nil {
enc.AddInt64("broadcastID", int64(broadcast.BroadcastID))
}
enc.AddInt("size", len(m.payload))
marshalSpecializedHeader(m.MessageType(), m.properties[messageHeader], enc)
return nil
}
func (m *immutableTxnMessageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if m == nil {
return nil
}
enc.AddArray("txn", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
txnMessage := AsImmutableTxnMessage(m)
txnMessage.RangeOver(func(im ImmutableMessage) error {
enc.AppendObject(im)
return nil
})
return nil
}))
return nil
}
// marshalSpecializedHeader marshals the specialized header of the message.
func marshalSpecializedHeader(t MessageType, h string, enc zapcore.ObjectEncoder) {
typ := messageTypeToCustomHeaderMap[t]
// must be a proto type.
header := reflect.New(typ.Elem()).Interface().(proto.Message)
if err := DecodeProto(h, header); err != nil {
enc.AddString("headerDecodeError", err.Error())
return
}
switch header := header.(type) {
case *InsertMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
segmentIDs := make([]string, 0, len(header.GetPartitions()))
for _, partition := range header.GetPartitions() {
segmentIDs = append(segmentIDs, strconv.FormatInt(partition.GetSegmentAssignment().GetSegmentId(), 10))
}
enc.AddString("segmentIDs", strings.Join(segmentIDs, "|"))
case *DeleteMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
case *CreateCollectionMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
case *DropCollectionMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
case *CreatePartitionMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
enc.AddInt64("partitionID", header.GetPartitionId())
case *DropPartitionMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
enc.AddInt64("partitionID", header.GetPartitionId())
case *CreateSegmentMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
segmentIDs := make([]string, 0, len(header.GetSegmentIds()))
for _, segmentID := range header.GetSegmentIds() {
segmentIDs = append(segmentIDs, strconv.FormatInt(segmentID, 10))
}
enc.AddString("segmentIDs", strings.Join(segmentIDs, "|"))
case *FlushMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
segmentIDs := make([]string, 0, len(header.GetSegmentIds()))
for _, segmentID := range header.GetSegmentIds() {
segmentIDs = append(segmentIDs, strconv.FormatInt(segmentID, 10))
}
enc.AddString("segmentIDs", strings.Join(segmentIDs, "|"))
case *ManualFlushMessageHeader:
enc.AddInt64("collectionID", header.GetCollectionId())
case *SchemaChangeMessageHeader:
case *ImportMessageHeader:
}
}