From 76b64a9d017a4c407c699df9001f55bba3c3b159 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 22 Oct 2021 17:25:12 +0800 Subject: [PATCH] Remove not needed ctx from rocksmq client (#10416) Signed-off-by: Congqi Xia --- .../util/rocksmq/client/rocksmq/client.go | 4 --- .../rocksmq/client/rocksmq/client_impl.go | 30 ++++++++----------- .../client/rocksmq/client_impl_test.go | 8 ----- 3 files changed, 13 insertions(+), 29 deletions(-) diff --git a/internal/util/rocksmq/client/rocksmq/client.go b/internal/util/rocksmq/client/rocksmq/client.go index 93ed083213..715dd77b2d 100644 --- a/internal/util/rocksmq/client/rocksmq/client.go +++ b/internal/util/rocksmq/client/rocksmq/client.go @@ -12,8 +12,6 @@ package rocksmq import ( - "context" - server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" ) @@ -31,8 +29,6 @@ func NewClient(options ClientOptions) (Client, error) { // ClientOptions is the options of a client type ClientOptions struct { Server RocksMQ - Ctx context.Context - Cancel context.CancelFunc } // Client is the interface rocksmq client diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 3386aba9b4..9d8c3351f2 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -12,7 +12,6 @@ package rocksmq import ( - "context" "reflect" "sync" @@ -25,9 +24,9 @@ type client struct { server RocksMQ producerOptions []ProducerOptions consumerOptions []ConsumerOptions - ctx context.Context - cancel context.CancelFunc wg *sync.WaitGroup + closeCh chan struct{} + closeOnce sync.Once } func newClient(options ClientOptions) (*client, error) { @@ -35,16 +34,11 @@ func newClient(options ClientOptions) (*client, error) { return nil, newError(InvalidConfiguration, "options.Server is nil") } - if options.Ctx == nil { - options.Ctx, options.Cancel = context.WithCancel(context.Background()) - } - c := &client{ server: options.Server, producerOptions: []ProducerOptions{}, - ctx: options.Ctx, - cancel: options.Cancel, wg: &sync.WaitGroup{}, + closeCh: make(chan struct{}), } return c, nil } @@ -128,7 +122,7 @@ func (c *client) consume(consumer *consumer) { defer c.wg.Done() for { select { - case <-c.ctx.Done(): + case <-c.closeCh: return case _, ok := <-consumer.MsgMutex(): if !ok { @@ -164,11 +158,13 @@ func (c *client) consume(consumer *consumer) { func (c *client) Close() { // TODO(yukun): Should call server.close() here? - c.cancel() - // Wait all consume goroutines exit - c.wg.Wait() - if c.server != nil { - c.server.Close() - } - c.consumerOptions = nil + c.closeOnce.Do(func() { + close(c.closeCh) + c.wg.Wait() + if c.server != nil { + c.server.Close() + } + // Wait all consume goroutines exit + c.consumerOptions = nil + }) } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl_test.go b/internal/util/rocksmq/client/rocksmq/client_impl_test.go index c7a40818b1..8dd4d8ca3a 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl_test.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl_test.go @@ -12,7 +12,6 @@ package rocksmq import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -68,11 +67,8 @@ func TestClient_CreateProducer(t *testing.T) { } func TestClient_Subscribe(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) client, err := NewClient(ClientOptions{ Server: newMockRocksMQ(), - Ctx: ctx, - Cancel: cancel, }) assert.NoError(t, err) @@ -128,11 +124,8 @@ func TestClient_consume(t *testing.T) { rmqPath := "/tmp/milvus/test_client3" rmq := newRocksMQ(rmqPath) defer removePath(rmqPath) - ctx, cancel := context.WithCancel(context.Background()) client, err := NewClient(ClientOptions{ Server: rmq, - Ctx: ctx, - Cancel: cancel, }) assert.NoError(t, err) defer client.Close() @@ -159,5 +152,4 @@ func TestClient_consume(t *testing.T) { assert.Nil(t, err) <-consumer.Chan() - }