diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 80c789d63f..96a09e1f9d 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -368,3 +368,36 @@ func (c *Client) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou return client.ListResourceGroups(ctx, req) }) } + +func (c *Client) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequest, opts ...grpc.CallOption) (*querypb.ListCheckersResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.ListCheckersResponse, error) { + return client.ListCheckers(ctx, req) + }) +} + +func (c *Client) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) { + return client.ActivateChecker(ctx, req) + }) +} + +func (c *Client) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) { + return client.DeactivateChecker(ctx, req) + }) +} diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 30271af103..05f192004b 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -417,3 +417,15 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.DescribeResourceGroupRequest) (*querypb.DescribeResourceGroupResponse, error) { return s.queryCoord.DescribeResourceGroup(ctx, req) } + +func (s *Server) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest) (*commonpb.Status, error) { + return s.queryCoord.ActivateChecker(ctx, req) +} + +func (s *Server) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) { + return s.queryCoord.DeactivateChecker(ctx, req) +} + +func (s *Server) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error) { + return s.queryCoord.ListCheckers(ctx, req) +} diff --git a/internal/mocks/mock_querycoord.go b/internal/mocks/mock_querycoord.go index 8b632e1b87..2fd90e2095 100644 --- a/internal/mocks/mock_querycoord.go +++ b/internal/mocks/mock_querycoord.go @@ -34,6 +34,61 @@ func (_m *MockQueryCoord) EXPECT() *MockQueryCoord_Expecter { return &MockQueryCoord_Expecter{mock: &_m.Mock} } +// ActivateChecker provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryCoord) ActivateChecker(_a0 context.Context, _a1 *querypb.ActivateCheckerRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ActivateCheckerRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ActivateCheckerRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.ActivateCheckerRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoord_ActivateChecker_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ActivateChecker' +type MockQueryCoord_ActivateChecker_Call struct { + *mock.Call +} + +// ActivateChecker is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.ActivateCheckerRequest +func (_e *MockQueryCoord_Expecter) ActivateChecker(_a0 interface{}, _a1 interface{}) *MockQueryCoord_ActivateChecker_Call { + return &MockQueryCoord_ActivateChecker_Call{Call: _e.mock.On("ActivateChecker", _a0, _a1)} +} + +func (_c *MockQueryCoord_ActivateChecker_Call) Run(run func(_a0 context.Context, _a1 *querypb.ActivateCheckerRequest)) *MockQueryCoord_ActivateChecker_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.ActivateCheckerRequest)) + }) + return _c +} + +func (_c *MockQueryCoord_ActivateChecker_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoord_ActivateChecker_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoord_ActivateChecker_Call) RunAndReturn(run func(context.Context, *querypb.ActivateCheckerRequest) (*commonpb.Status, error)) *MockQueryCoord_ActivateChecker_Call { + _c.Call.Return(run) + return _c +} + // CheckHealth provides a mock function with given fields: _a0, _a1 func (_m *MockQueryCoord) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { ret := _m.Called(_a0, _a1) @@ -144,6 +199,61 @@ func (_c *MockQueryCoord_CreateResourceGroup_Call) RunAndReturn(run func(context return _c } +// DeactivateChecker provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryCoord) DeactivateChecker(_a0 context.Context, _a1 *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeactivateCheckerRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeactivateCheckerRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeactivateCheckerRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoord_DeactivateChecker_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeactivateChecker' +type MockQueryCoord_DeactivateChecker_Call struct { + *mock.Call +} + +// DeactivateChecker is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.DeactivateCheckerRequest +func (_e *MockQueryCoord_Expecter) DeactivateChecker(_a0 interface{}, _a1 interface{}) *MockQueryCoord_DeactivateChecker_Call { + return &MockQueryCoord_DeactivateChecker_Call{Call: _e.mock.On("DeactivateChecker", _a0, _a1)} +} + +func (_c *MockQueryCoord_DeactivateChecker_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeactivateCheckerRequest)) *MockQueryCoord_DeactivateChecker_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.DeactivateCheckerRequest)) + }) + return _c +} + +func (_c *MockQueryCoord_DeactivateChecker_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoord_DeactivateChecker_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoord_DeactivateChecker_Call) RunAndReturn(run func(context.Context, *querypb.DeactivateCheckerRequest) (*commonpb.Status, error)) *MockQueryCoord_DeactivateChecker_Call { + _c.Call.Return(run) + return _c +} + // DescribeResourceGroup provides a mock function with given fields: _a0, _a1 func (_m *MockQueryCoord) DescribeResourceGroup(_a0 context.Context, _a1 *querypb.DescribeResourceGroupRequest) (*querypb.DescribeResourceGroupResponse, error) { ret := _m.Called(_a0, _a1) @@ -735,6 +845,61 @@ func (_c *MockQueryCoord_Init_Call) RunAndReturn(run func() error) *MockQueryCoo return _c } +// ListCheckers provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryCoord) ListCheckers(_a0 context.Context, _a1 *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *querypb.ListCheckersResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListCheckersRequest) *querypb.ListCheckersResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.ListCheckersResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListCheckersRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoord_ListCheckers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListCheckers' +type MockQueryCoord_ListCheckers_Call struct { + *mock.Call +} + +// ListCheckers is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *querypb.ListCheckersRequest +func (_e *MockQueryCoord_Expecter) ListCheckers(_a0 interface{}, _a1 interface{}) *MockQueryCoord_ListCheckers_Call { + return &MockQueryCoord_ListCheckers_Call{Call: _e.mock.On("ListCheckers", _a0, _a1)} +} + +func (_c *MockQueryCoord_ListCheckers_Call) Run(run func(_a0 context.Context, _a1 *querypb.ListCheckersRequest)) *MockQueryCoord_ListCheckers_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*querypb.ListCheckersRequest)) + }) + return _c +} + +func (_c *MockQueryCoord_ListCheckers_Call) Return(_a0 *querypb.ListCheckersResponse, _a1 error) *MockQueryCoord_ListCheckers_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoord_ListCheckers_Call) RunAndReturn(run func(context.Context, *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error)) *MockQueryCoord_ListCheckers_Call { + _c.Call.Return(run) + return _c +} + // ListResourceGroups provides a mock function with given fields: _a0, _a1 func (_m *MockQueryCoord) ListResourceGroups(_a0 context.Context, _a1 *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querycoord_client.go b/internal/mocks/mock_querycoord_client.go index 95bfe6ad9e..947bff1387 100644 --- a/internal/mocks/mock_querycoord_client.go +++ b/internal/mocks/mock_querycoord_client.go @@ -31,6 +31,76 @@ func (_m *MockQueryCoordClient) EXPECT() *MockQueryCoordClient_Expecter { return &MockQueryCoordClient_Expecter{mock: &_m.Mock} } +// ActivateChecker provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryCoordClient) ActivateChecker(ctx context.Context, in *querypb.ActivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ActivateCheckerRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ActivateCheckerRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.ActivateCheckerRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoordClient_ActivateChecker_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ActivateChecker' +type MockQueryCoordClient_ActivateChecker_Call struct { + *mock.Call +} + +// ActivateChecker is a helper method to define mock.On call +// - ctx context.Context +// - in *querypb.ActivateCheckerRequest +// - opts ...grpc.CallOption +func (_e *MockQueryCoordClient_Expecter) ActivateChecker(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_ActivateChecker_Call { + return &MockQueryCoordClient_ActivateChecker_Call{Call: _e.mock.On("ActivateChecker", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryCoordClient_ActivateChecker_Call) Run(run func(ctx context.Context, in *querypb.ActivateCheckerRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_ActivateChecker_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*querypb.ActivateCheckerRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockQueryCoordClient_ActivateChecker_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoordClient_ActivateChecker_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoordClient_ActivateChecker_Call) RunAndReturn(run func(context.Context, *querypb.ActivateCheckerRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockQueryCoordClient_ActivateChecker_Call { + _c.Call.Return(run) + return _c +} + // CheckHealth provides a mock function with given fields: ctx, in, opts func (_m *MockQueryCoordClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { _va := make([]interface{}, len(opts)) @@ -212,6 +282,76 @@ func (_c *MockQueryCoordClient_CreateResourceGroup_Call) RunAndReturn(run func(c return _c } +// DeactivateChecker provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryCoordClient) DeactivateChecker(ctx context.Context, in *querypb.DeactivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeactivateCheckerRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeactivateCheckerRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeactivateCheckerRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoordClient_DeactivateChecker_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeactivateChecker' +type MockQueryCoordClient_DeactivateChecker_Call struct { + *mock.Call +} + +// DeactivateChecker is a helper method to define mock.On call +// - ctx context.Context +// - in *querypb.DeactivateCheckerRequest +// - opts ...grpc.CallOption +func (_e *MockQueryCoordClient_Expecter) DeactivateChecker(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_DeactivateChecker_Call { + return &MockQueryCoordClient_DeactivateChecker_Call{Call: _e.mock.On("DeactivateChecker", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryCoordClient_DeactivateChecker_Call) Run(run func(ctx context.Context, in *querypb.DeactivateCheckerRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_DeactivateChecker_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*querypb.DeactivateCheckerRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockQueryCoordClient_DeactivateChecker_Call) Return(_a0 *commonpb.Status, _a1 error) *MockQueryCoordClient_DeactivateChecker_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoordClient_DeactivateChecker_Call) RunAndReturn(run func(context.Context, *querypb.DeactivateCheckerRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockQueryCoordClient_DeactivateChecker_Call { + _c.Call.Return(run) + return _c +} + // DescribeResourceGroup provides a mock function with given fields: ctx, in, opts func (_m *MockQueryCoordClient) DescribeResourceGroup(ctx context.Context, in *querypb.DescribeResourceGroupRequest, opts ...grpc.CallOption) (*querypb.DescribeResourceGroupResponse, error) { _va := make([]interface{}, len(opts)) @@ -912,6 +1052,76 @@ func (_c *MockQueryCoordClient_GetTimeTickChannel_Call) RunAndReturn(run func(co return _c } +// ListCheckers provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryCoordClient) ListCheckers(ctx context.Context, in *querypb.ListCheckersRequest, opts ...grpc.CallOption) (*querypb.ListCheckersResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *querypb.ListCheckersResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListCheckersRequest, ...grpc.CallOption) (*querypb.ListCheckersResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *querypb.ListCheckersRequest, ...grpc.CallOption) *querypb.ListCheckersResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*querypb.ListCheckersResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *querypb.ListCheckersRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryCoordClient_ListCheckers_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListCheckers' +type MockQueryCoordClient_ListCheckers_Call struct { + *mock.Call +} + +// ListCheckers is a helper method to define mock.On call +// - ctx context.Context +// - in *querypb.ListCheckersRequest +// - opts ...grpc.CallOption +func (_e *MockQueryCoordClient_Expecter) ListCheckers(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryCoordClient_ListCheckers_Call { + return &MockQueryCoordClient_ListCheckers_Call{Call: _e.mock.On("ListCheckers", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryCoordClient_ListCheckers_Call) Run(run func(ctx context.Context, in *querypb.ListCheckersRequest, opts ...grpc.CallOption)) *MockQueryCoordClient_ListCheckers_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*querypb.ListCheckersRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockQueryCoordClient_ListCheckers_Call) Return(_a0 *querypb.ListCheckersResponse, _a1 error) *MockQueryCoordClient_ListCheckers_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryCoordClient_ListCheckers_Call) RunAndReturn(run func(context.Context, *querypb.ListCheckersRequest, ...grpc.CallOption) (*querypb.ListCheckersResponse, error)) *MockQueryCoordClient_ListCheckers_Call { + _c.Call.Return(run) + return _c +} + // ListResourceGroups provides a mock function with given fields: ctx, in, opts func (_m *MockQueryCoordClient) ListResourceGroups(ctx context.Context, in *milvuspb.ListResourceGroupsRequest, opts ...grpc.CallOption) (*milvuspb.ListResourceGroupsResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 7e66be594f..330d2ac25e 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -46,6 +46,12 @@ service QueryCoord { rpc TransferReplica(TransferReplicaRequest) returns (common.Status) {} rpc ListResourceGroups(milvus.ListResourceGroupsRequest) returns (milvus.ListResourceGroupsResponse) {} rpc DescribeResourceGroup(DescribeResourceGroupRequest) returns (DescribeResourceGroupResponse) {} + + + // ops interfaces + rpc ListCheckers(ListCheckersRequest) returns (ListCheckersResponse) {} + rpc ActivateChecker(ActivateCheckerRequest) returns (common.Status) {} + rpc DeactivateChecker(DeactivateCheckerRequest) returns (common.Status) {} } service QueryNode { @@ -635,3 +641,31 @@ message DeleteRequest { schema.IDs primary_keys = 6; repeated uint64 timestamps = 7; } + +message ActivateCheckerRequest { + common.MsgBase base = 1; + int32 checkerID = 2; +} + +message DeactivateCheckerRequest { + common.MsgBase base = 1; + int32 checkerID = 2; +} + +message ListCheckersRequest { + common.MsgBase base = 1; + repeated int32 checkerIDs = 2; +} + +message ListCheckersResponse { + common.Status status = 1; + repeated CheckerInfo checkerInfos = 2; +} + +message CheckerInfo { + int32 id = 1; + string desc = 2; + bool activated = 3; + bool found = 4; +} + diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index d9fcdeee6d..092bc4dc07 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -55,7 +55,7 @@ func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *sessi } } -func (b *BalanceChecker) ID() checkerType { +func (b *BalanceChecker) ID() CheckerType { return balanceChecker } diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index a96bdb95bb..9584d718bb 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -56,7 +56,7 @@ func NewChannelChecker( } } -func (c *ChannelChecker) ID() checkerType { +func (c *ChannelChecker) ID() CheckerType { return channelChecker } diff --git a/internal/querycoordv2/checkers/checker.go b/internal/querycoordv2/checkers/checker.go index 2f31859588..33a463b90c 100644 --- a/internal/querycoordv2/checkers/checker.go +++ b/internal/querycoordv2/checkers/checker.go @@ -24,7 +24,7 @@ import ( ) type Checker interface { - ID() checkerType + ID() CheckerType Description() string Check(ctx context.Context) []task.Task IsActive() bool diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index a9f485577b..1a4b541730 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -39,10 +39,10 @@ const ( indexCheckerName = "index_checker" ) -type checkerType int32 +type CheckerType int32 const ( - channelChecker checkerType = iota + 1 + channelChecker CheckerType = iota + 1 segmentChecker balanceChecker indexChecker @@ -51,7 +51,7 @@ const ( var ( checkRoundTaskNumLimit = 256 checkerOrder = []string{channelCheckerName, segmentCheckerName, balanceCheckerName, indexCheckerName} - checkerNames = map[checkerType]string{ + checkerNames = map[CheckerType]string{ segmentChecker: segmentCheckerName, channelChecker: channelCheckerName, balanceChecker: balanceCheckerName, @@ -60,13 +60,13 @@ var ( errTypeNotFound = errors.New("checker type not found") ) -func (s checkerType) String() string { +func (s CheckerType) String() string { return checkerNames[s] } type CheckerController struct { cancel context.CancelFunc - manualCheckChs map[checkerType]chan struct{} + manualCheckChs map[CheckerType]chan struct{} meta *meta.Meta dist *meta.DistributionManager targetMgr *meta.TargetManager @@ -75,7 +75,7 @@ type CheckerController struct { balancer balance.Balance scheduler task.Scheduler - checkers map[checkerType]Checker + checkers map[CheckerType]Checker stopOnce sync.Once } @@ -91,14 +91,14 @@ func NewCheckerController( ) *CheckerController { // CheckerController runs checkers with the order, // the former checker has higher priority - checkers := map[checkerType]Checker{ + checkers := map[CheckerType]Checker{ channelChecker: NewChannelChecker(meta, dist, targetMgr, balancer), segmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), balanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler), indexChecker: NewIndexChecker(meta, dist, broker, nodeMgr), } - manualCheckChs := map[checkerType]chan struct{}{ + manualCheckChs := map[CheckerType]chan struct{}{ channelChecker: make(chan struct{}, 1), segmentChecker: make(chan struct{}, 1), balanceChecker: make(chan struct{}, 1), @@ -124,7 +124,7 @@ func (controller *CheckerController) Start() { } } -func getCheckerInterval(checker checkerType) time.Duration { +func getCheckerInterval(checker CheckerType) time.Duration { switch checker { case segmentChecker: return Params.QueryCoordCfg.SegmentCheckInterval.GetAsDuration(time.Millisecond) @@ -139,7 +139,7 @@ func getCheckerInterval(checker checkerType) time.Duration { } } -func (controller *CheckerController) startChecker(ctx context.Context, checker checkerType) { +func (controller *CheckerController) startChecker(ctx context.Context, checker CheckerType) { interval := getCheckerInterval(checker) ticker := time.NewTicker(interval) defer ticker.Stop() @@ -180,7 +180,7 @@ func (controller *CheckerController) Check() { } // check is the real implementation of Check -func (controller *CheckerController) check(ctx context.Context, checkType checkerType) { +func (controller *CheckerController) check(ctx context.Context, checkType CheckerType) { checker := controller.checkers[checkType] tasks := checker.Check(ctx) @@ -193,7 +193,7 @@ func (controller *CheckerController) check(ctx context.Context, checkType checke } } -func (controller *CheckerController) Deactivate(typ checkerType) error { +func (controller *CheckerController) Deactivate(typ CheckerType) error { for _, checker := range controller.checkers { if checker.ID() == typ { checker.Deactivate() @@ -203,7 +203,7 @@ func (controller *CheckerController) Deactivate(typ checkerType) error { return errTypeNotFound } -func (controller *CheckerController) Activate(typ checkerType) error { +func (controller *CheckerController) Activate(typ CheckerType) error { for _, checker := range controller.checkers { if checker.ID() == typ { checker.Activate() @@ -213,7 +213,7 @@ func (controller *CheckerController) Activate(typ checkerType) error { return errTypeNotFound } -func (controller *CheckerController) IsActive(typ checkerType) (bool, error) { +func (controller *CheckerController) IsActive(typ CheckerType) (bool, error) { for _, checker := range controller.checkers { if checker.ID() == typ { return checker.IsActive(), nil @@ -221,3 +221,11 @@ func (controller *CheckerController) IsActive(typ checkerType) (bool, error) { } return false, errTypeNotFound } + +func (controller *CheckerController) Checkers() []Checker { + checkers := make([]Checker, 0, len(controller.checkers)) + for _, checker := range controller.checkers { + checkers = append(checkers, checker) + } + return checkers +} diff --git a/internal/querycoordv2/checkers/controller_base_test.go b/internal/querycoordv2/checkers/controller_base_test.go index 070e290241..9c56c7481d 100644 --- a/internal/querycoordv2/checkers/controller_base_test.go +++ b/internal/querycoordv2/checkers/controller_base_test.go @@ -95,7 +95,7 @@ func (s *ControllerBaseTestSuite) TestActivation() { s.True(active) invalidTyp := -1 - _, err = s.controller.IsActive(checkerType(invalidTyp)) + _, err = s.controller.IsActive(CheckerType(invalidTyp)) s.Equal(errTypeNotFound, err) } diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 2a9e3159e7..421bf210b9 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -58,7 +58,7 @@ func NewIndexChecker( } } -func (c *IndexChecker) ID() checkerType { +func (c *IndexChecker) ID() CheckerType { return indexChecker } diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 0370b6bc39..603bdb167b 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -61,7 +61,7 @@ func NewSegmentChecker( } } -func (c *SegmentChecker) ID() checkerType { +func (c *SegmentChecker) ID() CheckerType { return segmentChecker } diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go new file mode 100644 index 0000000000..ce5a8ce44b --- /dev/null +++ b/internal/querycoordv2/ops_services.go @@ -0,0 +1,95 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoordv2 + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/checkers" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func (s *Server) ListCheckers(ctx context.Context, req *querypb.ListCheckersRequest) (*querypb.ListCheckersResponse, error) { + log := log.Ctx(ctx) + log.Info("list checkers request received") + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to list checkers", zap.Error(err)) + return &querypb.ListCheckersResponse{ + Status: merr.Status(err), + }, nil + } + checkers := s.checkerController.Checkers() + checkerIDSet := typeutil.NewSet(req.CheckerIDs...) + + resp := &querypb.ListCheckersResponse{ + Status: merr.Success(), + } + for _, checker := range checkers { + if checkerIDSet.Len() == 0 || checkerIDSet.Contain(int32(checker.ID())) { + resp.CheckerInfos = append(resp.CheckerInfos, &querypb.CheckerInfo{ + Id: int32(checker.ID()), + Activated: checker.IsActive(), + Desc: checker.ID().String(), + Found: true, + }) + checkerIDSet.Remove(int32(checker.ID())) + } + } + + for _, id := range checkerIDSet.Collect() { + resp.CheckerInfos = append(resp.CheckerInfos, &querypb.CheckerInfo{ + Id: id, + Found: false, + }) + } + + return resp, nil +} + +func (s *Server) ActivateChecker(ctx context.Context, req *querypb.ActivateCheckerRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx) + log.Info("activate checker request received") + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to activate checker", zap.Error(err)) + return merr.Status(err), nil + } + if err := s.checkerController.Activate(checkers.CheckerType(req.CheckerID)); err != nil { + log.Warn("failed to activate checker", zap.Error(err)) + return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil + } + return merr.Success(), nil +} + +func (s *Server) DeactivateChecker(ctx context.Context, req *querypb.DeactivateCheckerRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx) + log.Info("deactivate checker request received") + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to deactivate checker", zap.Error(err)) + return merr.Status(err), nil + } + if err := s.checkerController.Deactivate(checkers.CheckerType(req.CheckerID)); err != nil { + log.Warn("failed to deactivate checker", zap.Error(err)) + return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil + } + return merr.Success(), nil +} diff --git a/internal/util/mock/grpc_querycoord_client.go b/internal/util/mock/grpc_querycoord_client.go index de55dd71ec..bde03927b8 100644 --- a/internal/util/mock/grpc_querycoord_client.go +++ b/internal/util/mock/grpc_querycoord_client.go @@ -129,3 +129,15 @@ func (m *GrpcQueryCoordClient) ListResourceGroups(ctx context.Context, req *milv func (m *GrpcQueryCoordClient) DescribeResourceGroup(ctx context.Context, req *querypb.DescribeResourceGroupRequest, opts ...grpc.CallOption) (*querypb.DescribeResourceGroupResponse, error) { return &querypb.DescribeResourceGroupResponse{}, m.Err } + +func (m *GrpcQueryCoordClient) ListCheckers(ctx context.Context, in *querypb.ListCheckersRequest, opts ...grpc.CallOption) (*querypb.ListCheckersResponse, error) { + return &querypb.ListCheckersResponse{}, m.Err +} + +func (m *GrpcQueryCoordClient) ActivateChecker(ctx context.Context, in *querypb.ActivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} + +func (m *GrpcQueryCoordClient) DeactivateChecker(ctx context.Context, in *querypb.DeactivateCheckerRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +}