diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index 06186c127f..6e0b75dd62 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -67,7 +67,7 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { } //consumer.Seek(pulsar.EarliestMessageID()) //consumer.SeekByTime(time.Unix(0, 0)) - pConsumer := &pulsarConsumer{c: consumer} + pConsumer := &pulsarConsumer{c: consumer, closeCh: make(chan struct{})} return pConsumer, nil } diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index 1287950c6a..f29430814c 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -22,6 +22,7 @@ type pulsarConsumer struct { c pulsar.Consumer msgChannel chan ConsumerMessage hasSeek bool + closeCh chan struct{} } func (pc *pulsarConsumer) Subscription() string { @@ -39,11 +40,13 @@ func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { select { case msg, ok := <-pc.c.Chan(): if !ok { - close(pc.msgChannel) log.Debug("pulsar consumer channel closed") return } pc.msgChannel <- &pulsarMessage{msg: msg} + case <-pc.closeCh: // workaround for pulsar consumer.receiveCh not closed + close(pc.msgChannel) + return } } }() @@ -67,4 +70,5 @@ func (pc *pulsarConsumer) Ack(message ConsumerMessage) { func (pc *pulsarConsumer) Close() { pc.c.Close() + close(pc.closeCh) }