diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 6a298c994d..026ba901ad 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -89,6 +89,7 @@ type QueryNode struct { // call once initOnce sync.Once + stopOnce sync.Once // internal components manager *segments.Manager @@ -344,12 +345,21 @@ func (node *QueryNode) Start() error { // Stop mainly stop QueryNode's query service, historical loop and streaming loop. func (node *QueryNode) Stop() error { - - log.Warn("Query node stop..") - node.UpdateStateCode(commonpb.StateCode_Abnormal) - node.lifetime.Wait() - node.session.Revoke(time.Second) - node.pipelineManager.Close() + node.stopOnce.Do(func() { + log.Info("Query node stop...") + node.UpdateStateCode(commonpb.StateCode_Abnormal) + node.lifetime.Wait() + node.cancel() + if node.pipelineManager != nil { + node.pipelineManager.Close() + } + if node.session != nil { + node.session.Stop() + } + if node.dispClient != nil { + node.dispClient.Close() + } + }) return nil } diff --git a/pkg/mq/msgdispatcher/client.go b/pkg/mq/msgdispatcher/client.go index 503155e2b8..116afbdd6b 100644 --- a/pkg/mq/msgdispatcher/client.go +++ b/pkg/mq/msgdispatcher/client.go @@ -37,6 +37,7 @@ type ( type Client interface { Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *MsgPack, error) Deregister(vchannel string) + Close() } var _ Client = (*client)(nil) @@ -97,3 +98,16 @@ func (c *client) Deregister(vchannel string) { zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel)) } } + +func (c *client) Close() { + log := log.With(zap.String("role", c.role), + zap.Int64("nodeID", c.nodeID)) + c.managerMu.Lock() + defer c.managerMu.Unlock() + for pchannel, manager := range c.managers { + log.Info("close manager", zap.String("channel", pchannel)) + delete(c.managers, pchannel) + manager.Close() + } + log.Info("dispatcher client closed") +} diff --git a/pkg/mq/msgdispatcher/mock_client.go b/pkg/mq/msgdispatcher/mock_client.go index 11d8a22782..0a478babbe 100644 --- a/pkg/mq/msgdispatcher/mock_client.go +++ b/pkg/mq/msgdispatcher/mock_client.go @@ -1,12 +1,12 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package msgdispatcher import ( - "github.com/milvus-io/milvus-proto/go-api/msgpb" + mqwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" mock "github.com/stretchr/testify/mock" - mqwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + msgpb "github.com/milvus-io/milvus-proto/go-api/msgpb" msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream" ) @@ -24,6 +24,33 @@ func (_m *MockClient) EXPECT() *MockClient_Expecter { return &MockClient_Expecter{mock: &_m.Mock} } +// Close provides a mock function with given fields: +func (_m *MockClient) Close() { + _m.Called() +} + +// MockClient_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockClient_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockClient_Expecter) Close() *MockClient_Close_Call { + return &MockClient_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockClient_Close_Call) Run(run func()) *MockClient_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClient_Close_Call) Return() *MockClient_Close_Call { + _c.Call.Return() + return _c +} + // Deregister provides a mock function with given fields: vchannel func (_m *MockClient) Deregister(vchannel string) { _m.Called(vchannel) @@ -35,7 +62,7 @@ type MockClient_Deregister_Call struct { } // Deregister is a helper method to define mock.On call -// - vchannel string +// - vchannel string func (_e *MockClient_Expecter) Deregister(vchannel interface{}) *MockClient_Deregister_Call { return &MockClient_Deregister_Call{Call: _e.mock.On("Deregister", vchannel)} } @@ -81,9 +108,9 @@ type MockClient_Register_Call struct { } // Register is a helper method to define mock.On call -// - vchannel string -// - pos *msgpb.MsgPosition -// - subPos mqwrapper.SubscriptionInitialPosition +// - vchannel string +// - pos *msgpb.MsgPosition +// - subPos mqwrapper.SubscriptionInitialPosition func (_e *MockClient_Expecter) Register(vchannel interface{}, pos interface{}, subPos interface{}) *MockClient_Register_Call { return &MockClient_Register_Call{Call: _e.mock.On("Register", vchannel, pos, subPos)} }