Zhen Ye 5bdc593b8a
enhance: use v0.15.1 official pulsar client and add logging for pulsar client (#43913)
issue: #43785

- The pulsar client now prints its logs into the milvus logger.
- The pulsar client metrics are enabled by default.
- Upgrade the pulsar client to v0.15.1 and use the official repo.
- The fixes made in milvus-io/pulsar-client-go are already covered by
official v0.15.1.

Signed-off-by: chyezh <chyezh@outlook.com>
2025-08-26 16:45:53 +08:00

70 lines
2.0 KiB
Go

package pulsar
import (
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/pulsar/pulsarlog"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/registry"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
const (
// walName is the identifier of this wal implementation: it is what
// builderImpl.Name returns and the key under which init registers the
// message ID unmarshaler.
walName = "pulsar"
)
func init() {
// register the builder to the wal registry.
registry.RegisterBuilder(&builderImpl{})
// register the unmarshaler to the message registry.
message.RegisterMessageIDUnmsarshaler(walName, UnmarshalMessageID)
}
// builderImpl is the builder for pulsar wal.
// It is an empty struct: it holds no state of its own, and the client
// options are read from paramtable each time Build is called.
type builderImpl struct{}
// Name returns the name of the wal, which is the walName constant ("pulsar").
func (b *builderImpl) Name() string {
return walName
}
// Build creates a pulsar client from the configured client options and
// returns an opener that holds it.
//
// A failure to assemble the options is returned wrapped with context; an
// error from pulsar.NewClient is returned unchanged.
func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
	clientOpts, optErr := b.getPulsarClientOptions()
	if optErr != nil {
		return nil, errors.Wrapf(optErr, "build pulsar client options failed")
	}

	client, clientErr := pulsar.NewClient(clientOpts)
	if clientErr != nil {
		return nil, clientErr
	}

	opener := &openerImpl{c: client}
	return opener, nil
}
// getPulsarClientOptions gets the pulsar client options from the config.
//
// The options are assembled from paramtable.Get().PulsarCfg: broker address,
// operation timeout, authentication provider and the milvus-backed logger.
// The metrics registerer is only set when EnableClientMetrics is true.
//
// If the authentication provider cannot be built from the configured plugin
// and params, a zero-valued pulsar.ClientOptions is returned together with an
// error that wraps the underlying cause.
func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) {
	cfg := &paramtable.Get().PulsarCfg
	auth, err := pulsar.NewAuthentication(cfg.AuthPlugin.GetValue(), cfg.AuthParams.GetValue())
	if err != nil {
		// Wrap rather than replace the error, so the reason reported by the
		// pulsar client stays in the error chain instead of being dropped.
		return pulsar.ClientOptions{}, errors.Wrap(err, "build authencation from config failed")
	}
	options := pulsar.ClientOptions{
		URL: cfg.Address.GetValue(),
		// NOTE(review): presumably time.Second is the unit applied to a bare
		// numeric RequestTimeout value — confirm against paramtable.
		OperationTimeout: cfg.RequestTimeout.GetAsDuration(time.Second),
		Authentication:   auth,
		Logger:           pulsarlog.NewLogger(),
	}
	if cfg.EnableClientMetrics.GetAsBool() {
		// Enable client metrics if config.EnableClientMetrics is true, use pkg-defined registerer.
		options.MetricsRegisterer = metrics.GetRegisterer()
	}
	return options, nil
}