diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index 8d2d992348..ae83869bb8 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -12,6 +12,7 @@ package mqclient import ( + "sync" "unsafe" "github.com/apache/pulsar-client-go/pulsar" @@ -23,6 +24,7 @@ type pulsarConsumer struct { msgChannel chan ConsumerMessage hasSeek bool closeCh chan struct{} + once sync.Once } func (pc *pulsarConsumer) Subscription() string { @@ -31,35 +33,37 @@ func (pc *pulsarConsumer) Subscription() string { func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { if pc.msgChannel == nil { - pc.msgChannel = make(chan ConsumerMessage) - // this part handles msgstream expectation when the consumer is not seeked - // pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked - // yet, our message stream is to setting to the very start point of the topic - if !pc.hasSeek { - // the concrete value of the MessageID is pulsar.messageID{-1,-1,-1,-1} - // but Seek function logic does not allow partitionID -1, See line 618-620 of github.com/apache/pulsar-client-go@v0.5.0 pulsar/consumer_impl.go - mid := pulsar.EarliestMessageID() - // the patch function use unsafe pointer to set partitionIdx to 0, which is the valid default partition index of current use case - // NOTE: when pulsar client version check, do check this logic is fixed or offset is changed!!! - // NOTE: unsafe solution, check implementation asap - patchEarliestMessageID(&mid) - pc.c.Seek(mid) - } - go func() { - for { //nolint:gosimple - select { - case msg, ok := <-pc.c.Chan(): - if !ok { - log.Debug("pulsar consumer channel closed") + pc.once.Do(func() { + pc.msgChannel = make(chan ConsumerMessage) + // this part handles msgstream expectation when the consumer is not seeked + // pulsar's default behavior is setting postition to the earliest pointer when client of the same subscription pointer is not acked + // yet, our message stream is to setting to the very start point of the topic + if !pc.hasSeek { + // the concrete value of the MessageID is pulsar.messageID{-1,-1,-1,-1} + // but Seek function logic does not allow partitionID -1, See line 618-620 of github.com/apache/pulsar-client-go@v0.5.0 pulsar/consumer_impl.go + mid := pulsar.EarliestMessageID() + // the patch function use unsafe pointer to set partitionIdx to 0, which is the valid default partition index of current use case + // NOTE: when pulsar client version check, do check this logic is fixed or offset is changed!!! + // NOTE: unsafe solution, check implementation asap + patchEarliestMessageID(&mid) + pc.c.Seek(mid) + } + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-pc.c.Chan(): + if !ok { + log.Debug("pulsar consumer channel closed") + return + } + pc.msgChannel <- &pulsarMessage{msg: msg} + case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed + close(pc.msgChannel) return } - pc.msgChannel <- &pulsarMessage{msg: msg} - case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed - close(pc.msgChannel) - return } - } - }() + }() + }) } return pc.msgChannel } diff --git a/internal/util/mqclient/rmq_consumer.go b/internal/util/mqclient/rmq_consumer.go index 951340f685..4249a7ce55 100644 --- a/internal/util/mqclient/rmq_consumer.go +++ b/internal/util/mqclient/rmq_consumer.go @@ -12,6 +12,8 @@ package mqclient import ( + "sync" + "github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq" ) @@ -19,6 +21,7 @@ type RmqConsumer struct { c rocksmq.Consumer msgChannel chan ConsumerMessage closeCh chan struct{} + once sync.Once } func (rc *RmqConsumer) Subscription() string { @@ -27,22 +30,24 @@ func (rc *RmqConsumer) Subscription() string { func (rc *RmqConsumer) Chan() <-chan ConsumerMessage { if rc.msgChannel == nil { - rc.msgChannel = make(chan ConsumerMessage) - go func() { - for { //nolint:gosimple - select { - case msg, ok := <-rc.c.Chan(): - if !ok { + rc.once.Do(func() { + rc.msgChannel = make(chan ConsumerMessage) + go func() { + for { //nolint:gosimple + select { + case msg, ok := <-rc.c.Chan(): + if !ok { + close(rc.msgChannel) + return + } + rc.msgChannel <- &rmqMessage{msg: msg} + case <-rc.closeCh: close(rc.msgChannel) return } - rc.msgChannel <- &rmqMessage{msg: msg} - case <-rc.closeCh: - close(rc.msgChannel) - return } - } - }() + }() + }) } return rc.msgChannel }