yihao.dai 51f69f32d0
feat: Add CDC support (#44124)
This PR implements a new CDC service for Milvus 2.6, providing log-based
cross-cluster replication.

issue: https://github.com/milvus-io/milvus/issues/44123

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Signed-off-by: chyezh <chyezh@outlook.com>
Co-authored-by: chyezh <chyezh@outlook.com>
2025-09-16 16:32:01 +08:00

95 lines
2.3 KiB
Go

package wp
import (
"encoding/base64"
"fmt"
"github.com/cockroachdb/errors"
wp "github.com/zilliztech/woodpecker/woodpecker/log"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)
func NewWpID(id *wp.LogMessageId) message.MessageID {
return wpID{
logMsgId: id,
}
}
func UnmarshalMessageID(data string) (message.MessageID, error) {
id, err := unmarshalMessageID(data)
if err != nil {
return nil, err
}
return id, nil
}
func unmarshalMessageID(data string) (wpID, error) {
val, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return wpID{logMsgId: nil}, errors.Wrapf(message.ErrInvalidMessageID, "decode pulsar fail when decode base64 with err: %s, id: %s", err.Error(), data)
}
msgID, err := wp.DeserializeLogMessageId(val)
if err != nil {
return wpID{logMsgId: nil}, errors.Wrapf(message.ErrInvalidMessageID, "decode pulsar fail when deserialize with err: %s, id: %s", err.Error(), data)
}
return wpID{logMsgId: msgID}, nil
}
var _ message.MessageID = wpID{}
type wpID struct {
logMsgId *wp.LogMessageId
}
func (id wpID) WoodpeckerID() *wp.LogMessageId {
return id.logMsgId
}
func (id wpID) WoodpeckerMsgId() *wp.LogMessageId {
return id.logMsgId
}
func (id wpID) WALName() message.WALName {
return message.WALNameWoodpecker
}
func (id wpID) LT(other message.MessageID) bool {
id2 := other.(wpID)
if id.logMsgId.SegmentId != id2.logMsgId.SegmentId {
return id.logMsgId.SegmentId < id2.logMsgId.SegmentId
}
return id.logMsgId.EntryId < id2.logMsgId.EntryId
}
func (id wpID) LTE(other message.MessageID) bool {
id2 := other.(wpID)
if id.logMsgId.SegmentId < id2.logMsgId.SegmentId {
return true
} else if id.logMsgId.SegmentId > id2.logMsgId.SegmentId {
return false
}
return id.logMsgId.EntryId <= id2.logMsgId.EntryId
}
func (id wpID) EQ(other message.MessageID) bool {
id2 := other.(wpID)
return id.logMsgId.SegmentId == id2.logMsgId.SegmentId && id.logMsgId.EntryId == id2.logMsgId.EntryId
}
func (id wpID) Marshal() string {
return base64.StdEncoding.EncodeToString(id.logMsgId.Serialize())
}
func (id wpID) IntoProto() *commonpb.MessageID {
return &commonpb.MessageID{
Id: id.Marshal(),
WALName: commonpb.WALName(id.WALName()),
}
}
func (id wpID) String() string {
return fmt.Sprintf("%d/%d", id.logMsgId.SegmentId, id.logMsgId.EntryId)
}