From f4de99e1298beddc081b9071c9b02e751c6bd717 Mon Sep 17 00:00:00 2001 From: chyezh Date: Mon, 22 Jul 2024 11:32:04 +0800 Subject: [PATCH] enhance: implement streaming coord client (#34654) issue: #33285 - add streaming coord channel assignment watch client Signed-off-by: chyezh --- internal/.mockery.yaml | 2 + ...k_StreamingCoordAssignmentServiceClient.go | 109 +++++ ...ignmentService_AssignmentDiscoverClient.go | 398 ++++++++++++++++++ .../client/assignment/assignment_impl.go | 174 ++++++++ .../client/assignment/assignment_test.go | 113 +++++ .../client/assignment/discoverer.go | 154 +++++++ .../client/assignment/watcher.go | 54 +++ internal/streamingcoord/client/client.go | 110 +++++ internal/streamingcoord/client/client_impl.go | 26 ++ internal/streamingcoord/client/client_test.go | 25 ++ 10 files changed, 1165 insertions(+) create mode 100644 internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentServiceClient.go create mode 100644 internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentService_AssignmentDiscoverClient.go create mode 100644 internal/streamingcoord/client/assignment/assignment_impl.go create mode 100644 internal/streamingcoord/client/assignment/assignment_test.go create mode 100644 internal/streamingcoord/client/assignment/discoverer.go create mode 100644 internal/streamingcoord/client/assignment/watcher.go create mode 100644 internal/streamingcoord/client/client.go create mode 100644 internal/streamingcoord/client/client_impl.go create mode 100644 internal/streamingcoord/client/client_test.go diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index 0d1de8c737..c68667da37 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -38,6 +38,8 @@ packages: interfaces: StreamingNodeHandlerService_ConsumeServer: StreamingNodeHandlerService_ProduceServer: + StreamingCoordAssignmentServiceClient: + StreamingCoordAssignmentService_AssignmentDiscoverClient: StreamingCoordAssignmentService_AssignmentDiscoverServer: StreamingNodeManagerServiceClient: StreamingNodeHandlerServiceClient: diff --git a/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentServiceClient.go b/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentServiceClient.go new file mode 100644 index 0000000000..569636ca72 --- /dev/null +++ b/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentServiceClient.go @@ -0,0 +1,109 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_streamingpb + +import ( + context "context" + + grpc "google.golang.org/grpc" + + mock "github.com/stretchr/testify/mock" + + streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" +) + +// MockStreamingCoordAssignmentServiceClient is an autogenerated mock type for the StreamingCoordAssignmentServiceClient type +type MockStreamingCoordAssignmentServiceClient struct { + mock.Mock +} + +type MockStreamingCoordAssignmentServiceClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStreamingCoordAssignmentServiceClient) EXPECT() *MockStreamingCoordAssignmentServiceClient_Expecter { + return &MockStreamingCoordAssignmentServiceClient_Expecter{mock: &_m.Mock} +} + +// AssignmentDiscover provides a mock function with given fields: ctx, opts +func (_m *MockStreamingCoordAssignmentServiceClient) AssignmentDiscover(ctx context.Context, opts ...grpc.CallOption) (streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ...grpc.CallOption) (streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient, error)); ok { + return rf(ctx, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, ...grpc.CallOption) streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient); ok { + r0 = rf(ctx, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, ...grpc.CallOption) error); ok { + r1 = rf(ctx, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AssignmentDiscover' +type MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call struct { + *mock.Call +} + +// AssignmentDiscover is a helper method to define mock.On call +// - ctx context.Context +// - opts ...grpc.CallOption +func (_e *MockStreamingCoordAssignmentServiceClient_Expecter) AssignmentDiscover(ctx interface{}, opts ...interface{}) *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call { + return &MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call{Call: _e.mock.On("AssignmentDiscover", + append([]interface{}{ctx}, opts...)...)} +} + +func (_c *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call) Run(run func(ctx context.Context, opts ...grpc.CallOption)) *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), variadicArgs...) + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call) Return(_a0 streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient, _a1 error) *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call) RunAndReturn(run func(context.Context, ...grpc.CallOption) (streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient, error)) *MockStreamingCoordAssignmentServiceClient_AssignmentDiscover_Call { + _c.Call.Return(run) + return _c +} + +// NewMockStreamingCoordAssignmentServiceClient creates a new instance of MockStreamingCoordAssignmentServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStreamingCoordAssignmentServiceClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStreamingCoordAssignmentServiceClient { + mock := &MockStreamingCoordAssignmentServiceClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentService_AssignmentDiscoverClient.go b/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentService_AssignmentDiscoverClient.go new file mode 100644 index 0000000000..a054beea79 --- /dev/null +++ b/internal/mocks/proto/mock_streamingpb/mock_StreamingCoordAssignmentService_AssignmentDiscoverClient.go @@ -0,0 +1,398 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package mock_streamingpb + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + metadata "google.golang.org/grpc/metadata" + + streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" +) + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient is an autogenerated mock type for the StreamingCoordAssignmentService_AssignmentDiscoverClient type +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient struct { + mock.Mock +} + +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) EXPECT() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter{mock: &_m.Mock} +} + +// CloseSend provides a mock function with given fields: +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CloseSend' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call struct { + *mock.Call +} + +// CloseSend is a helper method to define mock.On call +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) CloseSend() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call{Call: _e.mock.On("CloseSend")} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call) Run(run func()) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call) Return(_a0 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call) RunAndReturn(run func() error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_CloseSend_Call { + _c.Call.Return(run) + return _c +} + +// Context provides a mock function with given fields: +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Context' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call struct { + *mock.Call +} + +// Context is a helper method to define mock.On call +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) Context() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call{Call: _e.mock.On("Context")} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call) Run(run func()) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call) Return(_a0 context.Context) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call) RunAndReturn(run func() context.Context) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Context_Call { + _c.Call.Return(run) + return _c +} + +// Header provides a mock function with given fields: +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + var r1 error + if rf, ok := ret.Get(0).(func() (metadata.MD, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Header' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call struct { + *mock.Call +} + +// Header is a helper method to define mock.On call +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) Header() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call{Call: _e.mock.On("Header")} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call) Run(run func()) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call) Return(_a0 metadata.MD, _a1 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call) RunAndReturn(run func() (metadata.MD, error)) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Header_Call { + _c.Call.Return(run) + return _c +} + +// Recv provides a mock function with given fields: +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) Recv() (*streamingpb.AssignmentDiscoverResponse, error) { + ret := _m.Called() + + var r0 *streamingpb.AssignmentDiscoverResponse + var r1 error + if rf, ok := ret.Get(0).(func() (*streamingpb.AssignmentDiscoverResponse, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *streamingpb.AssignmentDiscoverResponse); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*streamingpb.AssignmentDiscoverResponse) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Recv' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call struct { + *mock.Call +} + +// Recv is a helper method to define mock.On call +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) Recv() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call{Call: _e.mock.On("Recv")} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call) Run(run func()) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call) Return(_a0 *streamingpb.AssignmentDiscoverResponse, _a1 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call) RunAndReturn(run func() (*streamingpb.AssignmentDiscoverResponse, error)) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Recv_Call { + _c.Call.Return(run) + return _c +} + +// RecvMsg provides a mock function with given fields: m +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecvMsg' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call struct { + *mock.Call +} + +// RecvMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) RecvMsg(m interface{}) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call{Call: _e.mock.On("RecvMsg", m)} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call) Run(run func(m interface{})) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call) Return(_a0 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call) RunAndReturn(run func(interface{}) error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_RecvMsg_Call { + _c.Call.Return(run) + return _c +} + +// Send provides a mock function with given fields: _a0 +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) Send(_a0 *streamingpb.AssignmentDiscoverRequest) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*streamingpb.AssignmentDiscoverRequest) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call struct { + *mock.Call +} + +// Send is a helper method to define mock.On call +// - _a0 *streamingpb.AssignmentDiscoverRequest +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) Send(_a0 interface{}) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call{Call: _e.mock.On("Send", _a0)} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call) Run(run func(_a0 *streamingpb.AssignmentDiscoverRequest)) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*streamingpb.AssignmentDiscoverRequest)) + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call) Return(_a0 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call) RunAndReturn(run func(*streamingpb.AssignmentDiscoverRequest) error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Send_Call { + _c.Call.Return(run) + return _c +} + +// SendMsg provides a mock function with given fields: m +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendMsg' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call struct { + *mock.Call +} + +// SendMsg is a helper method to define mock.On call +// - m interface{} +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) SendMsg(m interface{}) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call{Call: _e.mock.On("SendMsg", m)} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call) Run(run func(m interface{})) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(interface{})) + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call) Return(_a0 error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call) RunAndReturn(run func(interface{}) error) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_SendMsg_Call { + _c.Call.Return(run) + return _c +} + +// Trailer provides a mock function with given fields: +func (_m *MockStreamingCoordAssignmentService_AssignmentDiscoverClient) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +} + +// MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Trailer' +type MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call struct { + *mock.Call +} + +// Trailer is a helper method to define mock.On call +func (_e *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Expecter) Trailer() *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call { + return &MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call{Call: _e.mock.On("Trailer")} +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call) Run(run func()) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call) Return(_a0 metadata.MD) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call) RunAndReturn(run func() metadata.MD) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient_Trailer_Call { + _c.Call.Return(run) + return _c +} + +// NewMockStreamingCoordAssignmentService_AssignmentDiscoverClient creates a new instance of MockStreamingCoordAssignmentService_AssignmentDiscoverClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStreamingCoordAssignmentService_AssignmentDiscoverClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStreamingCoordAssignmentService_AssignmentDiscoverClient { + mock := &MockStreamingCoordAssignmentService_AssignmentDiscoverClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/streamingcoord/client/assignment/assignment_impl.go b/internal/streamingcoord/client/assignment/assignment_impl.go new file mode 100644 index 0000000000..1e7497e1b0 --- /dev/null +++ b/internal/streamingcoord/client/assignment/assignment_impl.go @@ -0,0 +1,174 @@ +package assignment + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +// NewAssignmentService creates a new assignment service. +func NewAssignmentService(service lazygrpc.Service[streamingpb.StreamingCoordAssignmentServiceClient]) *AssignmentServiceImpl { + ctx, cancel := context.WithCancel(context.Background()) + s := &AssignmentServiceImpl{ + ctx: ctx, + cancel: cancel, + lifetime: lifetime.NewLifetime(lifetime.Working), + watcher: newWatcher(), + service: service, + resumingExitCh: make(chan struct{}), + cond: syncutil.NewContextCond(&sync.Mutex{}), + discoverer: nil, + logger: log.With(), + } + go s.resumeLoop() + return s +} + +type AssignmentServiceImpl struct { + ctx context.Context + cancel context.CancelFunc + lifetime lifetime.Lifetime[lifetime.State] + watcher *watcher + service lazygrpc.Service[streamingpb.StreamingCoordAssignmentServiceClient] + resumingExitCh chan struct{} + cond *syncutil.ContextCond + discoverer *assignmentDiscoverClient + logger *log.MLogger +} + +// AssignmentDiscover watches the assignment discovery. +func (c *AssignmentServiceImpl) AssignmentDiscover(ctx context.Context, cb func(*types.VersionedStreamingNodeAssignments) error) error { + if c.lifetime.Add(lifetime.IsWorking) != nil { + return status.NewOnShutdownError("assignment service client is closing") + } + defer c.lifetime.Done() + + return c.watcher.AssignmentDiscover(ctx, cb) +} + +// ReportAssignmentError reports the assignment error to server. +func (c *AssignmentServiceImpl) ReportAssignmentError(ctx context.Context, pchannel types.PChannelInfo, assignmentErr error) error { + if c.lifetime.Add(lifetime.IsWorking) != nil { + return status.NewOnShutdownError("assignment service client is closing") + } + defer c.lifetime.Done() + + // wait for service ready. + assignment, err := c.getAssignmentDiscoverOrWait(ctx) + if err != nil { + return errors.Wrap(err, "at creating assignment service") + } + assignment.ReportAssignmentError(pchannel, assignmentErr) + return nil +} + +// Close closes the assignment service. +func (c *AssignmentServiceImpl) Close() { + c.lifetime.SetState(lifetime.Stopped) + c.lifetime.Wait() + + c.cancel() + <-c.resumingExitCh + c.cond.L.Lock() + if c.discoverer != nil { + c.discoverer.Close() + } + c.cond.L.Unlock() +} + +// getProducerOrWaitProducerReady get producer or wait the new producer is available. +func (c *AssignmentServiceImpl) getAssignmentDiscoverOrWait(ctx context.Context) (*assignmentDiscoverClient, error) { + c.cond.L.Lock() + for c.discoverer == nil || !c.discoverer.IsAvailable() { + if err := c.cond.Wait(ctx); err != nil { + return nil, err + } + } + discoverer := c.discoverer + c.cond.L.Unlock() + return discoverer, nil +} + +func (c *AssignmentServiceImpl) resumeLoop() (err error) { + defer func() { + if err != nil { + c.logger.Warn("stop resuming", zap.Error(err)) + } else { + c.logger.Info("stop resuming") + } + close(c.resumingExitCh) + }() + + for { + // Do a underlying assignmentDiscoverClient swap + adc, err := c.swapAssignmentDiscoverClient() + if err != nil { + return err + } + if err := c.waitUntilUnavailable(adc); err != nil { + return err + } + } +} + +// swapAssignmentDiscoverClient swaps the assignment discover client. +func (c *AssignmentServiceImpl) swapAssignmentDiscoverClient() (*assignmentDiscoverClient, error) { + adc, err := c.createNewAssignmentDiscoverClient() + if err != nil { + return nil, err + } + c.cond.LockAndBroadcast() + oldADC := c.discoverer + c.discoverer = adc + c.cond.L.Unlock() + + c.logger.Info("swap assignment discover client") + if oldADC != nil { + oldADC.Close() + } + c.logger.Info("old assignment discover client closed") + return adc, nil +} + +// getAssignmentDiscoverClient returns the assignment discover client. +func (c *AssignmentServiceImpl) createNewAssignmentDiscoverClient() (*assignmentDiscoverClient, error) { + for { + // Create a new available assignment discover client. + service, err := c.service.GetService(c.ctx) + if err != nil { + return nil, err + } + client, err := service.AssignmentDiscover(c.ctx) + if errors.Is(err, context.Canceled) { + return nil, err + } + if err != nil { + c.logger.Warn("create a assignment discover stream failed", zap.Error(err)) + // TODO: backoff + time.Sleep(50 * time.Millisecond) + continue + } + return newAssignmentDiscoverClient(c.watcher, client), nil + } +} + +func (c *AssignmentServiceImpl) waitUntilUnavailable(adc *assignmentDiscoverClient) error { + select { + case <-adc.Available(): + c.logger.Warn("assignment discover client is unavailable, try to resuming...") + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} diff --git a/internal/streamingcoord/client/assignment/assignment_test.go b/internal/streamingcoord/client/assignment/assignment_test.go new file mode 100644 index 0000000000..3073aa7654 --- /dev/null +++ b/internal/streamingcoord/client/assignment/assignment_test.go @@ -0,0 +1,113 @@ +package assignment + +import ( + "context" + "io" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/mocks/proto/mock_streamingpb" + "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/service/mock_lazygrpc" + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func TestAssignmentService(t *testing.T) { + s := mock_lazygrpc.NewMockService[streamingpb.StreamingCoordAssignmentServiceClient](t) + c := mock_streamingpb.NewMockStreamingCoordAssignmentServiceClient(t) + s.EXPECT().GetService(mock.Anything).Return(c, nil) + cc := mock_streamingpb.NewMockStreamingCoordAssignmentService_AssignmentDiscoverClient(t) + c.EXPECT().AssignmentDiscover(mock.Anything).Return(cc, nil) + k := 0 + closeCh := make(chan struct{}) + cc.EXPECT().Send(mock.Anything).Return(nil) + cc.EXPECT().CloseSend().Return(nil) + cc.EXPECT().Recv().RunAndReturn(func() (*streamingpb.AssignmentDiscoverResponse, error) { + resps := []*streamingpb.AssignmentDiscoverResponse{ + { + Response: &streamingpb.AssignmentDiscoverResponse_FullAssignment{ + FullAssignment: &streamingpb.FullStreamingNodeAssignmentWithVersion{ + Version: &streamingpb.VersionPair{Global: 1, Local: 2}, + Assignments: []*streamingpb.StreamingNodeAssignment{ + { + Node: &streamingpb.StreamingNodeInfo{ServerId: 1}, + Channels: []*streamingpb.PChannelInfo{{Name: "c1", Term: 1}, {Name: "c2", Term: 2}}, + }, + }, + }, + }, + }, + { + Response: &streamingpb.AssignmentDiscoverResponse_FullAssignment{ + FullAssignment: &streamingpb.FullStreamingNodeAssignmentWithVersion{ + Version: &streamingpb.VersionPair{Global: 2, Local: 3}, + Assignments: []*streamingpb.StreamingNodeAssignment{ + { + Node: &streamingpb.StreamingNodeInfo{ServerId: 1}, + Channels: []*streamingpb.PChannelInfo{{Name: "c1", Term: 1}, {Name: "c2", Term: 2}}, + }, + { + Node: &streamingpb.StreamingNodeInfo{ServerId: 2}, + Channels: []*streamingpb.PChannelInfo{{Name: "c3", Term: 1}, {Name: "c4", Term: 2}}, + }, + }, + }, + }, + }, + nil, + } + errs := []error{ + nil, + nil, + io.ErrUnexpectedEOF, + } + if k > len(resps) { + return nil, io.EOF + } else if k == len(resps) { + <-closeCh + k++ + return &streamingpb.AssignmentDiscoverResponse{ + Response: &streamingpb.AssignmentDiscoverResponse_Close{}, + }, nil + } + time.Sleep(25 * time.Millisecond) + k++ + return resps[k-1], errs[k-1] + }) + + assignmentService := NewAssignmentService(s) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + var finalAssignments *types.VersionedStreamingNodeAssignments + err := assignmentService.AssignmentDiscover(ctx, func(vsna *types.VersionedStreamingNodeAssignments) error { + finalAssignments = vsna + return nil + }) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.True(t, finalAssignments.Version.EQ(typeutil.VersionInt64Pair{Global: 2, Local: 3})) + + assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test")) + + // test close + go close(closeCh) + time.Sleep(10 * time.Millisecond) + assignmentService.Close() + + // running assignment service should be closed too. + err = assignmentService.AssignmentDiscover(ctx, func(vsna *types.VersionedStreamingNodeAssignments) error { + return nil + }) + se := status.AsStreamingError(err) + assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, se.Code) + + err = assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test")) + se = status.AsStreamingError(err) + assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, se.Code) +} diff --git a/internal/streamingcoord/client/assignment/discoverer.go b/internal/streamingcoord/client/assignment/discoverer.go new file mode 100644 index 0000000000..eae0b66770 --- /dev/null +++ b/internal/streamingcoord/client/assignment/discoverer.go @@ -0,0 +1,154 @@ +package assignment + +import ( + "io" + "sync" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/typeconverter" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// newAssignmentDiscoverClient creates a new assignment discover client. +func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient { + c := &assignmentDiscoverClient{ + lifetime: lifetime.NewLifetime(lifetime.Working), + w: w, + streamClient: streamClient, + logger: log.With(), + requestCh: make(chan *streamingpb.AssignmentDiscoverRequest, 16), + exitCh: make(chan struct{}), + wg: sync.WaitGroup{}, + } + c.executeBackgroundTask() + return c +} + +// assignmentDiscoverClient is the client for assignment discover. +type assignmentDiscoverClient struct { + lifetime lifetime.Lifetime[lifetime.State] + w *watcher + logger *log.MLogger + requestCh chan *streamingpb.AssignmentDiscoverRequest + exitCh chan struct{} + wg sync.WaitGroup + streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient +} + +// ReportAssignmentError reports the assignment error to server. +func (c *assignmentDiscoverClient) ReportAssignmentError(pchannel types.PChannelInfo, err error) { + if err := c.lifetime.Add(lifetime.IsWorking); err != nil { + return + } + defer c.lifetime.Done() + + statusErr := status.AsStreamingError(err).AsPBError() + select { + case c.requestCh <- &streamingpb.AssignmentDiscoverRequest{ + Command: &streamingpb.AssignmentDiscoverRequest_ReportError{ + ReportError: &streamingpb.ReportAssignmentErrorRequest{ + Pchannel: typeconverter.NewProtoFromPChannelInfo(pchannel), + Err: statusErr, + }, + }, + }: + case <-c.exitCh: + } +} + +func (c *assignmentDiscoverClient) IsAvailable() bool { + select { + case <-c.Available(): + return false + default: + return true + } +} + +// Available returns a channel that will be closed when the assignment discover client is available. +func (c *assignmentDiscoverClient) Available() <-chan struct{} { + return c.exitCh +} + +// Close closes the assignment discover client. +func (c *assignmentDiscoverClient) Close() { + c.lifetime.SetState(lifetime.Stopped) + c.lifetime.Wait() + c.lifetime.Close() + + close(c.requestCh) + c.wg.Wait() +} + +func (c *assignmentDiscoverClient) executeBackgroundTask() { + c.wg.Add(2) + go c.recvLoop() + go c.sendLoop() +} + +// sendLoop sends the request to server. +func (c *assignmentDiscoverClient) sendLoop() (err error) { + defer c.wg.Done() + for { + req, ok := <-c.requestCh + if !ok { + // send close message and close send operation. + if err := c.streamClient.Send(&streamingpb.AssignmentDiscoverRequest{ + Command: &streamingpb.AssignmentDiscoverRequest_Close{}, + }); err != nil { + return err + } + return c.streamClient.CloseSend() + } + if err := c.streamClient.Send(req); err != nil { + return err + } + } +} + +// recvLoop receives the message from server. +// 1. FullAssignment +// 2. Close +func (c *assignmentDiscoverClient) recvLoop() (err error) { + defer func() { + c.wg.Done() + close(c.exitCh) + }() + for { + resp, err := c.streamClient.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + switch resp := resp.Response.(type) { + case *streamingpb.AssignmentDiscoverResponse_FullAssignment: + newIncomingVersion := typeutil.VersionInt64Pair{ + Global: resp.FullAssignment.Version.Global, + Local: resp.FullAssignment.Version.Local, + } + newIncomingAssignments := make(map[int64]types.StreamingNodeAssignment, len(resp.FullAssignment.Assignments)) + for _, assignment := range resp.FullAssignment.Assignments { + channels := make(map[string]types.PChannelInfo, len(assignment.Channels)) + for _, channel := range assignment.Channels { + channels[channel.Name] = typeconverter.NewPChannelInfoFromProto(channel) + } + newIncomingAssignments[assignment.GetNode().GetServerId()] = types.StreamingNodeAssignment{ + NodeInfo: typeconverter.NewStreamingNodeInfoFromProto(assignment.Node), + Channels: channels, + } + } + c.w.Update(types.VersionedStreamingNodeAssignments{ + Version: newIncomingVersion, + Assignments: newIncomingAssignments, + }) + case *streamingpb.AssignmentDiscoverResponse_Close: + // nothing to do now, just wait io.EOF. + } + } +} diff --git a/internal/streamingcoord/client/assignment/watcher.go b/internal/streamingcoord/client/assignment/watcher.go new file mode 100644 index 0000000000..6fefe97a82 --- /dev/null +++ b/internal/streamingcoord/client/assignment/watcher.go @@ -0,0 +1,54 @@ +package assignment + +import ( + "context" + "sync" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/syncutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var ErrWatcherClosed = errors.New("watcher is closed") + +// newWatcher creates a new watcher. +func newWatcher() *watcher { + return &watcher{ + cond: syncutil.NewContextCond(&sync.Mutex{}), + lastVersionedAssignment: types.VersionedStreamingNodeAssignments{ + Version: typeutil.VersionInt64Pair{Global: -1, Local: -1}, + Assignments: make(map[int64]types.StreamingNodeAssignment), + }, + } +} + +// watcher is the watcher for assignment discovery. +type watcher struct { + cond *syncutil.ContextCond + lastVersionedAssignment types.VersionedStreamingNodeAssignments +} + +// AssignmentDiscover watches the assignment discovery. +func (w *watcher) AssignmentDiscover(ctx context.Context, cb func(*types.VersionedStreamingNodeAssignments) error) error { + w.cond.L.Lock() + for { + if err := cb(&w.lastVersionedAssignment); err != nil { + w.cond.L.Unlock() + return err + } + if err := w.cond.Wait(ctx); err != nil { + return err + } + } +} + +// Update updates the assignment. +func (w *watcher) Update(assignments types.VersionedStreamingNodeAssignments) { + w.cond.LockAndBroadcast() + if assignments.Version.GT(w.lastVersionedAssignment.Version) { + w.lastVersionedAssignment = assignments + } + w.cond.L.Unlock() +} diff --git a/internal/streamingcoord/client/client.go b/internal/streamingcoord/client/client.go new file mode 100644 index 0000000000..18fe4302db --- /dev/null +++ b/internal/streamingcoord/client/client.go @@ -0,0 +1,110 @@ +package client + +import ( + "context" + "encoding/json" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/internal/streamingcoord/client/assignment" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/balancer/picker" + streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/resolver" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/tracer" + "github.com/milvus-io/milvus/pkg/util/interceptor" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var _ Client = (*clientImpl)(nil) + +// AssignmentService is the interface of assignment service. +type AssignmentService interface { + // AssignmentDiscover is used to watches the assignment discovery. + types.AssignmentDiscoverWatcher +} + +// Client is the interface of log service client. +type Client interface { + // Assignment access assignment service. + Assignment() AssignmentService + + // Close close the client. + Close() +} + +// NewClient creates a new client. +func NewClient(etcdCli *clientv3.Client) Client { + // StreamingCoord is deployed on DataCoord node. + role := sessionutil.GetSessionPrefixByRole(typeutil.DataCoordRole) + rb := resolver.NewSessionBuilder(etcdCli, role) + dialTimeout := paramtable.Get().StreamingCoordGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond) + dialOptions := getDialOptions(rb) + conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + return grpc.DialContext( + ctx, + resolver.SessionResolverScheme+":///"+typeutil.DataCoordRole, + dialOptions..., + ) + }) + assignmentService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordAssignmentServiceClient) + return &clientImpl{ + conn: conn, + rb: rb, + assignmentService: assignment.NewAssignmentService(assignmentService), + } +} + +// getDialOptions returns grpc dial options. +func getDialOptions(rb resolver.Builder) []grpc.DialOption { + cfg := ¶mtable.Get().StreamingCoordGrpcClientCfg + retryPolicy := cfg.GetDefaultRetryPolicy() + retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"} + defaultServiceConfig := map[string]interface{}{ + "loadBalancingConfig": []map[string]interface{}{ + {picker.ServerIDPickerBalancerName: map[string]interface{}{}}, + }, + "methodConfig": []map[string]interface{}{ + { + "name": []map[string]string{ + {"service": "milvus.proto.streaming.StreamingCoordAssignmentService"}, + }, + "waitForReady": true, + "retryPolicy": retryPolicy, + }, + }, + } + defaultServiceConfigJSON, err := json.Marshal(defaultServiceConfig) + if err != nil { + panic(err) + } + dialOptions := cfg.GetDialOptionsFromConfig() + dialOptions = append(dialOptions, + grpc.WithBlock(), + grpc.WithResolvers(rb), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithChainUnaryInterceptor( + otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...), + interceptor.ClusterInjectionUnaryClientInterceptor(), + streamingserviceinterceptor.NewStreamingServiceUnaryClientInterceptor(), + ), + grpc.WithChainStreamInterceptor( + otelgrpc.StreamClientInterceptor(tracer.GetInterceptorOpts()...), + interceptor.ClusterInjectionStreamClientInterceptor(), + streamingserviceinterceptor.NewStreamingServiceStreamClientInterceptor(), + ), + grpc.WithReturnConnectionError(), + grpc.WithDefaultServiceConfig(string(defaultServiceConfigJSON)), + ) + return dialOptions +} diff --git a/internal/streamingcoord/client/client_impl.go b/internal/streamingcoord/client/client_impl.go new file mode 100644 index 0000000000..ffb0b0355a --- /dev/null +++ b/internal/streamingcoord/client/client_impl.go @@ -0,0 +1,26 @@ +package client + +import ( + "github.com/milvus-io/milvus/internal/streamingcoord/client/assignment" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc" + "github.com/milvus-io/milvus/internal/util/streamingutil/service/resolver" +) + +// clientImpl is the implementation of Client. +type clientImpl struct { + conn lazygrpc.Conn + rb resolver.Builder + assignmentService *assignment.AssignmentServiceImpl +} + +// Assignment access assignment service. +func (c *clientImpl) Assignment() AssignmentService { + return c.assignmentService +} + +// Close close the client. +func (c *clientImpl) Close() { + c.assignmentService.Close() + c.conn.Close() + c.rb.Close() +} diff --git a/internal/streamingcoord/client/client_test.go b/internal/streamingcoord/client/client_test.go new file mode 100644 index 0000000000..a3210b63c9 --- /dev/null +++ b/internal/streamingcoord/client/client_test.go @@ -0,0 +1,25 @@ +package client + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestDial(t *testing.T) { + paramtable.Init() + + err := etcd.InitEtcdServer(true, "", t.TempDir(), "stdout", "info") + assert.NoError(t, err) + defer etcd.StopEtcdServer() + c, err := etcd.GetEmbedEtcdClient() + assert.NoError(t, err) + assert.NotNil(t, c) + + client := NewClient(c) + assert.NotNil(t, client) + client.Close() +}