diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 7097b353fc..f798625c3b 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -82,8 +82,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { return nil, err } } - c.wg.Add(1) - go c.consume(consumer) return consumer, nil } consumer, err := newConsumer(c, options) @@ -113,8 +111,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { // Take messages from RocksDB and put it into consumer.Chan(), // trigger by consumer.MsgMutex which trigger by producer - c.wg.Add(1) - go c.consume(consumer) c.consumerOptions = append(c.consumerOptions, options) return consumer, nil diff --git a/internal/util/rocksmq/client/rocksmq/consumer_impl.go b/internal/util/rocksmq/client/rocksmq/consumer_impl.go index 6f68c7e4a9..d57c7e0050 100644 --- a/internal/util/rocksmq/client/rocksmq/consumer_impl.go +++ b/internal/util/rocksmq/client/rocksmq/consumer_impl.go @@ -12,6 +12,8 @@ package rocksmq import ( + "sync" + "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" ) @@ -22,6 +24,8 @@ type consumer struct { consumerName string options ConsumerOptions + startOnce sync.Once + msgMutex chan struct{} messageCh chan ConsumerMessage } @@ -95,6 +99,10 @@ func (c *consumer) MsgMutex() chan struct{} { } func (c *consumer) Chan() <-chan ConsumerMessage { + c.startOnce.Do(func() { + c.client.wg.Add(1) + go c.client.consume(c) + }) return c.messageCh }