fix: lost tenant/namespace support for pulsar since 2.6 (#46752)

issue: #46748

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2026-01-06 14:33:24 +08:00 committed by GitHub
parent fa2c3c404c
commit 4c6e33f326
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 49 additions and 12 deletions

View File

@ -29,10 +29,11 @@ type backlogClearHelper struct {
channelName types.PChannelInfo
c pulsar.Client
reusedConsumer pulsar.Consumer
tenant tenant
}
// newBacklogClearHelper creates a new backlog clear helper.
func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper {
func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64, tenant tenant) *backlogClearHelper {
h := &backlogClearHelper{
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
cond: syncutil.NewContextCond(&sync.Mutex{}),
@ -41,6 +42,7 @@ func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, thre
channelName: channelName,
c: c,
reusedConsumer: nil,
tenant: tenant,
}
h.SetLogger(log.With(zap.String("channel", channelName.String()), log.FieldComponent("backlog-clear")))
go h.background()
@ -110,8 +112,9 @@ func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) {
if h.reusedConsumer != nil {
return h.reusedConsumer, nil
}
topic := h.tenant.MustGetFullTopicName(h.channelName.Name)
consumer, err := h.c.Subscribe(pulsar.ConsumerOptions{
Topic: h.channelName.Name,
Topic: topic,
SubscriptionName: backlogClearHelperName,
Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists.
MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100.

View File

@ -31,7 +31,7 @@ func (b *builderImpl) Name() message.WALName {
// Build build a wal instance.
func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
options, err := b.getPulsarClientOptions()
options, tenant, err := b.getPulsarClientOptions()
if err != nil {
return nil, errors.Wrapf(err, "build pulsar client options failed")
}
@ -40,16 +40,17 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
return nil, err
}
return &openerImpl{
c: c,
tenant: tenant,
c: c,
}, nil
}
// getPulsarClientOptions gets the pulsar client options from the config.
func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) {
func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, tenant, error) {
cfg := &paramtable.Get().PulsarCfg
auth, err := pulsar.NewAuthentication(cfg.AuthPlugin.GetValue(), cfg.AuthParams.GetValue())
if err != nil {
return pulsar.ClientOptions{}, errors.New("build authencation from config failed")
return pulsar.ClientOptions{}, tenant{}, errors.New("build authencation from config failed")
}
options := pulsar.ClientOptions{
URL: cfg.Address.GetValue(),
@ -61,5 +62,8 @@ func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) {
// Enable client metrics if config.EnableClientMetrics is true, use pkg-defined registerer.
options.MetricsRegisterer = metrics.GetRegisterer()
}
return options, nil
return options, tenant{
namespace: cfg.Namespace.GetValue(),
tenant: cfg.Tenant.GetValue(),
}, nil
}

View File

@ -2,6 +2,7 @@ package pulsar
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
@ -19,9 +20,25 @@ const (
var _ walimpls.OpenerImpls = (*openerImpl)(nil)
// tenant is the tenant of pulsar.
type tenant struct {
tenant string
namespace string
}
// MustGetFullTopicName gets the full topic name of pulsar.
// If the tenant or namespace or topic is empty, it will panic.
func (t tenant) MustGetFullTopicName(topic string) string {
if len(t.tenant) == 0 || len(t.namespace) == 0 || len(topic) == 0 {
panic("tenant or namespace or topic is empty")
}
return fmt.Sprintf("%s/%s/%s", t.tenant, t.namespace, topic)
}
// openerImpl is the opener for pulsar wal.
type openerImpl struct {
c pulsar.Client
tenant tenant
c pulsar.Client
}
// Open opens a wal instance.
@ -36,7 +53,7 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp
if backlogAutoClearBytes <= 0 {
backlogAutoClearBytes = defaultBacklogSize
}
backlogClearHelper = newBacklogClearHelper(o.c, opt.Channel, backlogAutoClearBytes)
backlogClearHelper = newBacklogClearHelper(o.c, opt.Channel, backlogAutoClearBytes, o.tenant)
}
w := &walImpl{
WALHelper: helper.NewWALHelper(opt),
@ -44,6 +61,7 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp
p: syncutil.NewFuture[pulsar.Producer](),
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
backlogClearHelper: backlogClearHelper,
tenant: o.tenant,
}
// because the producer of pulsar cannot be created if the topic is backlog exceeded,
// so we need to set the producer at background with backoff retry.

View File

@ -17,6 +17,14 @@ func TestMain(m *testing.M) {
m.Run()
}
func TestTenant(t *testing.T) {
tenant := tenant{
tenant: "milvus",
namespace: "aaa",
}
assert.Equal(t, "milvus/aaa/test", tenant.MustGetFullTopicName("test"))
}
func TestRegistry(t *testing.T) {
registeredB := registry.MustGetBuilder(message.WALNamePulsar)
assert.NotNil(t, registeredB)

View File

@ -25,6 +25,7 @@ type walImpl struct {
p *syncutil.Future[pulsar.Producer]
notifier *syncutil.AsyncTaskNotifier[struct{}]
backlogClearHelper *backlogClearHelper
tenant tenant
}
// initProducerAtBackground initializes the producer at background.
@ -56,8 +57,9 @@ func (w *walImpl) initProducerAtBackground() {
// initProducer initializes the producer.
func (w *walImpl) initProducer() error {
topic := w.tenant.MustGetFullTopicName(w.Channel().Name)
p, err := w.c.CreateProducer(pulsar.ProducerOptions{
Topic: w.Channel().Name,
Topic: topic,
// TODO: current go pulsar client does not support fencing, we should enable it after go pulsar client supports it.
// ProducerAccessMode: pulsar.ProducerAccessModeExclusiveWithFencing,
})
@ -98,9 +100,10 @@ func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (messa
}
func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls.ScannerImpls, err error) {
topic := w.tenant.MustGetFullTopicName(w.Channel().Name)
ch := make(chan pulsar.ReaderMessage, 1)
readerOpt := pulsar.ReaderOptions{
Topic: w.Channel().Name,
Topic: topic,
Name: opt.Name,
MessageChannel: ch,
ReceiverQueueSize: opt.ReadAheadBufferSize,
@ -141,8 +144,9 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error {
// The backlogClearHelper is always non-nil currently, so we can determine the truncate position
return nil
}
topic := w.tenant.MustGetFullTopicName(w.Channel().Name)
cursor, err := w.c.Subscribe(pulsar.ConsumerOptions{
Topic: w.Channel().Name,
Topic: topic,
SubscriptionName: truncateCursorSubscriptionName,
Type: pulsar.Exclusive,
MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100.