diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 34e3e68da8..d775615589 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -102,6 +102,8 @@ pulsar: port: 6650 # Port of pulsar webport: 80 # Web port of pulsar, if you connect direcly without proxy, should use 8080 maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar. + tenant: public + namespace: default # If you want to enable kafka, needs to comment the pulsar configs kafka: diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index 61233c534c..bbcf93ad27 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -18,6 +18,7 @@ package msgstream import ( "context" + "errors" "strings" "github.com/apache/pulsar-client-go/pulsar" @@ -25,14 +26,12 @@ import ( rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" kafkawrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka" - puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar" + pulsarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar" rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" "github.com/streamnative/pulsarctl/pkg/cli" - "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" - "go.uber.org/zap" ) @@ -44,6 +43,10 @@ type PmsFactory struct { PulsarWebAddress string ReceiveBufSize int64 PulsarBufSize int64 + PulsarAuthPlugin string + PulsarAuthParams string + PulsarTenant string + PulsarNameSpace string } func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory { @@ -52,12 +55,25 @@ func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory { ReceiveBufSize: 1024, PulsarAddress: config.Address, PulsarWebAddress: config.WebAddress, + PulsarAuthPlugin: config.AuthPlugin, + PulsarAuthParams: config.AuthParams, + PulsarTenant: config.Tenant, + PulsarNameSpace: config.Namespace, } } // NewMsgStream is used to generate a new Msgstream object func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { - pulsarClient, err := puslarmqwrapper.NewClient(pulsar.ClientOptions{URL: f.PulsarAddress}) + auth, err := f.getAuthentication() + if err != nil { + return nil, err + } + clientOpts := pulsar.ClientOptions{ + URL: f.PulsarAddress, + Authentication: auth, + } + + pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts) if err != nil { return nil, err } @@ -66,13 +82,34 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { // NewTtMsgStream is used to generate a new TtMsgstream object func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { - pulsarClient, err := puslarmqwrapper.NewClient(pulsar.ClientOptions{URL: f.PulsarAddress}) + auth, err := f.getAuthentication() + if err != nil { + return nil, err + } + clientOpts := pulsar.ClientOptions{ + URL: f.PulsarAddress, + Authentication: auth, + } + + pulsarClient, err := pulsarmqwrapper.NewClient(f.PulsarTenant, f.PulsarNameSpace, clientOpts) if err != nil { return nil, err } return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher()) } +func (f *PmsFactory) getAuthentication() (pulsar.Authentication, error) { + auth, err := pulsar.NewAuthentication(f.PulsarAuthPlugin, f.PulsarAuthParams) + + if err != nil { + log.Error("build authencation from config failed, please check it!", + zap.String("authPlugin", f.PulsarAuthPlugin), + zap.Error(err)) + return nil, errors.New("build authencation from config failed") + } + return auth, nil +} + // NewQueryMsgStream is used to generate a new QueryMsgstream object func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { return f.NewMsgStream(ctx) @@ -81,9 +118,16 @@ func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error { return func(channels []string, subname string) error { // try to delete the old subscription - admin := cmdutils.NewPulsarClient() + admin, err := pulsarmqwrapper.NewAdminClient(f.PulsarWebAddress, f.PulsarAuthPlugin, f.PulsarAuthParams) + if err != nil { + return err + } for _, channel := range channels { - topic, err := utils.GetTopicName(channel) + fullTopicName, err := pulsarmqwrapper.GetFullTopicName(f.PulsarTenant, f.PulsarNameSpace, channel) + if err != nil { + return err + } + topic, err := utils.GetTopicName(fullTopicName) if err != nil { log.Warn("failed to get topic name", zap.Error(err)) return retry.Unrecoverable(err) diff --git a/internal/mq/msgstream/mq_factory_test.go b/internal/mq/msgstream/mq_factory_test.go index 5c3dfd9c13..8dfa7f3b95 100644 --- a/internal/mq/msgstream/mq_factory_test.go +++ b/internal/mq/msgstream/mq_factory_test.go @@ -21,6 +21,7 @@ import ( "os" "testing" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" ) @@ -36,6 +37,45 @@ func TestPmsFactory(t *testing.T) { _, err = pmsFactory.NewQueryMsgStream(ctx) assert.Nil(t, err) + + err = pmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx") + assert.Nil(t, err) +} + +func TestPmsFactoryWithAuth(t *testing.T) { + config := ¶mtable.PulsarConfig{ + Address: Params.PulsarCfg.Address, + WebAddress: Params.PulsarCfg.WebAddress, + MaxMessageSize: Params.PulsarCfg.MaxMessageSize, + AuthPlugin: "token", + AuthParams: "{\"token\":\"fake_token\"}", + } + + pmsFactory := NewPmsFactory(config) + + ctx := context.Background() + _, err := pmsFactory.NewMsgStream(ctx) + assert.Nil(t, err) + + _, err = pmsFactory.NewTtMsgStream(ctx) + assert.Nil(t, err) + + _, err = pmsFactory.NewQueryMsgStream(ctx) + assert.Nil(t, err) + + config.AuthParams = "" + pmsFactory = NewPmsFactory(config) + + ctx = context.Background() + _, err = pmsFactory.NewMsgStream(ctx) + assert.Error(t, err) + + _, err = pmsFactory.NewTtMsgStream(ctx) + assert.Error(t, err) + + _, err = pmsFactory.NewQueryMsgStream(ctx) + assert.Error(t, err) + } func TestRmsFactory(t *testing.T) { @@ -54,6 +94,9 @@ func TestRmsFactory(t *testing.T) { _, err = rmsFactory.NewQueryMsgStream(ctx) assert.Nil(t, err) + + err = rmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx") + assert.Nil(t, err) } func TestKafkaFactory(t *testing.T) { @@ -68,4 +111,7 @@ func TestKafkaFactory(t *testing.T) { _, err = kmsFactory.NewQueryMsgStream(ctx) assert.Nil(t, err) + + // err = kmsFactory.NewMsgStreamDisposer(ctx)([]string{"hello"}, "xx") + // assert.Nil(t, err) } diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index af4dae01d5..177231a58d 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -51,6 +51,11 @@ import ( "github.com/milvus-io/milvus/internal/util/paramtable" ) +const ( + DefaultPulsarTenant = "public" + DefaultPulsarNamespace = "default" +) + var Params paramtable.ComponentParam func TestMain(m *testing.M) { @@ -94,7 +99,7 @@ type parameters struct { func (f *fixture) setup() []parameters { pulsarAddress := getPulsarAddress() - pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(f.t, err) rocksdbName := "/tmp/rocksmq_unittest_" + f.t.Name() @@ -544,12 +549,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { factory := ProtoUDFactory{} ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream.Start() @@ -599,12 +604,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { factory := ProtoUDFactory{} ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream.Start() @@ -633,12 +638,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { factory := ProtoUDFactory{} ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) inputStream.Start() - pulsarClient2, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream.Start() @@ -788,7 +793,7 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) { // create a consumer can consume data from seek position to last msg factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) lastMsgID, err := outputStream2.GetLatestMsgID(c) @@ -1194,7 +1199,7 @@ func TestStream_MqMsgStream_Seek(t *testing.T) { outputStream.Close() factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream2.Seek([]*internalpb.MsgPosition{seekPosition}) @@ -1237,7 +1242,7 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) { } factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionEarliest) defer outputStream2.Close() @@ -1348,7 +1353,7 @@ func TestStream_MqMsgStream_SeekLatest(t *testing.T) { err := inputStream.Produce(msgPack) assert.Nil(t, err) factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream2.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest) outputStream2.Start() @@ -1689,7 +1694,7 @@ func TestStream_BroadcastMark(t *testing.T) { producerChannels := []string{c1, c2} factory := ProtoUDFactory{} - pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) assert.Nil(t, err) @@ -1751,7 +1756,7 @@ func TestStream_ProduceMark(t *testing.T) { producerChannels := []string{c1, c2} factory := ProtoUDFactory{} - pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) assert.Nil(t, err) @@ -2043,7 +2048,7 @@ func getTimeTickMsgPack(reqID UniqueID) *MsgPack { func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream { factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { @@ -2055,7 +2060,7 @@ func getPulsarInputStream(ctx context.Context, pulsarAddress string, producerCha func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream { factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream.Start() @@ -2064,7 +2069,7 @@ func getPulsarOutputStream(ctx context.Context, pulsarAddress string, consumerCh func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream { factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionEarliest) outputStream.Start() @@ -2073,7 +2078,7 @@ func getPulsarTtOutputStream(ctx context.Context, pulsarAddress string, consumer func getPulsarTtOutputStreamAndSeek(ctx context.Context, pulsarAddress string, positions []*MsgPosition) MsgStream { factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) consumerName := []string{} for _, c := range positions { diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index b4aeb7b7b1..d4522c0374 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -18,6 +18,7 @@ package pulsar import ( "errors" + "fmt" "strings" "sync" "time" @@ -26,11 +27,15 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/retry" + pulsarctl "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/streamnative/pulsarctl/pkg/pulsar/common" "go.uber.org/zap" ) type pulsarClient struct { - client pulsar.Client + tenant string + namespace string + client pulsar.Client } var sc *pulsarClient @@ -38,14 +43,18 @@ var once sync.Once // NewClient creates a pulsarClient object // according to the parameter opts of type pulsar.ClientOptions -func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) { +func NewClient(tenant string, namespace string, opts pulsar.ClientOptions) (*pulsarClient, error) { once.Do(func() { c, err := pulsar.NewClient(opts) if err != nil { log.Error("Failed to set pulsar client: ", zap.Error(err)) return } - cli := &pulsarClient{client: c} + cli := &pulsarClient{ + client: c, + tenant: tenant, + namespace: namespace, + } sc = cli }) return sc, nil @@ -53,7 +62,11 @@ func NewClient(opts pulsar.ClientOptions) (*pulsarClient, error) { // CreateProducer create a pulsar producer from options func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) { - opts := pulsar.ProducerOptions{Topic: options.Topic} + fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic) + if err != nil { + return nil, err + } + opts := pulsar.ProducerOptions{Topic: fullTopicName} if options.EnableCompression { opts.CompressionType = pulsar.ZSTD opts.CompressionLevel = pulsar.Faster @@ -77,8 +90,12 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra // Subscribe creates a pulsar consumer instance and subscribe a topic func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { receiveChannel := make(chan pulsar.ConsumerMessage, options.BufSize) + fullTopicName, err := GetFullTopicName(pc.tenant, pc.namespace, options.Topic) + if err != nil { + return nil, err + } consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ - Topic: options.Topic, + Topic: fullTopicName, SubscriptionName: options.SubscriptionName, Type: pulsar.Exclusive, SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(options.SubscriptionInitialPosition), @@ -100,6 +117,32 @@ func (pc *pulsarClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper. return pConsumer, nil } +func GetFullTopicName(tenant string, namespace string, topic string) (string, error) { + if len(tenant) == 0 || len(namespace) == 0 || len(topic) == 0 { + log.Error("build full topic name failed", + zap.String("tenant", tenant), + zap.String("namesapce", namespace), + zap.String("topic", topic)) + return "", errors.New("build full topic name failed") + } + + return fmt.Sprintf("%s/%s/%s", tenant, namespace, topic), nil +} + +func NewAdminClient(address, authPlugin, authParams string) (pulsarctl.Client, error) { + config := common.Config{ + WebServiceURL: address, + AuthPlugin: authPlugin, + AuthParams: authParams, + } + admin, err := pulsarctl.New(&config) + if err != nil { + return nil, fmt.Errorf("failed to build pulsar admin client due to %s", err.Error()) + } + + return admin, nil +} + // EarliestMessageID returns the earliest message id func (pc *pulsarClient) EarliestMessageID() mqwrapper.MessageID { msgID := pulsar.EarliestMessageID() diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index 6d76c1ca23..dc458f03e0 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -34,12 +34,16 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" - "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" "github.com/stretchr/testify/assert" "go.uber.org/zap" ) +const ( + DefaultPulsarTenant = "public" + DefaultPulsarNamespace = "default" +) + var Params paramtable.BaseTable func TestMain(m *testing.M) { @@ -204,7 +208,7 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, func TestPulsarClient_Consume1(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) @@ -355,7 +359,7 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string func TestPulsarClient_Consume2(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) @@ -405,7 +409,7 @@ func TestPulsarClient_Consume2(t *testing.T) { func TestPulsarClient_SeekPosition(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) @@ -478,7 +482,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) { func TestPulsarClient_SeekLatest(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) @@ -541,7 +545,7 @@ func TestPulsarClient_SeekLatest(t *testing.T) { func TestPulsarClient_EarliestMessageID(t *testing.T) { pulsarAddress := getPulsarAddress() - client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() mid := client.EarliestMessageID() @@ -550,7 +554,7 @@ func TestPulsarClient_EarliestMessageID(t *testing.T) { func TestPulsarClient_StringToMsgID(t *testing.T) { pulsarAddress := getPulsarAddress() - client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() mid := pulsar.EarliestMessageID() @@ -568,7 +572,7 @@ func TestPulsarClient_StringToMsgID(t *testing.T) { func TestPulsarClient_BytesToMsgID(t *testing.T) { pulsarAddress := getPulsarAddress() - client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + client, _ := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() mid := pulsar.EarliestMessageID() @@ -647,21 +651,52 @@ func (c *mockPulsarClient) Close() { func TestPulsarClient_SubscribeExclusiveFail(t *testing.T) { t.Run("exclusive pulsar consumer failure", func(t *testing.T) { pc := &pulsarClient{ - client: &mockPulsarClient{}, + tenant: DefaultPulsarTenant, + namespace: DefaultPulsarNamespace, + client: &mockPulsarClient{}, } - _, err := pc.Subscribe(mqwrapper.ConsumerOptions{}) + _, err := pc.Subscribe(mqwrapper.ConsumerOptions{Topic: "test_topic_name"}) assert.Error(t, err) assert.True(t, retry.IsUnRecoverable(err)) }) } +func TestPulsarClient_WithTenantAndNamespace(t *testing.T) { + tenant := "public" + namespace := "default" + topic := "test" + subName := "hello_world" + + pulsarAddress := getPulsarAddress() + pc, err := NewClient(tenant, namespace, pulsar.ClientOptions{URL: pulsarAddress}) + assert.Nil(t, err) + producer, err := pc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic}) + defer producer.Close() + assert.Nil(t, err) + assert.NotNil(t, producer) + + fullTopicName, err := GetFullTopicName(tenant, namespace, topic) + assert.Nil(t, err) + assert.Equal(t, fullTopicName, producer.(*pulsarProducer).Topic()) + + consumer, err := pc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + defer consumer.Close() + assert.Nil(t, err) + assert.NotNil(t, consumer) +} + func TestPulsarCtl(t *testing.T) { topic := "test" subName := "hello" pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) consumer, err := pc.Subscribe(mqwrapper.ConsumerOptions{ Topic: topic, @@ -690,7 +725,9 @@ func TestPulsarCtl(t *testing.T) { }) assert.Error(t, err) - topicName, err := utils.GetTopicName(topic) + fullTopicName, err := GetFullTopicName(DefaultPulsarTenant, DefaultPulsarNamespace, topic) + assert.Nil(t, err) + topicName, err := utils.GetTopicName(fullTopicName) assert.NoError(t, err) pulsarURL, err := url.ParseRequestURI(pulsarAddress) @@ -698,12 +735,14 @@ func TestPulsarCtl(t *testing.T) { panic(err) } webport := Params.LoadWithDefault("pulsar.webport", "80") - cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport - admin := cmdutils.NewPulsarClient() + webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport + admin, err := NewAdminClient(webServiceURL, "", "") + assert.NoError(t, err) err = admin.Subscriptions().Delete(*topicName, subName, true) if err != nil { - cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" - admin := cmdutils.NewPulsarClient() + webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" + admin, err := NewAdminClient(webServiceURL, "", "") + assert.NoError(t, err) err = admin.Subscriptions().Delete(*topicName, subName, true) assert.NoError(t, err) } @@ -714,7 +753,29 @@ func TestPulsarCtl(t *testing.T) { BufSize: 1024, SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, }) + defer consumer2.Close() assert.Nil(t, err) assert.NotNil(t, consumer2) - defer consumer2.Close() +} + +func NewPulsarAdminClient() { + panic("unimplemented") +} + +func TestPulsarClient_GetFullTopicName(t *testing.T) { + fullTopicName, err := GetFullTopicName("", "", "topic") + assert.Error(t, err) + assert.Empty(t, fullTopicName) + + fullTopicName, err = GetFullTopicName("tenant", "", "topic") + assert.Error(t, err) + assert.Empty(t, fullTopicName) + + fullTopicName, err = GetFullTopicName("", "namespace", "topic") + assert.Error(t, err) + assert.Empty(t, fullTopicName) + + fullTopicName, err = GetFullTopicName("tenant", "namespace", "topic") + assert.Nil(t, err) + assert.Equal(t, "tenant/namespace/topic", fullTopicName) } diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index ca113b45d5..7880e82e8c 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -24,7 +24,6 @@ import ( "testing" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" - "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" "github.com/apache/pulsar-client-go/pulsar" @@ -33,7 +32,7 @@ import ( func TestPulsarConsumer_Subscription(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) defer pc.Close() @@ -65,7 +64,7 @@ func Test_PatchEarliestMessageID(t *testing.T) { func TestComsumeCompressedMessage(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) defer pc.Close() @@ -113,7 +112,7 @@ func TestComsumeCompressedMessage(t *testing.T) { func TestPulsarConsumer_Close(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) receiveChannel := make(chan pulsar.ConsumerMessage, 100) @@ -173,12 +172,14 @@ func TestPulsarClientCloseUnsubscribeError(t *testing.T) { panic(err) } webport := Params.LoadWithDefault("pulsar.webport", "80") - cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + webport - admin := cmdutils.NewPulsarClient() + webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport + admin, err := NewAdminClient(webServiceURL, "", "") + assert.NoError(t, err) err = admin.Subscriptions().Delete(*topicName, subName, true) if err != nil { - cmdutils.PulsarCtlConfig.WebServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" - admin := cmdutils.NewPulsarClient() + webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" + admin, err := NewAdminClient(webServiceURL, "", "") + assert.NoError(t, err) err = admin.Subscriptions().Delete(*topicName, subName, true) assert.NoError(t, err) } diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go index 1411f133a6..6155bbdc25 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go @@ -28,7 +28,7 @@ import ( func TestPulsarProducer(t *testing.T) { pulsarAddress := getPulsarAddress() - pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) + pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) @@ -39,7 +39,9 @@ func TestPulsarProducer(t *testing.T) { assert.NotNil(t, producer) pulsarProd := producer.(*pulsarProducer) - assert.Equal(t, pulsarProd.Topic(), topic) + fullTopicName, err := GetFullTopicName(DefaultPulsarTenant, DefaultPulsarNamespace, topic) + assert.Nil(t, err) + assert.Equal(t, pulsarProd.Topic(), fullTopicName) msg := &mqwrapper.ProducerMessage{ Payload: []byte{}, diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index aad73e4e08..b3c212ce55 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -17,22 +17,19 @@ package paramtable import ( + "encoding/json" "net/url" "os" "path" "strconv" "strings" - "sync" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/metricsinfo" - "github.com/streamnative/pulsarctl/pkg/cmdutils" "go.uber.org/zap" ) -var pulsarOnce sync.Once - const ( // SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message. SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 @@ -72,7 +69,7 @@ func (p *ServiceParam) Init() { p.MinioCfg.init(&p.BaseTable) } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- etcd --- type EtcdConfig struct { Base *BaseTable @@ -230,7 +227,7 @@ func (p *MetaStoreConfig) initMetaStoreType() { p.MetaStoreType = p.Base.LoadWithDefault("metastore.type", util.MetaStoreTypeEtcd) } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- meta db --- type MetaDBConfig struct { Base *BaseTable @@ -306,7 +303,7 @@ func (p *MetaDBConfig) initMaxIdleConns() { p.MaxIdleConns = maxIdleConns } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- pulsar --- type PulsarConfig struct { Base *BaseTable @@ -314,6 +311,14 @@ type PulsarConfig struct { Address string WebAddress string MaxMessageSize int + + // support auth + AuthPlugin string + AuthParams string + + // support tenant + Tenant string + Namespace string } func (p *PulsarConfig) init(base *BaseTable) { @@ -322,6 +327,10 @@ func (p *PulsarConfig) init(base *BaseTable) { p.initAddress() p.initWebAddress() p.initMaxMessageSize() + p.initAuthPlugin() + p.initAuthParams() + p.initTenant() + p.initNamespace() } func (p *PulsarConfig) initAddress() { @@ -350,9 +359,6 @@ func (p *PulsarConfig) initWebAddress() { webport := p.Base.LoadWithDefault("pulsar.webport", "80") p.WebAddress = "http://" + pulsarURL.Hostname() + ":" + webport } - pulsarOnce.Do(func() { - cmdutils.PulsarCtlConfig.WebServiceURL = p.WebAddress - }) } func (p *PulsarConfig) initMaxMessageSize() { @@ -369,6 +375,41 @@ func (p *PulsarConfig) initMaxMessageSize() { } } +func (p *PulsarConfig) initAuthPlugin() { + p.AuthPlugin = p.Base.LoadWithDefault("pulsar.authPlugin", "") +} + +func (p *PulsarConfig) initAuthParams() { + paramString := p.Base.LoadWithDefault("pulsar.authParams", "") + + // need to parse params to json due to .yaml config file doesn't support json format config item + // official pulsar client JWT config : {"token","fake_token_string"} + // milvus config: token:fake_token_string + jsonMap := make(map[string]string) + params := strings.Split(paramString, ",") + for _, param := range params { + kv := strings.Split(param, ":") + if len(kv) == 2 { + jsonMap[kv[0]] = kv[1] + } + } + + if len(jsonMap) == 0 { + p.AuthParams = "" + } else { + jsonData, _ := json.Marshal(&jsonMap) + p.AuthParams = string(jsonData) + } +} + +func (p *PulsarConfig) initTenant() { + p.Tenant = p.Base.LoadWithDefault("pulsar.tenant", "public") +} + +func (p *PulsarConfig) initNamespace() { + p.Namespace = p.Base.LoadWithDefault("pulsar.namespace", "default") +} + // --- kafka --- type KafkaConfig struct { Base *BaseTable @@ -416,7 +457,7 @@ func (k *KafkaConfig) initExtraKafkaConfig() { k.ProducerExtraConfig = k.Base.GetConfigSubSet(KafkaProducerConfigPrefix) } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- rocksmq --- type RocksmqConfig struct { Base *BaseTable @@ -434,7 +475,7 @@ func (p *RocksmqConfig) initPath() { p.Path = p.Base.LoadWithDefault("rocksmq.path", "") } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- minio --- type MinioConfig struct { Base *BaseTable diff --git a/internal/util/paramtable/service_param_test.go b/internal/util/paramtable/service_param_test.go index dd86951878..78e0175ce2 100644 --- a/internal/util/paramtable/service_param_test.go +++ b/internal/util/paramtable/service_param_test.go @@ -105,6 +105,26 @@ func TestServiceParam(t *testing.T) { } }) + t.Run("test pulsar auth config", func(t *testing.T) { + Params := SParams.PulsarCfg + + Params.initAuthPlugin() + assert.Equal(t, "", Params.AuthPlugin) + + Params.initAuthParams() + assert.Equal(t, "", Params.AuthParams) + }) + + t.Run("test pulsar tenant/namespace config", func(t *testing.T) { + Params := SParams.PulsarCfg + + Params.initTenant() + assert.Equal(t, "public", Params.Tenant) + + Params.initNamespace() + assert.Equal(t, "default", Params.Namespace) + }) + t.Run("test rocksmqConfig", func(t *testing.T) { Params := SParams.RocksmqCfg