diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 7bd7bada42..377a849ae5 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -143,6 +143,8 @@ func TestDataNode(t *testing.T) { err = node1.Start() assert.Nil(t, err) defer func() { + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) err := node1.Stop() assert.Nil(t, err) }() @@ -336,6 +338,8 @@ func TestDataNode(t *testing.T) { if i <= 2 { err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}) assert.Nil(t, err) + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) vchanNameCh <- test.dmChannelName } } @@ -409,6 +413,8 @@ func TestWatchChannel(t *testing.T) { exist := node.flowgraphManager.exist(ch) assert.True(t, exist) + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID)) assert.Nil(t, err) //TODO there is not way to sync Release done, use sleep for now diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index fd658ee737..5566b0bac6 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -19,6 +19,7 @@ package datanode import ( "context" "testing" + "time" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -44,7 +45,11 @@ func TestFlowGraphManager(t *testing.T) { require.Nil(t, err) fm := newFlowgraphManager() - defer fm.dropAll() + defer func() { + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) + fm.dropAll() + }() t.Run("Test addAndStart", func(t *testing.T) { vchanName := "by-dev-rootcoord-dml-test-flowgraphmanager-addAndStart" vchan := &datapb.VchannelInfo{ @@ -57,6 +62,8 @@ func TestFlowGraphManager(t *testing.T) { assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) fm.dropAll() }) @@ -72,6 +79,8 @@ func TestFlowGraphManager(t *testing.T) { assert.NoError(t, err) assert.True(t, fm.exist(vchanName)) + // TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar + time.Sleep(200 * time.Millisecond) fm.release(vchanName) assert.False(t, fm.exist(vchanName)) diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index 318d68f57c..c7a2dd995c 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -34,6 +34,7 @@ type PulsarConsumer struct { closeCh chan struct{} once sync.Once skip bool + closeOnce sync.Once } // Subscription get a subscription for the consumer @@ -104,8 +105,15 @@ func (pc *PulsarConsumer) Ack(message Message) { // Close the consumer and stop the broker to push more messages func (pc *PulsarConsumer) Close() { - pc.c.Close() - close(pc.closeCh) + pc.closeOnce.Do(func() { + defer pc.c.Close() + // Unsubscribe for the consumer + err := pc.c.Unsubscribe() + if err != nil { + panic(err) + } + close(pc.closeCh) + }) } // patchEarliestMessageID unsafe patch logic to change messageID partitionIdx to 0 diff --git a/internal/util/mqclient/pulsar_consumer_test.go b/internal/util/mqclient/pulsar_consumer_test.go index e408c5aad6..d9739a71d0 100644 --- a/internal/util/mqclient/pulsar_consumer_test.go +++ b/internal/util/mqclient/pulsar_consumer_test.go @@ -55,3 +55,29 @@ func Test_PatchEarliestMessageID(t *testing.T) { assert.Equal(t, "-1:-1:0", fmt.Sprintf("%v", mid)) } + +func TestPulsarConsumer_Close(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress}) + assert.Nil(t, err) + + receiveChannel := make(chan pulsar.ConsumerMessage, 100) + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: "Topic-1", + SubscriptionName: "SubName-1", + Type: pulsar.SubscriptionType(Exclusive), + SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(SubscriptionPositionEarliest), + MessageChannel: receiveChannel, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + + str := consumer.Subscription() + assert.NotNil(t, str) + + pulsarConsumer := &PulsarConsumer{c: consumer, closeCh: make(chan struct{})} + pulsarConsumer.Close() + + // test double close + pulsarConsumer.Close() +}