diff --git a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go index 4024d19482..298042bf0d 100644 --- a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go +++ b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go @@ -29,10 +29,11 @@ type backlogClearHelper struct { channelName types.PChannelInfo c pulsar.Client reusedConsumer pulsar.Consumer + tenant tenant } // newBacklogClearHelper creates a new backlog clear helper. -func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper { +func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64, tenant tenant) *backlogClearHelper { h := &backlogClearHelper{ notifier: syncutil.NewAsyncTaskNotifier[struct{}](), cond: syncutil.NewContextCond(&sync.Mutex{}), @@ -41,6 +42,7 @@ func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, thre channelName: channelName, c: c, reusedConsumer: nil, + tenant: tenant, } h.SetLogger(log.With(zap.String("channel", channelName.String()), log.FieldComponent("backlog-clear"))) go h.background() @@ -110,8 +112,9 @@ func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) { if h.reusedConsumer != nil { return h.reusedConsumer, nil } + topic := h.tenant.MustGetFullTopicName(h.channelName.Name) consumer, err := h.c.Subscribe(pulsar.ConsumerOptions{ - Topic: h.channelName.Name, + Topic: topic, SubscriptionName: backlogClearHelperName, Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists. MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100. diff --git a/pkg/streaming/walimpls/impls/pulsar/builder.go b/pkg/streaming/walimpls/impls/pulsar/builder.go index 1b3d9e6646..1d7091939b 100644 --- a/pkg/streaming/walimpls/impls/pulsar/builder.go +++ b/pkg/streaming/walimpls/impls/pulsar/builder.go @@ -31,7 +31,7 @@ func (b *builderImpl) Name() message.WALName { // Build build a wal instance. func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { - options, err := b.getPulsarClientOptions() + options, tenant, err := b.getPulsarClientOptions() if err != nil { return nil, errors.Wrapf(err, "build pulsar client options failed") } @@ -40,16 +40,17 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { return nil, err } return &openerImpl{ - c: c, + tenant: tenant, + c: c, }, nil } // getPulsarClientOptions gets the pulsar client options from the config. -func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) { +func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, tenant, error) { cfg := ¶mtable.Get().PulsarCfg auth, err := pulsar.NewAuthentication(cfg.AuthPlugin.GetValue(), cfg.AuthParams.GetValue()) if err != nil { - return pulsar.ClientOptions{}, errors.New("build authencation from config failed") + return pulsar.ClientOptions{}, tenant{}, errors.New("build authencation from config failed") } options := pulsar.ClientOptions{ URL: cfg.Address.GetValue(), @@ -61,5 +62,8 @@ func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) { // Enable client metrics if config.EnableClientMetrics is true, use pkg-defined registerer. options.MetricsRegisterer = metrics.GetRegisterer() } - return options, nil + return options, tenant{ + namespace: cfg.Namespace.GetValue(), + tenant: cfg.Tenant.GetValue(), + }, nil } diff --git a/pkg/streaming/walimpls/impls/pulsar/opener.go b/pkg/streaming/walimpls/impls/pulsar/opener.go index 2e75204cd4..ef4ba3df92 100644 --- a/pkg/streaming/walimpls/impls/pulsar/opener.go +++ b/pkg/streaming/walimpls/impls/pulsar/opener.go @@ -2,6 +2,7 @@ package pulsar import ( "context" + "fmt" "github.com/apache/pulsar-client-go/pulsar" @@ -19,9 +20,25 @@ const ( var _ walimpls.OpenerImpls = (*openerImpl)(nil) +// tenant is the tenant of pulsar. +type tenant struct { + tenant string + namespace string +} + +// MustGetFullTopicName gets the full topic name of pulsar. +// If the tenant or namespace or topic is empty, it will panic. +func (t tenant) MustGetFullTopicName(topic string) string { + if len(t.tenant) == 0 || len(t.namespace) == 0 || len(topic) == 0 { + panic("tenant or namespace or topic is empty") + } + return fmt.Sprintf("%s/%s/%s", t.tenant, t.namespace, topic) +} + // openerImpl is the opener for pulsar wal. type openerImpl struct { - c pulsar.Client + tenant tenant + c pulsar.Client } // Open opens a wal instance. @@ -36,7 +53,7 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp if backlogAutoClearBytes <= 0 { backlogAutoClearBytes = defaultBacklogSize } - backlogClearHelper = newBacklogClearHelper(o.c, opt.Channel, backlogAutoClearBytes) + backlogClearHelper = newBacklogClearHelper(o.c, opt.Channel, backlogAutoClearBytes, o.tenant) } w := &walImpl{ WALHelper: helper.NewWALHelper(opt), @@ -44,6 +61,7 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp p: syncutil.NewFuture[pulsar.Producer](), notifier: syncutil.NewAsyncTaskNotifier[struct{}](), backlogClearHelper: backlogClearHelper, + tenant: o.tenant, } // because the producer of pulsar cannot be created if the topic is backlog exceeded, // so we need to set the producer at background with backoff retry. diff --git a/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go b/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go index f06d30d37e..6982657640 100644 --- a/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go +++ b/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go @@ -17,6 +17,14 @@ func TestMain(m *testing.M) { m.Run() } +func TestTenant(t *testing.T) { + tenant := tenant{ + tenant: "milvus", + namespace: "aaa", + } + assert.Equal(t, "milvus/aaa/test", tenant.MustGetFullTopicName("test")) +} + func TestRegistry(t *testing.T) { registeredB := registry.MustGetBuilder(message.WALNamePulsar) assert.NotNil(t, registeredB) diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index 769cebf29b..1e9a077c54 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -25,6 +25,7 @@ type walImpl struct { p *syncutil.Future[pulsar.Producer] notifier *syncutil.AsyncTaskNotifier[struct{}] backlogClearHelper *backlogClearHelper + tenant tenant } // initProducerAtBackground initializes the producer at background. @@ -56,8 +57,9 @@ func (w *walImpl) initProducerAtBackground() { // initProducer initializes the producer. func (w *walImpl) initProducer() error { + topic := w.tenant.MustGetFullTopicName(w.Channel().Name) p, err := w.c.CreateProducer(pulsar.ProducerOptions{ - Topic: w.Channel().Name, + Topic: topic, // TODO: current go pulsar client does not support fencing, we should enable it after go pulsar client supports it. // ProducerAccessMode: pulsar.ProducerAccessModeExclusiveWithFencing, }) @@ -98,9 +100,10 @@ func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (messa } func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls.ScannerImpls, err error) { + topic := w.tenant.MustGetFullTopicName(w.Channel().Name) ch := make(chan pulsar.ReaderMessage, 1) readerOpt := pulsar.ReaderOptions{ - Topic: w.Channel().Name, + Topic: topic, Name: opt.Name, MessageChannel: ch, ReceiverQueueSize: opt.ReadAheadBufferSize, @@ -141,8 +144,9 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { // The backlogClearHelper is always non-nil currently, so we can determine the truncate position return nil } + topic := w.tenant.MustGetFullTopicName(w.Channel().Name) cursor, err := w.c.Subscribe(pulsar.ConsumerOptions{ - Topic: w.Channel().Name, + Topic: topic, SubscriptionName: truncateCursorSubscriptionName, Type: pulsar.Exclusive, MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100.