diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index bed6e112c1..4ece2995b0 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -58,22 +58,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - msgChannel := make(chan ConsumerMessage) - pConsumer := &pulsarConsumer{c: consumer, msgChannel: msgChannel} - - go func() { - for { //nolint:gosimple - select { - case msg, ok := <-pConsumer.c.Chan(): - if !ok { - close(msgChannel) - log.Debug("pulsar consumer channel closed") - return - } - msgChannel <- &pulsarMessage{msg: msg} - } - } - }() + pConsumer := &pulsarConsumer{c: consumer} return pConsumer, nil } diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index aa42609305..528fa3c1be 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -13,6 +13,7 @@ package mqclient import ( "github.com/apache/pulsar-client-go/pulsar" + "github.com/milvus-io/milvus/internal/log" ) type pulsarConsumer struct { @@ -25,6 +26,22 @@ func (pc *pulsarConsumer) Subscription() string { } func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { + if pc.msgChannel == nil { + pc.msgChannel = make(chan ConsumerMessage) + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-pc.c.Chan(): + if !ok { + close(pc.msgChannel) + log.Debug("pulsar consumer channel closed") + return + } + pc.msgChannel <- &pulsarMessage{msg: msg} + } + } + }() + } return pc.msgChannel } diff --git a/internal/util/mqclient/rmq_client.go b/internal/util/mqclient/rmq_client.go index 31dceaf8bc..a8a3253224 100644 --- a/internal/util/mqclient/rmq_client.go +++ b/internal/util/mqclient/rmq_client.go @@ -55,22 +55,8 @@ func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) { return nil, err } - msgChannel := make(chan ConsumerMessage, 1) - rConsumer := &rmqConsumer{c: cli, msgChannel: msgChannel} + rConsumer := &rmqConsumer{c: cli} - go func() { - for { //nolint:gosimple - select { - case msg, ok := <-rConsumer.c.Chan(): - if !ok { - close(msgChannel) - return - } - msg.Topic = options.Topic - msgChannel <- &rmqMessage{msg: msg} - } - } - }() return rConsumer, nil } diff --git a/internal/util/mqclient/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go index 2d16d9f452..c210dc98b4 100644 --- a/internal/util/mqclient/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -25,6 +25,22 @@ func (rc *rmqConsumer) Subscription() string { } func (rc *rmqConsumer) Chan() <-chan ConsumerMessage { + + if rc.msgChannel == nil { + rc.msgChannel = make(chan ConsumerMessage) + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-rc.c.Chan(): + if !ok { + close(rc.msgChannel) + return + } + rc.msgChannel <- &rmqMessage{msg: msg} + } + } + }() + } return rc.msgChannel } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 5859e262b4..5e73c3e8f2 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -134,6 +134,7 @@ func consume(ctx context.Context, consumer *consumer) { consumer.messageCh <- ConsumerMessage{ MsgID: msg[0].MsgID, Payload: msg[0].Payload, + Topic: consumer.Topic(), } } }