Zhen Ye 52950ce392
enhance: add pulsar truncate api to protect pulsar unconsumed message (#41724)
issue: #41465

- implement truncate api for pulsar based on durable subscription.
- truncate api can only be called if wal is read-write.

Signed-off-by: chyezh <chyezh@outlook.com>
2025-05-11 20:50:55 +08:00

65 lines
1.7 KiB
Go

package pulsar
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/helper"
)
// truncateCursorSubscriptionName is the subscription name used for the cursor
// that Open initializes on the topic when a wal is opened in read-write mode.
const truncateCursorSubscriptionName = "truncate-cursor"
// Compile-time check that *openerImpl satisfies walimpls.OpenerImpls.
var _ walimpls.OpenerImpls = (*openerImpl)(nil)
// openerImpl is the opener for pulsar wal.
// It opens wal instances through the pulsar client it holds.
type openerImpl struct {
// c is the pulsar client used by Open to create producers and subscriptions;
// it is closed by Close.
c pulsar.Client
}
// Open opens a wal instance on the channel described by opt.
//
// The option is validated first. When the channel is opened in read-write
// mode, a producer is created on the topic and a subscription named
// truncateCursorSubscriptionName is initialized at the earliest position; the
// subscribing consumer is closed right away because only its cursor is needed.
// For any other access mode neither is created and the returned wal holds a
// nil producer.
//
// On failure a nil wal and the underlying error are returned. If the
// subscription cannot be created, the producer created by this call is closed
// before returning so that it is not leaked.
func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) {
	if err := opt.Validate(); err != nil {
		return nil, err
	}

	var p pulsar.Producer
	if opt.Channel.AccessMode == types.AccessModeRW {
		var err error
		p, err = o.c.CreateProducer(pulsar.ProducerOptions{
			Topic: opt.Channel.Name,
			// TODO: current go pulsar client does not support fencing, we should enable it after go pulsar client supports it.
			// ProducerAccessMode: pulsar.ProducerAccessModeExclusiveWithFencing,
		})
		if err != nil {
			return nil, err
		}

		// Initialize a persistent cursor to protect unconsumed messages on the
		// topic from being removed by retention.
		cursor, err := o.c.Subscribe(pulsar.ConsumerOptions{
			Topic:                       opt.Channel.Name,
			SubscriptionName:            truncateCursorSubscriptionName,
			Type:                        pulsar.Exclusive,
			MaxPendingChunkedMessage:    1,
			SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		})
		if err != nil {
			// The producer is not handed over to any wal on this path, so it
			// must be released here.
			p.Close()
			return nil, err
		}
		// Only the subscription's cursor is needed; the consumer is not used.
		cursor.Close()
	}

	return &walImpl{
		WALHelper: helper.NewWALHelper(opt),
		p:         p,
		c:         o.c,
	}, nil
}
// Close shuts down the pulsar client held by the opener.
func (o *openerImpl) Close() {
	client := o.c
	client.Close()
}