enhance: change GetVChannels rpc into GetPChannelInfo (#34940)

issue: #33285

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
chyezh 2024-07-25 13:29:45 +08:00 committed by GitHub
parent bef06e5acf
commit d93c51d5f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 192 additions and 175 deletions

View File

@ -550,7 +550,7 @@ func (m *mockRootCoordClient) ShowSegments(ctx context.Context, req *milvuspb.Sh
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }
func (m *mockRootCoordClient) GetVChannels(ctx context.Context, req *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error) { func (m *mockRootCoordClient) GetPChannelInfo(ctx context.Context, req *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error) {
panic("not implemented") // TODO: Implement panic("not implemented") // TODO: Implement
} }

View File

@ -347,14 +347,14 @@ func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ
} }
// GetVChannels returns all vchannels belonging to the pchannel. // GetVChannels returns all vchannels belonging to the pchannel.
func (c *Client) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error) { func (c *Client) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error) {
in = typeutil.Clone(in) in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase( commonpbutil.UpdateMsgBase(
in.GetBase(), in.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
) )
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.GetVChannelsResponse, error) { return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.GetPChannelInfoResponse, error) {
return client.GetVChannels(ctx, in) return client.GetPChannelInfo(ctx, in)
}) })
} }

View File

@ -137,7 +137,7 @@ func Test_NewClient(t *testing.T) {
retCheck(retNotNil, r, err) retCheck(retNotNil, r, err)
} }
{ {
r, err := client.GetVChannels(ctx, nil) r, err := client.GetPChannelInfo(ctx, nil)
retCheck(retNotNil, r, err) retCheck(retNotNil, r, err)
} }
{ {
@ -355,7 +355,7 @@ func Test_NewClient(t *testing.T) {
retCheck(rTimeout, err) retCheck(rTimeout, err)
} }
{ {
rTimeout, err := client.GetVChannels(shortCtx, nil) rTimeout, err := client.GetPChannelInfo(shortCtx, nil)
retCheck(rTimeout, err) retCheck(rTimeout, err)
} }
{ {

View File

@ -454,9 +454,9 @@ func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ
return s.rootCoord.ShowSegments(ctx, in) return s.rootCoord.ShowSegments(ctx, in)
} }
// GetVChannels returns all vchannels belonging to the pchannel. // GetPChannelInfo gets the physical channel information
func (s *Server) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest) (*rootcoordpb.GetVChannelsResponse, error) { func (s *Server) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
return s.rootCoord.GetVChannels(ctx, in) return s.rootCoord.GetPChannelInfo(ctx, in)
} }
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies. // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.

View File

@ -1411,6 +1411,61 @@ func (_c *RootCoord_GetMetrics_Call) RunAndReturn(run func(context.Context, *mil
return _c return _c
} }
// GetPChannelInfo provides a mock function with given fields: _a0, _a1
func (_m *RootCoord) GetPChannelInfo(_a0 context.Context, _a1 *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *rootcoordpb.GetPChannelInfoResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest) *rootcoordpb.GetPChannelInfoResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rootcoordpb.GetPChannelInfoResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RootCoord_GetPChannelInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPChannelInfo'
type RootCoord_GetPChannelInfo_Call struct {
*mock.Call
}
// GetPChannelInfo is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *rootcoordpb.GetPChannelInfoRequest
func (_e *RootCoord_Expecter) GetPChannelInfo(_a0 interface{}, _a1 interface{}) *RootCoord_GetPChannelInfo_Call {
return &RootCoord_GetPChannelInfo_Call{Call: _e.mock.On("GetPChannelInfo", _a0, _a1)}
}
func (_c *RootCoord_GetPChannelInfo_Call) Run(run func(_a0 context.Context, _a1 *rootcoordpb.GetPChannelInfoRequest)) *RootCoord_GetPChannelInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*rootcoordpb.GetPChannelInfoRequest))
})
return _c
}
func (_c *RootCoord_GetPChannelInfo_Call) Return(_a0 *rootcoordpb.GetPChannelInfoResponse, _a1 error) *RootCoord_GetPChannelInfo_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *RootCoord_GetPChannelInfo_Call) RunAndReturn(run func(context.Context, *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error)) *RootCoord_GetPChannelInfo_Call {
_c.Call.Return(run)
return _c
}
// GetStatisticsChannel provides a mock function with given fields: _a0, _a1 // GetStatisticsChannel provides a mock function with given fields: _a0, _a1
func (_m *RootCoord) GetStatisticsChannel(_a0 context.Context, _a1 *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { func (_m *RootCoord) GetStatisticsChannel(_a0 context.Context, _a1 *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
ret := _m.Called(_a0, _a1) ret := _m.Called(_a0, _a1)
@ -1521,61 +1576,6 @@ func (_c *RootCoord_GetTimeTickChannel_Call) RunAndReturn(run func(context.Conte
return _c return _c
} }
// GetVChannels provides a mock function with given fields: _a0, _a1
func (_m *RootCoord) GetVChannels(_a0 context.Context, _a1 *rootcoordpb.GetVChannelsRequest) (*rootcoordpb.GetVChannelsResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *rootcoordpb.GetVChannelsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetVChannelsRequest) (*rootcoordpb.GetVChannelsResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetVChannelsRequest) *rootcoordpb.GetVChannelsResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rootcoordpb.GetVChannelsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.GetVChannelsRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RootCoord_GetVChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVChannels'
type RootCoord_GetVChannels_Call struct {
*mock.Call
}
// GetVChannels is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *rootcoordpb.GetVChannelsRequest
func (_e *RootCoord_Expecter) GetVChannels(_a0 interface{}, _a1 interface{}) *RootCoord_GetVChannels_Call {
return &RootCoord_GetVChannels_Call{Call: _e.mock.On("GetVChannels", _a0, _a1)}
}
func (_c *RootCoord_GetVChannels_Call) Run(run func(_a0 context.Context, _a1 *rootcoordpb.GetVChannelsRequest)) *RootCoord_GetVChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*rootcoordpb.GetVChannelsRequest))
})
return _c
}
func (_c *RootCoord_GetVChannels_Call) Return(_a0 *rootcoordpb.GetVChannelsResponse, _a1 error) *RootCoord_GetVChannels_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *RootCoord_GetVChannels_Call) RunAndReturn(run func(context.Context, *rootcoordpb.GetVChannelsRequest) (*rootcoordpb.GetVChannelsResponse, error)) *RootCoord_GetVChannels_Call {
_c.Call.Return(run)
return _c
}
// HasCollection provides a mock function with given fields: _a0, _a1 // HasCollection provides a mock function with given fields: _a0, _a1
func (_m *RootCoord) HasCollection(_a0 context.Context, _a1 *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { func (_m *RootCoord) HasCollection(_a0 context.Context, _a1 *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
ret := _m.Called(_a0, _a1) ret := _m.Called(_a0, _a1)

View File

@ -1824,6 +1824,76 @@ func (_c *MockRootCoordClient_GetMetrics_Call) RunAndReturn(run func(context.Con
return _c return _c
} }
// GetPChannelInfo provides a mock function with given fields: ctx, in, opts
func (_m *MockRootCoordClient) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *rootcoordpb.GetPChannelInfoResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest, ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest, ...grpc.CallOption) *rootcoordpb.GetPChannelInfoResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rootcoordpb.GetPChannelInfoResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.GetPChannelInfoRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRootCoordClient_GetPChannelInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPChannelInfo'
type MockRootCoordClient_GetPChannelInfo_Call struct {
*mock.Call
}
// GetPChannelInfo is a helper method to define mock.On call
// - ctx context.Context
// - in *rootcoordpb.GetPChannelInfoRequest
// - opts ...grpc.CallOption
func (_e *MockRootCoordClient_Expecter) GetPChannelInfo(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_GetPChannelInfo_Call {
return &MockRootCoordClient_GetPChannelInfo_Call{Call: _e.mock.On("GetPChannelInfo",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockRootCoordClient_GetPChannelInfo_Call) Run(run func(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption)) *MockRootCoordClient_GetPChannelInfo_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*rootcoordpb.GetPChannelInfoRequest), variadicArgs...)
})
return _c
}
func (_c *MockRootCoordClient_GetPChannelInfo_Call) Return(_a0 *rootcoordpb.GetPChannelInfoResponse, _a1 error) *MockRootCoordClient_GetPChannelInfo_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockRootCoordClient_GetPChannelInfo_Call) RunAndReturn(run func(context.Context, *rootcoordpb.GetPChannelInfoRequest, ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error)) *MockRootCoordClient_GetPChannelInfo_Call {
_c.Call.Return(run)
return _c
}
// GetStatisticsChannel provides a mock function with given fields: ctx, in, opts // GetStatisticsChannel provides a mock function with given fields: ctx, in, opts
func (_m *MockRootCoordClient) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) { func (_m *MockRootCoordClient) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))
@ -1964,76 +2034,6 @@ func (_c *MockRootCoordClient_GetTimeTickChannel_Call) RunAndReturn(run func(con
return _c return _c
} }
// GetVChannels provides a mock function with given fields: ctx, in, opts
func (_m *MockRootCoordClient) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *rootcoordpb.GetVChannelsResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetVChannelsRequest, ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.GetVChannelsRequest, ...grpc.CallOption) *rootcoordpb.GetVChannelsResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rootcoordpb.GetVChannelsResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.GetVChannelsRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockRootCoordClient_GetVChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVChannels'
type MockRootCoordClient_GetVChannels_Call struct {
*mock.Call
}
// GetVChannels is a helper method to define mock.On call
// - ctx context.Context
// - in *rootcoordpb.GetVChannelsRequest
// - opts ...grpc.CallOption
func (_e *MockRootCoordClient_Expecter) GetVChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_GetVChannels_Call {
return &MockRootCoordClient_GetVChannels_Call{Call: _e.mock.On("GetVChannels",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockRootCoordClient_GetVChannels_Call) Run(run func(ctx context.Context, in *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption)) *MockRootCoordClient_GetVChannels_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*rootcoordpb.GetVChannelsRequest), variadicArgs...)
})
return _c
}
func (_c *MockRootCoordClient_GetVChannels_Call) Return(_a0 *rootcoordpb.GetVChannelsResponse, _a1 error) *MockRootCoordClient_GetVChannels_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockRootCoordClient_GetVChannels_Call) RunAndReturn(run func(context.Context, *rootcoordpb.GetVChannelsRequest, ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error)) *MockRootCoordClient_GetVChannels_Call {
_c.Call.Return(run)
return _c
}
// HasCollection provides a mock function with given fields: ctx, in, opts // HasCollection provides a mock function with given fields: ctx, in, opts
func (_m *MockRootCoordClient) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) { func (_m *MockRootCoordClient) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) {
_va := make([]interface{}, len(opts)) _va := make([]interface{}, len(opts))

View File

@ -96,7 +96,7 @@ service RootCoord {
rpc ShowPartitions(milvus.ShowPartitionsRequest) returns (milvus.ShowPartitionsResponse) {} rpc ShowPartitions(milvus.ShowPartitionsRequest) returns (milvus.ShowPartitionsResponse) {}
rpc ShowPartitionsInternal(milvus.ShowPartitionsRequest) returns (milvus.ShowPartitionsResponse) {} rpc ShowPartitionsInternal(milvus.ShowPartitionsRequest) returns (milvus.ShowPartitionsResponse) {}
rpc ShowSegments(milvus.ShowSegmentsRequest) returns (milvus.ShowSegmentsResponse) {} rpc ShowSegments(milvus.ShowSegmentsRequest) returns (milvus.ShowSegmentsResponse) {}
rpc GetVChannels(GetVChannelsRequest) returns (GetVChannelsResponse) {} rpc GetPChannelInfo(GetPChannelInfoRequest) returns (GetPChannelInfoResponse) {}
rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {} rpc AllocTimestamp(AllocTimestampRequest) returns (AllocTimestampResponse) {}
rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {} rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {}
@ -220,12 +220,23 @@ message AlterDatabaseRequest {
repeated common.KeyValuePair properties = 4; repeated common.KeyValuePair properties = 4;
} }
message GetVChannelsRequest { message GetPChannelInfoRequest {
common.MsgBase base = 1; common.MsgBase base = 1;
string pchannel = 2; string pchannel = 2;
} }
message GetVChannelsResponse { message GetPChannelInfoResponse {
common.Status status = 1; common.Status status = 1;
repeated string vchannels = 2; repeated CollectionInfoOnPChannel collections = 2;
} }
message CollectionInfoOnPChannel {
int64 collection_id = 1;
repeated PartitionInfoOnPChannel partitions = 2;
string vchannel = 3;
}
message PartitionInfoOnPChannel {
int64 partition_id = 1;
}

View File

@ -890,7 +890,7 @@ func (coord *RootCoordMock) ShowSegments(ctx context.Context, req *milvuspb.Show
}, nil }, nil
} }
func (coord *RootCoordMock) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error) { func (coord *RootCoordMock) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error) {
panic("implement me") panic("implement me")
} }

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
@ -62,7 +63,7 @@ type IMetaTable interface {
ListAllAvailCollections(ctx context.Context) map[int64][]int64 ListAllAvailCollections(ctx context.Context) map[int64][]int64
ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string
GetCollectionVirtualChannels(colID int64) []string GetCollectionVirtualChannels(colID int64) []string
GetVChannelsByPchannel(pchannel string) []string GetPChannelInfo(pchannel string) *rootcoordpb.GetPChannelInfoResponse
AddPartition(ctx context.Context, partition *model.Partition) error AddPartition(ctx context.Context, partition *model.Partition) error
ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error
RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error
@ -835,17 +836,30 @@ func (mt *MetaTable) GetCollectionVirtualChannels(colID int64) []string {
return nil return nil
} }
// GetVChannelsByPchannel returns vchannels by the given pchannel. // GetPChannelInfo returns infos on pchannel.
func (mt *MetaTable) GetVChannelsByPchannel(pchannel string) []string { func (mt *MetaTable) GetPChannelInfo(pchannel string) *rootcoordpb.GetPChannelInfoResponse {
mt.ddLock.RLock() mt.ddLock.RLock()
defer mt.ddLock.RUnlock() defer mt.ddLock.RUnlock()
res := make([]string, 0) resp := &rootcoordpb.GetPChannelInfoResponse{
Status: merr.Success(),
Collections: make([]*rootcoordpb.CollectionInfoOnPChannel, 0),
}
for _, collInfo := range mt.collID2Meta { for _, collInfo := range mt.collID2Meta {
if idx := lo.IndexOf(collInfo.PhysicalChannelNames, pchannel); idx > 0 { if idx := lo.IndexOf(collInfo.PhysicalChannelNames, pchannel); idx >= 0 {
res = append(res, collInfo.VirtualChannelNames[idx]) partitions := make([]*rootcoordpb.PartitionInfoOnPChannel, 0, len(collInfo.Partitions))
for _, part := range collInfo.Partitions {
partitions = append(partitions, &rootcoordpb.PartitionInfoOnPChannel{
PartitionId: part.PartitionID,
})
}
resp.Collections = append(resp.Collections, &rootcoordpb.CollectionInfoOnPChannel{
CollectionId: collInfo.CollectionID,
Partitions: partitions,
Vchannel: collInfo.VirtualChannelNames[idx],
})
} }
} }
return res return resp
} }
func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partition) error { func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partition) error {

View File

@ -191,10 +191,6 @@ func (m mockMetaTable) GetCollectionVirtualChannels(colID int64) []string {
return m.GetCollectionVirtualChannelsFunc(colID) return m.GetCollectionVirtualChannelsFunc(colID)
} }
func (m mockMetaTable) GetVChannelsByPchannel(pchannel string) []string {
panic("unimplemented")
}
func (m mockMetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error { func (m mockMetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
return m.AddCredentialFunc(credInfo) return m.AddCredentialFunc(credInfo)
} }

View File

@ -13,6 +13,8 @@ import (
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
model "github.com/milvus-io/milvus/internal/metastore/model" model "github.com/milvus-io/milvus/internal/metastore/model"
rootcoordpb "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
) )
// IMetaTable is an autogenerated mock type for the IMetaTable type // IMetaTable is an autogenerated mock type for the IMetaTable type
@ -1210,46 +1212,46 @@ func (_c *IMetaTable_GetDatabaseByName_Call) RunAndReturn(run func(context.Conte
return _c return _c
} }
// GetVChannelsByPchannel provides a mock function with given fields: pchannel // GetPChannelInfo provides a mock function with given fields: pchannel
func (_m *IMetaTable) GetVChannelsByPchannel(pchannel string) []string { func (_m *IMetaTable) GetPChannelInfo(pchannel string) *rootcoordpb.GetPChannelInfoResponse {
ret := _m.Called(pchannel) ret := _m.Called(pchannel)
var r0 []string var r0 *rootcoordpb.GetPChannelInfoResponse
if rf, ok := ret.Get(0).(func(string) []string); ok { if rf, ok := ret.Get(0).(func(string) *rootcoordpb.GetPChannelInfoResponse); ok {
r0 = rf(pchannel) r0 = rf(pchannel)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).([]string) r0 = ret.Get(0).(*rootcoordpb.GetPChannelInfoResponse)
} }
} }
return r0 return r0
} }
// IMetaTable_GetVChannelsByPchannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetVChannelsByPchannel' // IMetaTable_GetPChannelInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPChannelInfo'
type IMetaTable_GetVChannelsByPchannel_Call struct { type IMetaTable_GetPChannelInfo_Call struct {
*mock.Call *mock.Call
} }
// GetVChannelsByPchannel is a helper method to define mock.On call // GetPChannelInfo is a helper method to define mock.On call
// - pchannel string // - pchannel string
func (_e *IMetaTable_Expecter) GetVChannelsByPchannel(pchannel interface{}) *IMetaTable_GetVChannelsByPchannel_Call { func (_e *IMetaTable_Expecter) GetPChannelInfo(pchannel interface{}) *IMetaTable_GetPChannelInfo_Call {
return &IMetaTable_GetVChannelsByPchannel_Call{Call: _e.mock.On("GetVChannelsByPchannel", pchannel)} return &IMetaTable_GetPChannelInfo_Call{Call: _e.mock.On("GetPChannelInfo", pchannel)}
} }
func (_c *IMetaTable_GetVChannelsByPchannel_Call) Run(run func(pchannel string)) *IMetaTable_GetVChannelsByPchannel_Call { func (_c *IMetaTable_GetPChannelInfo_Call) Run(run func(pchannel string)) *IMetaTable_GetPChannelInfo_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(string)) run(args[0].(string))
}) })
return _c return _c
} }
func (_c *IMetaTable_GetVChannelsByPchannel_Call) Return(_a0 []string) *IMetaTable_GetVChannelsByPchannel_Call { func (_c *IMetaTable_GetPChannelInfo_Call) Return(_a0 *rootcoordpb.GetPChannelInfoResponse) *IMetaTable_GetPChannelInfo_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *IMetaTable_GetVChannelsByPchannel_Call) RunAndReturn(run func(string) []string) *IMetaTable_GetVChannelsByPchannel_Call { func (_c *IMetaTable_GetPChannelInfo_Call) RunAndReturn(run func(string) *rootcoordpb.GetPChannelInfoResponse) *IMetaTable_GetPChannelInfo_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View File

@ -1568,20 +1568,14 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
return &milvuspb.ShowSegmentsResponse{Status: merr.Success()}, nil return &milvuspb.ShowSegmentsResponse{Status: merr.Success()}, nil
} }
// GetVChannels returns all vchannels belonging to the pchannel. // GetPChannelInfo get pchannel info.
func (c *Core) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest) (*rootcoordpb.GetVChannelsResponse, error) { func (c *Core) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest) (*rootcoordpb.GetPChannelInfoResponse, error) {
if err := merr.CheckHealthy(c.GetStateCode()); err != nil { if err := merr.CheckHealthy(c.GetStateCode()); err != nil {
return &rootcoordpb.GetVChannelsResponse{ return &rootcoordpb.GetPChannelInfoResponse{
Status: merr.Status(err), Status: merr.Status(err),
}, nil }, nil
} }
return c.meta.GetPChannelInfo(in.GetPchannel()), nil
resp := &rootcoordpb.GetVChannelsResponse{
Status: merr.Success(),
}
vchannels := c.meta.GetVChannelsByPchannel(in.GetPchannel())
resp.Vchannels = vchannels
return resp, nil
} }
// AllocTimestamp alloc timestamp // AllocTimestamp alloc timestamp

View File

@ -186,8 +186,8 @@ func (m *GrpcRootCoordClient) ShowSegments(ctx context.Context, in *milvuspb.Sho
return &milvuspb.ShowSegmentsResponse{}, m.Err return &milvuspb.ShowSegmentsResponse{}, m.Err
} }
func (m *GrpcRootCoordClient) GetVChannels(ctx context.Context, in *rootcoordpb.GetVChannelsRequest, opts ...grpc.CallOption) (*rootcoordpb.GetVChannelsResponse, error) { func (m *GrpcRootCoordClient) GetPChannelInfo(ctx context.Context, in *rootcoordpb.GetPChannelInfoRequest, opts ...grpc.CallOption) (*rootcoordpb.GetPChannelInfoResponse, error) {
return &rootcoordpb.GetVChannelsResponse{}, m.Err return &rootcoordpb.GetPChannelInfoResponse{}, m.Err
} }
func (m *GrpcRootCoordClient) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSegmentsRequest, opts ...grpc.CallOption) (*rootcoordpb.DescribeSegmentsResponse, error) { func (m *GrpcRootCoordClient) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSegmentsRequest, opts ...grpc.CallOption) (*rootcoordpb.DescribeSegmentsResponse, error) {