mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
This PR implements a new CDC service for Milvus 2.6, providing log-based cross-cluster replication. issue: https://github.com/milvus-io/milvus/issues/44123 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: chyezh <chyezh@outlook.com> Co-authored-by: chyezh <chyezh@outlook.com>
119 lines
3.4 KiB
Go
119 lines
3.4 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/helper"
|
|
)
|
|
|
|
// Compile-time assertion that *walImpl implements walimpls.WALImpls.
var _ walimpls.WALImpls = (*walImpl)(nil)

// walImpl is the kafka-backed WAL implementation for a single channel.
// Writes go through the shared producer p; reads spawn a fresh consumer
// built from consumerConfig (see Read).
type walImpl struct {
	*helper.WALHelper
	// p is the kafka producer used by Append. Its lifetime is managed
	// outside this type (see Close) — presumably shared across topics,
	// since kafka producers are not topic-scoped.
	p *kafka.Producer
	// consumerConfig is the base config cloned for every consumer
	// created by Read; only group.id is overridden per-scanner.
	consumerConfig kafka.ConfigMap
}
|
|
|
|
// WALName reports the WAL backend identity of this implementation,
// which is always the kafka backend.
func (w *walImpl) WALName() message.WALName {
	return message.WALNameKafka
}
|
|
|
|
func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
|
|
if w.Channel().AccessMode != types.AccessModeRW {
|
|
panic("write on a wal that is not in read-write mode")
|
|
}
|
|
|
|
pb := msg.IntoMessageProto()
|
|
properties := pb.Properties
|
|
headers := make([]kafka.Header, 0, len(properties))
|
|
for key, value := range properties {
|
|
header := kafka.Header{Key: key, Value: []byte(value)}
|
|
headers = append(headers, header)
|
|
}
|
|
ch := make(chan kafka.Event, 1)
|
|
topic := w.Channel().Name
|
|
|
|
if err := w.p.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
|
|
Value: pb.Payload,
|
|
Headers: headers,
|
|
}, ch); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case event := <-ch:
|
|
relatedMsg := event.(*kafka.Message)
|
|
if relatedMsg.TopicPartition.Error != nil {
|
|
return nil, relatedMsg.TopicPartition.Error
|
|
}
|
|
return kafkaID(relatedMsg.TopicPartition.Offset), nil
|
|
}
|
|
}
|
|
|
|
func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls.ScannerImpls, err error) {
|
|
// The scanner is stateless, so we can create a scanner with an anonymous consumer.
|
|
// and there's no commit opeartions.
|
|
consumerConfig := cloneKafkaConfig(w.consumerConfig)
|
|
consumerConfig.SetKey("group.id", opt.Name)
|
|
c, err := kafka.NewConsumer(&consumerConfig)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create kafka consumer")
|
|
}
|
|
|
|
topic := w.Channel().Name
|
|
seekPosition := kafka.TopicPartition{
|
|
Topic: &topic,
|
|
Partition: 0,
|
|
}
|
|
var exclude *kafkaID
|
|
switch t := opt.DeliverPolicy.GetPolicy().(type) {
|
|
case *streamingpb.DeliverPolicy_All:
|
|
seekPosition.Offset = kafka.OffsetBeginning
|
|
case *streamingpb.DeliverPolicy_Latest:
|
|
seekPosition.Offset = kafka.OffsetEnd
|
|
case *streamingpb.DeliverPolicy_StartFrom:
|
|
id, err := unmarshalMessageID(t.StartFrom.GetId())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
seekPosition.Offset = kafka.Offset(id)
|
|
case *streamingpb.DeliverPolicy_StartAfter:
|
|
id, err := unmarshalMessageID(t.StartAfter.GetId())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
seekPosition.Offset = kafka.Offset(id)
|
|
exclude = &id
|
|
default:
|
|
panic("unknown deliver policy")
|
|
}
|
|
|
|
if err := c.Assign([]kafka.TopicPartition{seekPosition}); err != nil {
|
|
return nil, errors.Wrap(err, "failed to assign kafka consumer")
|
|
}
|
|
return newScanner(opt.Name, exclude, c), nil
|
|
}
|
|
|
|
func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error {
|
|
if w.Channel().AccessMode != types.AccessModeRW {
|
|
panic("truncate on a wal that is not in read-write mode")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *walImpl) Close() {
|
|
// The lifetime control of the producer is delegated to the wal adaptor.
|
|
// So we just make resource cleanup here.
|
|
// But kafka producer is not topic level, so we don't close it here.
|
|
}
|