Support to manage connections (#24224) (#24293)

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
This commit is contained in:
Jiquan Long 2023-05-23 09:45:26 +08:00 committed by GitHub
parent 9569e7ee76
commit 6e69ffd496
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 4093 additions and 1092 deletions

2
go.mod
View File

@ -28,7 +28,7 @@ require (
github.com/klauspost/compress v1.14.2
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230511113116-d65d4d30bff4
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083718-af486bf54d60
github.com/minio/minio-go/v7 v7.0.17
github.com/opentracing/opentracing-go v1.2.0
github.com/panjf2000/ants/v2 v2.4.8

2
go.sum
View File

@ -508,6 +508,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230511113116-d65d4d30bff4 h1:4POQn9n+vlawERWVnXQLXyQ7mT5Pz00gON9aqAgFq3w=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230511113116-d65d4d30bff4/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083718-af486bf54d60 h1:mV9vNiZvJGm9YXzDUivFlwgbAA5N1wcO9XXwem0QS+Y=
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230518083718-af486bf54d60/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk=
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -509,6 +509,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_schema_2eproto::offsets[] PROT
offsetof(::milvus::proto::schema::FieldDataDefaultTypeInternal, scalars_),
offsetof(::milvus::proto::schema::FieldDataDefaultTypeInternal, vectors_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, field_id_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, is_dynamic_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::FieldData, field_),
~0u, // no _has_bits_
PROTOBUF_FIELD_OFFSET(::milvus::proto::schema::IDs, _internal_metadata_),
@ -546,8 +547,8 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB
{ 96, -1, sizeof(::milvus::proto::schema::ScalarField)},
{ 111, -1, sizeof(::milvus::proto::schema::VectorField)},
{ 120, -1, sizeof(::milvus::proto::schema::FieldData)},
{ 131, -1, sizeof(::milvus::proto::schema::IDs)},
{ 139, -1, sizeof(::milvus::proto::schema::SearchResultData)},
{ 132, -1, sizeof(::milvus::proto::schema::IDs)},
{ 140, -1, sizeof(::milvus::proto::schema::SearchResultData)},
};
static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = {
@ -616,29 +617,30 @@ const char descriptor_table_protodef_schema_2eproto[] PROTOBUF_SECTION_VARIABLE(
"JSONArrayH\000B\006\n\004data\"t\n\013VectorField\022\013\n\003di"
"m\030\001 \001(\003\0227\n\014float_vector\030\002 \001(\0132\037.milvus.p"
"roto.schema.FloatArrayH\000\022\027\n\rbinary_vecto"
"r\030\003 \001(\014H\000B\006\n\004data\"\321\001\n\tFieldData\022+\n\004type\030"
"r\030\003 \001(\014H\000B\006\n\004data\"\345\001\n\tFieldData\022+\n\004type\030"
"\001 \001(\0162\035.milvus.proto.schema.DataType\022\022\n\n"
"field_name\030\002 \001(\t\0223\n\007scalars\030\003 \001(\0132 .milv"
"us.proto.schema.ScalarFieldH\000\0223\n\007vectors"
"\030\004 \001(\0132 .milvus.proto.schema.VectorField"
"H\000\022\020\n\010field_id\030\005 \001(\003B\007\n\005field\"w\n\003IDs\0220\n\006"
"int_id\030\001 \001(\0132\036.milvus.proto.schema.LongA"
"rrayH\000\0222\n\006str_id\030\002 \001(\0132 .milvus.proto.sc"
"hema.StringArrayH\000B\n\n\010id_field\"\261\001\n\020Searc"
"hResultData\022\023\n\013num_queries\030\001 \001(\003\022\r\n\005top_"
"k\030\002 \001(\003\0223\n\013fields_data\030\003 \003(\0132\036.milvus.pr"
"oto.schema.FieldData\022\016\n\006scores\030\004 \003(\002\022%\n\003"
"ids\030\005 \001(\0132\030.milvus.proto.schema.IDs\022\r\n\005t"
"opks\030\006 \003(\003*\261\001\n\010DataType\022\010\n\004None\020\000\022\010\n\004Boo"
"l\020\001\022\010\n\004Int8\020\002\022\t\n\005Int16\020\003\022\t\n\005Int32\020\004\022\t\n\005I"
"nt64\020\005\022\t\n\005Float\020\n\022\n\n\006Double\020\013\022\n\n\006String\020"
"\024\022\013\n\007VarChar\020\025\022\t\n\005Array\020\026\022\010\n\004JSON\020\027\022\020\n\014B"
"inaryVector\020d\022\017\n\013FloatVector\020e*V\n\nFieldS"
"tate\022\020\n\014FieldCreated\020\000\022\021\n\rFieldCreating\020"
"\001\022\021\n\rFieldDropping\020\002\022\020\n\014FieldDropped\020\003Bf"
"\n\016io.milvus.grpcB\013SchemaProtoP\001Z1github."
"com/milvus-io/milvus-proto/go-api/schema"
"pb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3"
"H\000\022\020\n\010field_id\030\005 \001(\003\022\022\n\nis_dynamic\030\006 \001(\010"
"B\007\n\005field\"w\n\003IDs\0220\n\006int_id\030\001 \001(\0132\036.milvu"
"s.proto.schema.LongArrayH\000\0222\n\006str_id\030\002 \001"
"(\0132 .milvus.proto.schema.StringArrayH\000B\n"
"\n\010id_field\"\261\001\n\020SearchResultData\022\023\n\013num_q"
"ueries\030\001 \001(\003\022\r\n\005top_k\030\002 \001(\003\0223\n\013fields_da"
"ta\030\003 \003(\0132\036.milvus.proto.schema.FieldData"
"\022\016\n\006scores\030\004 \003(\002\022%\n\003ids\030\005 \001(\0132\030.milvus.p"
"roto.schema.IDs\022\r\n\005topks\030\006 \003(\003*\261\001\n\010DataT"
"ype\022\010\n\004None\020\000\022\010\n\004Bool\020\001\022\010\n\004Int8\020\002\022\t\n\005Int"
"16\020\003\022\t\n\005Int32\020\004\022\t\n\005Int64\020\005\022\t\n\005Float\020\n\022\n\n"
"\006Double\020\013\022\n\n\006String\020\024\022\013\n\007VarChar\020\025\022\t\n\005Ar"
"ray\020\026\022\010\n\004JSON\020\027\022\020\n\014BinaryVector\020d\022\017\n\013Flo"
"atVector\020e*V\n\nFieldState\022\020\n\014FieldCreated"
"\020\000\022\021\n\rFieldCreating\020\001\022\021\n\rFieldDropping\020\002"
"\022\020\n\014FieldDropped\020\003Bf\n\016io.milvus.grpcB\013Sc"
"hemaProtoP\001Z1github.com/milvus-io/milvus"
"-proto/go-api/schemapb\240\001\001\252\002\016IO.Milvus.Gr"
"pcb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_schema_2eproto_deps[1] = {
&::descriptor_table_common_2eproto,
@ -664,7 +666,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_sch
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_schema_2eproto_once;
static bool descriptor_table_schema_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_schema_2eproto = {
&descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 2710,
&descriptor_table_schema_2eproto_initialized, descriptor_table_protodef_schema_2eproto, "schema.proto", 2730,
&descriptor_table_schema_2eproto_once, descriptor_table_schema_2eproto_sccs, descriptor_table_schema_2eproto_deps, 16, 1,
schemas, file_default_instances, TableStruct_schema_2eproto::offsets,
file_level_metadata_schema_2eproto, 17, file_level_enum_descriptors_schema_2eproto, file_level_service_descriptors_schema_2eproto,
@ -6562,6 +6564,13 @@ const char* FieldData::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::
CHK_(ptr);
} else goto handle_unusual;
continue;
// bool is_dynamic = 6;
case 6:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 48)) {
is_dynamic_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
CHK_(ptr);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
@ -6656,6 +6665,19 @@ bool FieldData::MergePartialFromCodedStream(
break;
}
// bool is_dynamic = 6;
case 6: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (48 & 0xFF)) {
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
bool, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_BOOL>(
input, &is_dynamic_)));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
@ -6716,6 +6738,11 @@ void FieldData::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(5, this->field_id(), output);
}
// bool is_dynamic = 6;
if (this->is_dynamic() != 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteBool(6, this->is_dynamic(), output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
@ -6765,6 +6792,11 @@ void FieldData::SerializeWithCachedSizes(
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(5, this->field_id(), target);
}
// bool is_dynamic = 6;
if (this->is_dynamic() != 0) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteBoolToArray(6, this->is_dynamic(), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
@ -6799,6 +6831,11 @@ size_t FieldData::ByteSizeLong() const {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::EnumSize(this->type());
}
// bool is_dynamic = 6;
if (this->is_dynamic() != 0) {
total_size += 1 + 1;
}
// int64 field_id = 5;
if (this->field_id() != 0) {
total_size += 1 +
@ -6859,6 +6896,9 @@ void FieldData::MergeFrom(const FieldData& from) {
if (from.type() != 0) {
set_type(from.type());
}
if (from.is_dynamic() != 0) {
set_is_dynamic(from.is_dynamic());
}
if (from.field_id() != 0) {
set_field_id(from.field_id());
}
@ -6901,6 +6941,7 @@ void FieldData::InternalSwap(FieldData* other) {
field_name_.Swap(&other->field_name_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(),
GetArenaNoVirtual());
swap(type_, other->type_);
swap(is_dynamic_, other->is_dynamic_);
swap(field_id_, other->field_id_);
swap(field_, other->field_);
swap(_oneof_case_[0], other->_oneof_case_[0]);

View File

@ -2660,6 +2660,7 @@ class FieldData :
enum : int {
kFieldNameFieldNumber = 2,
kTypeFieldNumber = 1,
kIsDynamicFieldNumber = 6,
kFieldIdFieldNumber = 5,
kScalarsFieldNumber = 3,
kVectorsFieldNumber = 4,
@ -2680,6 +2681,11 @@ class FieldData :
::milvus::proto::schema::DataType type() const;
void set_type(::milvus::proto::schema::DataType value);
// bool is_dynamic = 6;
void clear_is_dynamic();
bool is_dynamic() const;
void set_is_dynamic(bool value);
// int64 field_id = 5;
void clear_field_id();
::PROTOBUF_NAMESPACE_ID::int64 field_id() const;
@ -2715,6 +2721,7 @@ class FieldData :
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr field_name_;
int type_;
bool is_dynamic_;
::PROTOBUF_NAMESPACE_ID::int64 field_id_;
union FieldUnion {
FieldUnion() {}
@ -5039,6 +5046,20 @@ inline void FieldData::set_field_id(::PROTOBUF_NAMESPACE_ID::int64 value) {
// @@protoc_insertion_point(field_set:milvus.proto.schema.FieldData.field_id)
}
// bool is_dynamic = 6;
inline void FieldData::clear_is_dynamic() {
is_dynamic_ = false;
}
inline bool FieldData::is_dynamic() const {
// @@protoc_insertion_point(field_get:milvus.proto.schema.FieldData.is_dynamic)
return is_dynamic_;
}
inline void FieldData::set_is_dynamic(bool value) {
is_dynamic_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.schema.FieldData.is_dynamic)
}
inline bool FieldData::has_field() const {
return field_case() != FIELD_NOT_SET;
}

View File

@ -236,3 +236,21 @@ func (c *Client) SetRates(ctx context.Context, req *proxypb.SetRatesRequest) (*c
}
return ret.(*commonpb.Status), err
}
func (c *Client) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.ListClientInfos(ctx, req)
})
if err != nil {
return nil, err
}
return ret.(*proxypb.ListClientInfosResponse), nil
}

View File

@ -183,6 +183,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
proxy.UnaryServerInterceptor(proxy.PrivilegeInterceptor),
logutil.UnaryTraceLoggerInterceptor,
proxy.RateLimitInterceptor(limiter),
proxy.KeepActiveInterceptor,
)),
}
@ -943,3 +944,11 @@ func (s *Server) DropDatabase(ctx context.Context, request *milvuspb.DropDatabas
func (s *Server) ListDatabases(ctx context.Context, request *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) {
return s.proxy.ListDatabases(ctx, request)
}
func (s *Server) Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
return s.proxy.Connect(ctx, req)
}
func (s *Server) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
return s.proxy.ListClientInfos(ctx, req)
}

View File

@ -703,6 +703,10 @@ func (m *MockProxy) Register() error {
return m.regErr
}
func (m *MockProxy) ListClientInfos(ctx context.Context, request *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
return nil, nil
}
func (m *MockProxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return nil, nil
}
@ -1038,6 +1042,10 @@ func (m *MockProxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCo
return nil, nil
}
func (m *MockProxy) Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type WaitOption struct {

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,8 @@ service Proxy {
rpc RefreshPolicyInfoCache(RefreshPolicyInfoCacheRequest) returns (common.Status) {}
rpc GetProxyMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
rpc SetRates(SetRatesRequest) returns (common.Status) {}
rpc ListClientInfos(ListClientInfosRequest) returns (ListClientInfosResponse) {}
}
message InvalidateCollMetaCacheRequest {
@ -61,3 +63,12 @@ message SetRatesRequest {
common.MsgBase base = 1;
repeated CollectionRate rates = 2;
}
message ListClientInfosRequest {
common.MsgBase base = 1;
}
message ListClientInfosResponse {
common.Status status = 1;
repeated common.ClientInfo client_infos = 2;
}

View File

@ -361,6 +361,92 @@ func (m *SetRatesRequest) GetRates() []*CollectionRate {
return nil
}
type ListClientInfosRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListClientInfosRequest) Reset() { *m = ListClientInfosRequest{} }
func (m *ListClientInfosRequest) String() string { return proto.CompactTextString(m) }
func (*ListClientInfosRequest) ProtoMessage() {}
func (*ListClientInfosRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_700b50b08ed8dbaf, []int{6}
}
func (m *ListClientInfosRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListClientInfosRequest.Unmarshal(m, b)
}
func (m *ListClientInfosRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListClientInfosRequest.Marshal(b, m, deterministic)
}
func (m *ListClientInfosRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListClientInfosRequest.Merge(m, src)
}
func (m *ListClientInfosRequest) XXX_Size() int {
return xxx_messageInfo_ListClientInfosRequest.Size(m)
}
func (m *ListClientInfosRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListClientInfosRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ListClientInfosRequest proto.InternalMessageInfo
func (m *ListClientInfosRequest) GetBase() *commonpb.MsgBase {
if m != nil {
return m.Base
}
return nil
}
type ListClientInfosResponse struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
ClientInfos []*commonpb.ClientInfo `protobuf:"bytes,2,rep,name=client_infos,json=clientInfos,proto3" json:"client_infos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListClientInfosResponse) Reset() { *m = ListClientInfosResponse{} }
func (m *ListClientInfosResponse) String() string { return proto.CompactTextString(m) }
func (*ListClientInfosResponse) ProtoMessage() {}
func (*ListClientInfosResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_700b50b08ed8dbaf, []int{7}
}
func (m *ListClientInfosResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListClientInfosResponse.Unmarshal(m, b)
}
func (m *ListClientInfosResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListClientInfosResponse.Marshal(b, m, deterministic)
}
func (m *ListClientInfosResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListClientInfosResponse.Merge(m, src)
}
func (m *ListClientInfosResponse) XXX_Size() int {
return xxx_messageInfo_ListClientInfosResponse.Size(m)
}
func (m *ListClientInfosResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListClientInfosResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ListClientInfosResponse proto.InternalMessageInfo
func (m *ListClientInfosResponse) GetStatus() *commonpb.Status {
if m != nil {
return m.Status
}
return nil
}
func (m *ListClientInfosResponse) GetClientInfos() []*commonpb.ClientInfo {
if m != nil {
return m.ClientInfos
}
return nil
}
func init() {
proto.RegisterType((*InvalidateCollMetaCacheRequest)(nil), "milvus.proto.proxy.InvalidateCollMetaCacheRequest")
proto.RegisterType((*InvalidateCredCacheRequest)(nil), "milvus.proto.proxy.InvalidateCredCacheRequest")
@ -368,54 +454,60 @@ func init() {
proto.RegisterType((*RefreshPolicyInfoCacheRequest)(nil), "milvus.proto.proxy.RefreshPolicyInfoCacheRequest")
proto.RegisterType((*CollectionRate)(nil), "milvus.proto.proxy.CollectionRate")
proto.RegisterType((*SetRatesRequest)(nil), "milvus.proto.proxy.SetRatesRequest")
proto.RegisterType((*ListClientInfosRequest)(nil), "milvus.proto.proxy.ListClientInfosRequest")
proto.RegisterType((*ListClientInfosResponse)(nil), "milvus.proto.proxy.ListClientInfosResponse")
}
func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) }
var fileDescriptor_700b50b08ed8dbaf = []byte{
// 657 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xd1, 0x4e, 0x13, 0x41,
0x14, 0x65, 0x29, 0x2d, 0x78, 0x69, 0x4a, 0x32, 0x41, 0xac, 0x45, 0xb0, 0x59, 0x8c, 0x34, 0x24,
0x6e, 0xa5, 0x92, 0xe8, 0x33, 0xc5, 0x34, 0xc4, 0x40, 0x70, 0xab, 0x2f, 0xbe, 0x98, 0xd9, 0xdd,
0x0b, 0x5d, 0xb2, 0x9d, 0x59, 0x66, 0xa6, 0x68, 0x1f, 0x8c, 0x89, 0x7f, 0xe4, 0x9b, 0xdf, 0xe1,
0x17, 0x99, 0xdd, 0xd9, 0x6e, 0xbb, 0x65, 0x61, 0xa3, 0xc4, 0xb7, 0xde, 0x99, 0x73, 0x7b, 0xce,
0xb9, 0x77, 0xf6, 0xc0, 0x6a, 0x28, 0xf8, 0xd7, 0xb1, 0x15, 0x0a, 0xae, 0x38, 0x21, 0x43, 0x3f,
0xb8, 0x1e, 0x49, 0x5d, 0x59, 0xf1, 0x4d, 0xa3, 0xea, 0xf2, 0xe1, 0x90, 0x33, 0x7d, 0xd6, 0xa8,
0xf9, 0x4c, 0xa1, 0x60, 0x34, 0x48, 0xea, 0xea, 0x6c, 0x87, 0xf9, 0xcb, 0x80, 0xed, 0x63, 0x76,
0x4d, 0x03, 0xdf, 0xa3, 0x0a, 0xbb, 0x3c, 0x08, 0x4e, 0x50, 0xd1, 0x2e, 0x75, 0x07, 0x68, 0xe3,
0xd5, 0x08, 0xa5, 0x22, 0x2f, 0x61, 0xc9, 0xa1, 0x12, 0xeb, 0x46, 0xd3, 0x68, 0xad, 0x76, 0x9e,
0x58, 0x19, 0xc6, 0x84, 0xea, 0x44, 0x5e, 0x1c, 0x52, 0x89, 0x76, 0x8c, 0x24, 0x8f, 0x60, 0xd9,
0x73, 0x3e, 0x33, 0x3a, 0xc4, 0xfa, 0x62, 0xd3, 0x68, 0x3d, 0xb0, 0x2b, 0x9e, 0x73, 0x4a, 0x87,
0x48, 0x76, 0x61, 0xcd, 0xe5, 0x41, 0x80, 0xae, 0xf2, 0x39, 0xd3, 0x80, 0x52, 0x0c, 0xa8, 0x4d,
0x8f, 0x63, 0xa0, 0x09, 0xd5, 0xe9, 0xc9, 0xf1, 0x51, 0x7d, 0xa9, 0x69, 0xb4, 0x4a, 0x76, 0xe6,
0xcc, 0xbc, 0x84, 0xc6, 0x8c, 0x72, 0x81, 0xde, 0x3d, 0x55, 0x37, 0x60, 0x65, 0x24, 0xa3, 0x49,
0xa5, 0xb2, 0xd3, 0xda, 0xfc, 0x61, 0xc0, 0xc6, 0xc7, 0xf0, 0xff, 0x13, 0x45, 0x77, 0x21, 0x95,
0xf2, 0x0b, 0x17, 0x5e, 0x32, 0x9a, 0xb4, 0x36, 0xbf, 0xc3, 0x96, 0x8d, 0xe7, 0x02, 0xe5, 0xe0,
0x8c, 0x07, 0xbe, 0x3b, 0x3e, 0x66, 0xe7, 0xfc, 0x9e, 0x52, 0x36, 0xa0, 0xc2, 0xc3, 0x0f, 0xe3,
0x50, 0x0b, 0x29, 0xdb, 0x49, 0x45, 0xd6, 0xa1, 0xcc, 0xc3, 0x77, 0x38, 0x4e, 0x34, 0xe8, 0xc2,
0xfc, 0x6d, 0x40, 0xad, 0x9b, 0xae, 0xc0, 0xa6, 0x0a, 0xc9, 0x36, 0xc0, 0x74, 0x29, 0x31, 0x71,
0xc9, 0x9e, 0x39, 0x21, 0xfb, 0x50, 0x16, 0x54, 0xa1, 0xac, 0x2f, 0x36, 0x4b, 0xad, 0xd5, 0xce,
0x66, 0x56, 0x53, 0xfa, 0x34, 0xa3, 0xff, 0xb2, 0x35, 0x92, 0xbc, 0x86, 0x8a, 0x54, 0x71, 0x4f,
0xa9, 0x59, 0x6a, 0xd5, 0x3a, 0x4f, 0xb3, 0x3d, 0x49, 0xf1, 0x7e, 0xc4, 0x15, 0xed, 0x47, 0x38,
0x3b, 0x81, 0x93, 0x03, 0x28, 0xbb, 0xdc, 0x43, 0x59, 0x5f, 0x8a, 0xfb, 0xb6, 0x73, 0xfd, 0xbf,
0x15, 0x82, 0x8b, 0x2e, 0xf7, 0xd0, 0xd6, 0x60, 0xf3, 0x1b, 0xac, 0xf5, 0x51, 0x45, 0x02, 0xe4,
0xbf, 0xcf, 0xf1, 0x4d, 0xd6, 0xa6, 0x69, 0xdd, 0xfc, 0x2c, 0xad, 0xec, 0xe4, 0x12, 0xb7, 0x9d,
0x9f, 0xcb, 0x50, 0x3e, 0x8b, 0xee, 0x49, 0x00, 0xa4, 0x87, 0xaa, 0xcb, 0x87, 0x21, 0x67, 0xc8,
0x54, 0x5f, 0x9b, 0xb2, 0x72, 0xdd, 0xdf, 0x04, 0x26, 0xda, 0x1b, 0xcf, 0x72, 0xf1, 0x73, 0x60,
0x73, 0x81, 0x5c, 0xc1, 0x7a, 0x0f, 0xe3, 0xd2, 0x97, 0xca, 0x77, 0x65, 0x77, 0x40, 0x19, 0xc3,
0x80, 0x74, 0x6e, 0xd9, 0x50, 0x1e, 0x78, 0xc2, 0xb9, 0x93, 0xcb, 0xd9, 0x57, 0xc2, 0x67, 0x17,
0x36, 0xca, 0x90, 0x33, 0x89, 0xe6, 0x02, 0x11, 0xb0, 0x95, 0x8d, 0x1a, 0x3d, 0x8d, 0x34, 0x70,
0xe6, 0xb9, 0xf5, 0xd8, 0xee, 0x4e, 0xa7, 0xc6, 0x66, 0xee, 0x76, 0x22, 0xa9, 0xa3, 0xc8, 0x26,
0x85, 0x6a, 0x0f, 0xd5, 0x91, 0x37, 0xb1, 0xb7, 0x77, 0xbb, 0xbd, 0x14, 0xf4, 0x97, 0xb6, 0x2e,
0xe1, 0x71, 0x36, 0x87, 0x90, 0x29, 0x9f, 0x06, 0xda, 0x92, 0x55, 0x60, 0x69, 0x2e, 0x4d, 0x8a,
0xec, 0x38, 0xf0, 0x70, 0x1a, 0x43, 0xb3, 0x3c, 0x7b, 0x79, 0x3c, 0xf9, 0x89, 0x55, 0xc4, 0x71,
0x09, 0x1b, 0xf9, 0x31, 0x43, 0xf6, 0xf3, 0x48, 0xee, 0x8c, 0xa4, 0x22, 0x2e, 0x0f, 0xd6, 0x7a,
0xa8, 0xe2, 0xf7, 0x7f, 0x82, 0x4a, 0xf8, 0xae, 0x24, 0xcf, 0x6f, 0x7b, 0xf0, 0x09, 0x60, 0xf2,
0xcf, 0xbb, 0x85, 0xb8, 0x74, 0x43, 0xa7, 0xb0, 0x32, 0xf9, 0xc4, 0xc9, 0x4e, 0x9e, 0x87, 0xb9,
0x00, 0x28, 0x50, 0x7d, 0x78, 0xf0, 0xa9, 0x73, 0xe1, 0xab, 0xc1, 0xc8, 0x89, 0x6e, 0xda, 0x1a,
0xfa, 0xc2, 0xe7, 0xc9, 0xaf, 0xf6, 0xe4, 0x51, 0xb5, 0xe3, 0xee, 0x76, 0x4c, 0x11, 0x3a, 0x4e,
0x25, 0x2e, 0x5f, 0xfd, 0x09, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xf6, 0x3d, 0x98, 0xc0, 0x07, 0x00,
0x00,
// 735 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0xdd, 0x4e, 0x1a, 0x5b,
0x14, 0x76, 0x44, 0x38, 0x9e, 0x05, 0x81, 0x64, 0xc7, 0x83, 0x1c, 0x3c, 0x2a, 0x19, 0x4f, 0x2a,
0xb1, 0x29, 0x54, 0x34, 0x69, 0xaf, 0xc5, 0x86, 0xd8, 0x56, 0x63, 0x87, 0xf6, 0xa6, 0x37, 0x66,
0x33, 0xb3, 0x94, 0x6d, 0x86, 0xbd, 0xc7, 0xd9, 0x1b, 0x5b, 0x2e, 0x9a, 0x26, 0x7d, 0x84, 0xbe,
0x40, 0x5f, 0xa3, 0xcf, 0xd1, 0x27, 0x6a, 0xe6, 0x87, 0x81, 0xc1, 0xad, 0xb4, 0x9a, 0xde, 0xb1,
0xf6, 0x7c, 0x6b, 0x7d, 0xdf, 0xfa, 0xe1, 0x83, 0xbc, 0xe7, 0x8b, 0x8f, 0xa3, 0x86, 0xe7, 0x0b,
0x25, 0x08, 0x19, 0x30, 0xf7, 0x7a, 0x28, 0xa3, 0xa8, 0x11, 0x7e, 0xa9, 0x16, 0x6c, 0x31, 0x18,
0x08, 0x1e, 0xbd, 0x55, 0x8b, 0x8c, 0x2b, 0xf4, 0x39, 0x75, 0xe3, 0xb8, 0x30, 0x9d, 0x61, 0x7e,
0x37, 0x60, 0xe3, 0x88, 0x5f, 0x53, 0x97, 0x39, 0x54, 0x61, 0x5b, 0xb8, 0xee, 0x31, 0x2a, 0xda,
0xa6, 0x76, 0x1f, 0x2d, 0xbc, 0x1a, 0xa2, 0x54, 0xe4, 0x29, 0x2c, 0xf5, 0xa8, 0xc4, 0x8a, 0x51,
0x33, 0xea, 0xf9, 0xd6, 0x7f, 0x8d, 0x14, 0x63, 0x4c, 0x75, 0x2c, 0x2f, 0x0e, 0xa8, 0x44, 0x2b,
0x44, 0x92, 0x55, 0xf8, 0xcb, 0xe9, 0x9d, 0x71, 0x3a, 0xc0, 0xca, 0x62, 0xcd, 0xa8, 0xff, 0x6d,
0xe5, 0x9c, 0xde, 0x09, 0x1d, 0x20, 0xd9, 0x86, 0x92, 0x2d, 0x5c, 0x17, 0x6d, 0xc5, 0x04, 0x8f,
0x00, 0x99, 0x10, 0x50, 0x9c, 0x3c, 0x87, 0x40, 0x13, 0x0a, 0x93, 0x97, 0xa3, 0xc3, 0xca, 0x52,
0xcd, 0xa8, 0x67, 0xac, 0xd4, 0x9b, 0x79, 0x09, 0xd5, 0x29, 0xe5, 0x3e, 0x3a, 0x0f, 0x54, 0x5d,
0x85, 0xe5, 0xa1, 0x0c, 0x26, 0x95, 0xc8, 0x4e, 0x62, 0xf3, 0x8b, 0x01, 0xe5, 0x77, 0xde, 0x9f,
0x27, 0x0a, 0xbe, 0x79, 0x54, 0xca, 0x0f, 0xc2, 0x77, 0xe2, 0xd1, 0x24, 0xb1, 0xf9, 0x19, 0xd6,
0x2d, 0x3c, 0xf7, 0x51, 0xf6, 0x4f, 0x85, 0xcb, 0xec, 0xd1, 0x11, 0x3f, 0x17, 0x0f, 0x94, 0x52,
0x86, 0x9c, 0xf0, 0xde, 0x8e, 0xbc, 0x48, 0x48, 0xd6, 0x8a, 0x23, 0xb2, 0x02, 0x59, 0xe1, 0xbd,
0xc2, 0x51, 0xac, 0x21, 0x0a, 0xcc, 0x1f, 0x06, 0x14, 0xdb, 0xc9, 0x0a, 0x2c, 0xaa, 0x90, 0x6c,
0x00, 0x4c, 0x96, 0x12, 0x12, 0x67, 0xac, 0xa9, 0x17, 0xb2, 0x0b, 0x59, 0x9f, 0x2a, 0x94, 0x95,
0xc5, 0x5a, 0xa6, 0x9e, 0x6f, 0xad, 0xa5, 0x35, 0x25, 0xa7, 0x19, 0xd4, 0xb2, 0x22, 0x24, 0x79,
0x06, 0x39, 0xa9, 0xc2, 0x9c, 0x4c, 0x2d, 0x53, 0x2f, 0xb6, 0x36, 0xd3, 0x39, 0x71, 0xf0, 0x66,
0x28, 0x14, 0xed, 0x06, 0x38, 0x2b, 0x86, 0x93, 0x7d, 0xc8, 0xda, 0xc2, 0x41, 0x59, 0x59, 0x0a,
0xf3, 0x36, 0xb4, 0xfd, 0xbf, 0xf0, 0x7d, 0xe1, 0xb7, 0x85, 0x83, 0x56, 0x04, 0x36, 0x3f, 0x41,
0xa9, 0x8b, 0x2a, 0x10, 0x20, 0xef, 0x3f, 0xc7, 0xe7, 0xe9, 0x36, 0xcd, 0xc6, 0xcd, 0xbf, 0x65,
0x23, 0x3d, 0xb9, 0xb8, 0x5b, 0xf3, 0x25, 0x94, 0x5f, 0x33, 0xa9, 0xda, 0x2e, 0x43, 0xae, 0x82,
0x8d, 0xde, 0x5f, 0x85, 0xf9, 0xd5, 0x80, 0xd5, 0x1b, 0xc5, 0xa4, 0x27, 0xb8, 0x44, 0xb2, 0x17,
0x4d, 0x75, 0x28, 0xe3, 0x7a, 0x6b, 0xda, 0x7a, 0xdd, 0x10, 0x62, 0xc5, 0x50, 0x72, 0x00, 0x05,
0x3b, 0xac, 0x75, 0xc6, 0x82, 0x62, 0x71, 0x77, 0x9b, 0xda, 0xd4, 0x09, 0xa9, 0x95, 0xb7, 0x27,
0x02, 0x5a, 0xdf, 0x96, 0x21, 0x7b, 0x1a, 0x0c, 0x80, 0xb8, 0x40, 0x3a, 0xa8, 0xda, 0x62, 0xe0,
0x09, 0x8e, 0x5c, 0x75, 0xa3, 0xad, 0x35, 0xb4, 0xeb, 0xbd, 0x09, 0x8c, 0xc7, 0x52, 0xfd, 0x5f,
0x8b, 0x9f, 0x01, 0x9b, 0x0b, 0xe4, 0x0a, 0x56, 0x3a, 0x18, 0x86, 0x4c, 0x2a, 0x66, 0xcb, 0x76,
0x9f, 0x72, 0x8e, 0x2e, 0x69, 0xdd, 0x72, 0x82, 0x3a, 0xf0, 0x98, 0x73, 0x4b, 0xcb, 0xd9, 0x55,
0x3e, 0xe3, 0x17, 0xe3, 0x09, 0x9b, 0x0b, 0xc4, 0x87, 0xf5, 0xb4, 0x97, 0x46, 0xeb, 0x4e, 0x1c,
0x75, 0x96, 0x3b, 0xba, 0x8b, 0xbb, 0xed, 0xb7, 0x7a, 0xd7, 0xa2, 0xcc, 0x05, 0x42, 0xa1, 0xd0,
0x41, 0x75, 0xe8, 0x8c, 0xdb, 0xdb, 0xb9, 0xbd, 0xbd, 0x04, 0xf4, 0x9b, 0x6d, 0x5d, 0xc2, 0xbf,
0x69, 0xa3, 0x45, 0xae, 0x18, 0x75, 0xa3, 0x96, 0x1a, 0x73, 0x5a, 0x9a, 0xb1, 0xcb, 0x79, 0xed,
0xf4, 0xe0, 0x9f, 0x89, 0xcf, 0x4e, 0xf3, 0xec, 0xe8, 0x78, 0xf4, 0x96, 0x3c, 0x8f, 0xe3, 0x12,
0xca, 0x7a, 0x1f, 0x25, 0xbb, 0x3a, 0x92, 0x3b, 0x3d, 0x77, 0x1e, 0x97, 0x03, 0xa5, 0x0e, 0xaa,
0xf0, 0xfe, 0x8f, 0x51, 0xf9, 0xcc, 0x96, 0xe4, 0xd1, 0x6d, 0x07, 0x1f, 0x03, 0xc6, 0x95, 0xb7,
0xe7, 0xe2, 0x92, 0x0d, 0x9d, 0xc0, 0xf2, 0xd8, 0xc3, 0xc8, 0x96, 0xae, 0x87, 0x19, 0x87, 0x9b,
0xa7, 0xda, 0x85, 0xd2, 0x8c, 0x8f, 0xe8, 0xe7, 0xaf, 0x77, 0xae, 0xea, 0xe3, 0x5f, 0xc2, 0x8e,
0xd5, 0x1f, 0xec, 0xbf, 0x6f, 0x5d, 0x30, 0xd5, 0x1f, 0xf6, 0x02, 0x1d, 0xcd, 0x28, 0xf5, 0x09,
0x13, 0xf1, 0xaf, 0xe6, 0xf8, 0x84, 0x9b, 0x61, 0xb5, 0x66, 0x58, 0xcd, 0xeb, 0xf5, 0x72, 0x61,
0xb8, 0xf7, 0x33, 0x00, 0x00, 0xff, 0xff, 0x0e, 0x7a, 0xad, 0xdf, 0x0f, 0x09, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -439,6 +531,7 @@ type ProxyClient interface {
RefreshPolicyInfoCache(ctx context.Context, in *RefreshPolicyInfoCacheRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
GetProxyMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error)
SetRates(ctx context.Context, in *SetRatesRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
ListClientInfos(ctx context.Context, in *ListClientInfosRequest, opts ...grpc.CallOption) (*ListClientInfosResponse, error)
}
type proxyClient struct {
@ -530,6 +623,15 @@ func (c *proxyClient) SetRates(ctx context.Context, in *SetRatesRequest, opts ..
return out, nil
}
func (c *proxyClient) ListClientInfos(ctx context.Context, in *ListClientInfosRequest, opts ...grpc.CallOption) (*ListClientInfosResponse, error) {
out := new(ListClientInfosResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.proxy.Proxy/ListClientInfos", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ProxyServer is the server API for Proxy service.
type ProxyServer interface {
GetComponentStates(context.Context, *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error)
@ -541,6 +643,7 @@ type ProxyServer interface {
RefreshPolicyInfoCache(context.Context, *RefreshPolicyInfoCacheRequest) (*commonpb.Status, error)
GetProxyMetrics(context.Context, *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
SetRates(context.Context, *SetRatesRequest) (*commonpb.Status, error)
ListClientInfos(context.Context, *ListClientInfosRequest) (*ListClientInfosResponse, error)
}
// UnimplementedProxyServer can be embedded to have forward compatible implementations.
@ -574,6 +677,9 @@ func (*UnimplementedProxyServer) GetProxyMetrics(ctx context.Context, req *milvu
func (*UnimplementedProxyServer) SetRates(ctx context.Context, req *SetRatesRequest) (*commonpb.Status, error) {
return nil, status.Errorf(codes.Unimplemented, "method SetRates not implemented")
}
func (*UnimplementedProxyServer) ListClientInfos(ctx context.Context, req *ListClientInfosRequest) (*ListClientInfosResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListClientInfos not implemented")
}
func RegisterProxyServer(s *grpc.Server, srv ProxyServer) {
s.RegisterService(&_Proxy_serviceDesc, srv)
@ -741,6 +847,24 @@ func _Proxy_SetRates_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Proxy_ListClientInfos_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListClientInfosRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ProxyServer).ListClientInfos(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.proxy.Proxy/ListClientInfos",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ProxyServer).ListClientInfos(ctx, req.(*ListClientInfosRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Proxy_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.proxy.Proxy",
HandlerType: (*ProxyServer)(nil),
@ -781,6 +905,10 @@ var _Proxy_serviceDesc = grpc.ServiceDesc{
MethodName: "SetRates",
Handler: _Proxy_SetRates_Handler,
},
{
MethodName: "ListClientInfos",
Handler: _Proxy_ListClientInfos_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "proxy.proto",

View File

@ -0,0 +1,49 @@
package proxy
import (
"context"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type clientInfo struct {
*commonpb.ClientInfo
identifier int64
lastActiveTime time.Time
}
func getLoggerOfClientInfo(info *commonpb.ClientInfo) []zap.Field {
fields := []zap.Field{
zap.String("sdk_type", info.GetSdkType()),
zap.String("sdk_version", info.GetSdkVersion()),
zap.String("local_time", info.GetLocalTime()),
zap.String("user", info.GetUser()),
zap.String("host", info.GetHost()),
}
for k, v := range info.GetReserved() {
fields = append(fields, zap.String(k, v))
}
return fields
}
func (c clientInfo) getLogger() []zap.Field {
fields := getLoggerOfClientInfo(c.ClientInfo)
fields = append(fields,
zap.Int64("identifier", c.identifier),
zap.Time("last_active_time", c.lastActiveTime),
)
return fields
}
func (c clientInfo) ctxLogRegister(ctx context.Context) {
log.Ctx(ctx).Info("client register", c.getLogger()...)
}
func (c clientInfo) logDeregister() {
log.Info("client deregister", c.getLogger()...)
}

View File

@ -0,0 +1,178 @@
package proxy
import (
"context"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
)
const (
// we shouldn't check this too frequently.
defaultConnCheckDuration = 2 * time.Minute
defaultTTLForInactiveConn = 24 * time.Hour
)
type connectionManager struct {
mu sync.RWMutex
initOnce sync.Once
stopOnce sync.Once
closeSignal chan struct{}
wg sync.WaitGroup
buffer chan int64
duration time.Duration
ttl time.Duration
clientInfos map[int64]clientInfo
}
type connectionManagerOption func(s *connectionManager)
func withDuration(duration time.Duration) connectionManagerOption {
return func(s *connectionManager) {
s.duration = duration
}
}
func withTTL(ttl time.Duration) connectionManagerOption {
return func(s *connectionManager) {
s.ttl = ttl
}
}
func (s *connectionManager) apply(opts ...connectionManagerOption) {
for _, opt := range opts {
opt(s)
}
}
func (s *connectionManager) init() {
s.initOnce.Do(func() {
s.wg.Add(1)
go s.checkLoop()
})
}
func (s *connectionManager) stop() {
s.stopOnce.Do(func() {
close(s.closeSignal)
s.wg.Wait()
})
}
func (s *connectionManager) checkLoop() {
defer s.wg.Done()
t := time.NewTicker(s.duration)
defer t.Stop()
for {
select {
case <-s.closeSignal:
log.Info("connection manager closed")
return
case identifier := <-s.buffer:
s.update(identifier)
case <-t.C:
s.removeLongInactiveClients()
}
}
}
func (s *connectionManager) register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
cli := clientInfo{
ClientInfo: info,
identifier: identifier,
lastActiveTime: time.Now(),
}
s.mu.Lock()
defer s.mu.Unlock()
s.clientInfos[identifier] = cli
cli.ctxLogRegister(ctx)
}
func (s *connectionManager) keepActive(identifier int64) {
// make this asynchronous and then the rpc won't be blocked too long.
s.buffer <- identifier
}
func (s *connectionManager) update(identifier int64) {
s.mu.Lock()
defer s.mu.Unlock()
cli, ok := s.clientInfos[identifier]
if ok {
cli.lastActiveTime = time.Now()
s.clientInfos[identifier] = cli
}
}
func (s *connectionManager) removeLongInactiveClients() {
s.mu.Lock()
defer s.mu.Unlock()
for candidate, cli := range s.clientInfos {
if time.Since(cli.lastActiveTime) > s.ttl {
cli.logDeregister()
delete(s.clientInfos, candidate)
}
}
}
func (s *connectionManager) list() []*commonpb.ClientInfo {
s.mu.RLock()
defer s.mu.RUnlock()
clients := make([]*commonpb.ClientInfo, 0, len(s.clientInfos))
for identifier, cli := range s.clientInfos {
if cli.ClientInfo != nil {
client := proto.Clone(cli.ClientInfo).(*commonpb.ClientInfo)
if client.Reserved == nil {
client.Reserved = make(map[string]string)
}
client.Reserved["identifier"] = string(strconv.AppendInt(nil, identifier, 10))
client.Reserved["last_active_time"] = cli.lastActiveTime.String()
clients = append(clients, client)
}
}
return clients
}
func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
s := &connectionManager{
closeSignal: make(chan struct{}, 1),
buffer: make(chan int64, 64),
duration: defaultConnCheckDuration,
ttl: defaultTTLForInactiveConn,
clientInfos: make(map[int64]clientInfo),
}
s.apply(opts...)
s.init()
return s
}
var connectionManagerInstance *connectionManager
var getConnectionManagerInstanceOnce sync.Once
func GetConnectionManager() *connectionManager {
getConnectionManagerInstanceOnce.Do(func() {
connectionManagerInstance = newConnectionManager(
withDuration(defaultConnCheckDuration),
withTTL(defaultTTLForInactiveConn))
})
return connectionManagerInstance
}

View File

@ -0,0 +1,68 @@
package proxy
import (
"context"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/stretchr/testify/assert"
)
func Test_withDuration(t *testing.T) {
s := &connectionManager{}
s.apply(withDuration(defaultConnCheckDuration))
assert.Equal(t, defaultConnCheckDuration, s.duration)
}
func Test_withTTL(t *testing.T) {
s := &connectionManager{}
s.apply(withTTL(defaultTTLForInactiveConn))
assert.Equal(t, defaultTTLForInactiveConn, s.ttl)
}
func Test_connectionManager_apply(t *testing.T) {
s := &connectionManager{}
s.apply(
withDuration(defaultConnCheckDuration),
withTTL(defaultTTLForInactiveConn))
assert.Equal(t, defaultConnCheckDuration, s.duration)
assert.Equal(t, defaultTTLForInactiveConn, s.ttl)
}
func TestGetConnectionManager(t *testing.T) {
s := GetConnectionManager()
assert.Equal(t, defaultConnCheckDuration, s.duration)
assert.Equal(t, defaultTTLForInactiveConn, s.ttl)
}
func TestConnectionManager(t *testing.T) {
s := newConnectionManager(
withDuration(time.Millisecond*5),
withTTL(time.Millisecond*100))
s.register(context.TODO(), 1, &commonpb.ClientInfo{
Reserved: map[string]string{"for_test": "for_test"},
})
assert.Equal(t, 1, len(s.list()))
// register duplicate.
s.register(context.TODO(), 1, &commonpb.ClientInfo{})
assert.Equal(t, 1, len(s.list()))
s.register(context.TODO(), 2, &commonpb.ClientInfo{})
assert.Equal(t, 2, len(s.list()))
s.keepActive(1)
s.keepActive(2)
time.Sleep(time.Millisecond * 5)
assert.Equal(t, 2, len(s.list()))
time.Sleep(time.Millisecond * 100)
assert.Equal(t, 0, len(s.list()))
s.stop()
time.Sleep(time.Millisecond * 5)
}

View File

@ -5478,3 +5478,49 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.QueryCoordCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return t.result, nil
}
func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
if !node.checkHealthy() {
return &milvuspb.ConnectResponse{Status: unhealthyStatus()}, nil
}
ts, err := node.tsoAllocator.AllocOne(ctx)
if err != nil {
return &milvuspb.ConnectResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
}, nil
}
serverInfo := &commonpb.ServerInfo{
BuildTags: os.Getenv(metricsinfo.GitBuildTagsEnvKey),
BuildTime: os.Getenv(metricsinfo.MilvusBuildTimeEnvKey),
GitCommit: os.Getenv(metricsinfo.GitCommitEnvKey),
GoVersion: os.Getenv(metricsinfo.MilvusUsedGoVersion),
DeployMode: os.Getenv(metricsinfo.DeployModeEnvKey),
Reserved: make(map[string]string),
}
GetConnectionManager().register(ctx, int64(ts), request.GetClientInfo())
return &milvuspb.ConnectResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ServerInfo: serverInfo,
Identifier: int64(ts),
}, nil
}
func (node *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
if !node.checkHealthy() {
return &proxypb.ListClientInfosResponse{Status: unhealthyStatus()}, nil
}
clients := GetConnectionManager().list()
return &proxypb.ListClientInfosResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ClientInfos: clients,
}, nil
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -722,3 +723,75 @@ func TestProxyListDatabase(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestProxy_Connect(t *testing.T) {
t.Run("proxy unhealthy", func(t *testing.T) {
node := &Proxy{}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.Connect(context.TODO(), nil)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("failed to allocate ts", func(t *testing.T) {
m := newMockTimestampAllocator(t)
m.On("AllocTimestamp",
mock.Anything,
mock.Anything,
).Return(nil, errors.New("error mock AllocateTimestamp"))
alloc, _ := newTimestampAllocator(m, 199)
node := Proxy{
tsoAllocator: alloc,
}
node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.Connect(context.TODO(), nil)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case", func(t *testing.T) {
m := newMockTimestampAllocator(t)
m.On("AllocTimestamp",
mock.Anything,
mock.Anything,
).Return(&rootcoordpb.AllocTimestampResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Timestamp: 20230518,
Count: 1,
}, nil)
alloc, _ := newTimestampAllocator(m, 199)
node := Proxy{
tsoAllocator: alloc,
}
node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.Connect(context.TODO(), &milvuspb.ConnectRequest{
ClientInfo: &commonpb.ClientInfo{},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}
func TestProxy_ListClientInfos(t *testing.T) {
t.Run("proxy unhealthy", func(t *testing.T) {
node := &Proxy{}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := node.ListClientInfos(context.TODO(), nil)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("normal case", func(t *testing.T) {
node := Proxy{}
node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.ListClientInfos(context.TODO(), nil)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}

View File

@ -29,6 +29,7 @@ type tsoAllocator interface {
}
// use timestampAllocatorInterface to keep other components testable
//go:generate mockery --name=timestampAllocatorInterface --filename=mock_tso_test.go --outpkg=proxy --output=. --inpackage --structname=mockTimestampAllocator --with-expecter
type timestampAllocatorInterface interface {
AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error)
}

View File

@ -0,0 +1,42 @@
package proxy
import (
"context"
"fmt"
"strconv"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/funcutil"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func getIdentifierFromContext(ctx context.Context) (int64, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return 0, fmt.Errorf("fail to get metadata from the context")
}
identifierContent, ok := md[util.IdentifierKey]
if !ok || len(identifierContent) < 1 {
return 0, fmt.Errorf("no identifier found in metadata")
}
identifier, err := strconv.ParseInt(identifierContent[0], 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse identifier: %s, error: %s", identifierContent[0], err.Error())
}
return identifier, nil
}
func KeepActiveInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// We shouldn't block the normal rpc. though this may be not very accurate enough.
// On the other hand, too many goroutines will also influence the rpc.
// Not sure which way is better, since actually we already make the `keepActive` asynchronous.
go func() {
identifier, err := getIdentifierFromContext(ctx)
if err == nil && funcutil.CheckCtxValid(ctx) {
GetConnectionManager().keepActive(identifier)
}
}()
return handler(ctx, req)
}

View File

@ -0,0 +1,65 @@
package proxy
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func Test_getIdentifierFromContext(t *testing.T) {
t.Run("metadata not found", func(t *testing.T) {
ctx := context.TODO()
_, err := getIdentifierFromContext(ctx)
assert.Error(t, err)
})
t.Run("no identifier", func(t *testing.T) {
md := metadata.New(map[string]string{})
ctx := metadata.NewIncomingContext(context.TODO(), md)
_, err := getIdentifierFromContext(ctx)
assert.Error(t, err)
})
t.Run("invalid identifier", func(t *testing.T) {
md := metadata.New(map[string]string{
"identifier": "i-am-not-invalid-identifier",
})
ctx := metadata.NewIncomingContext(context.TODO(), md)
_, err := getIdentifierFromContext(ctx)
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
md := metadata.New(map[string]string{
"identifier": "20230518",
})
ctx := metadata.NewIncomingContext(context.TODO(), md)
identifier, err := getIdentifierFromContext(ctx)
assert.NoError(t, err)
assert.Equal(t, int64(20230518), identifier)
})
}
func TestKeepActiveInterceptor(t *testing.T) {
md := metadata.New(map[string]string{
"identifier": "20230518",
})
ctx := metadata.NewIncomingContext(context.TODO(), md)
rpcCalled := false
rpcChan := make(chan struct{}, 1)
var handler grpc.UnaryHandler = func(ctx context.Context, req interface{}) (interface{}, error) {
rpcCalled = true
rpcChan <- struct{}{}
return "not-important", nil
}
got, err := KeepActiveInterceptor(ctx, nil, nil, handler)
<-rpcChan
assert.True(t, rpcCalled)
assert.NoError(t, err)
assert.Equal(t, "not-important", got)
}

View File

@ -0,0 +1,85 @@
// Code generated by mockery v2.16.0. DO NOT EDIT.
package proxy
import (
context "context"
rootcoordpb "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
mock "github.com/stretchr/testify/mock"
)
// mockTimestampAllocator is an autogenerated mock type for the timestampAllocatorInterface type
type mockTimestampAllocator struct {
mock.Mock
}
type mockTimestampAllocator_Expecter struct {
mock *mock.Mock
}
func (_m *mockTimestampAllocator) EXPECT() *mockTimestampAllocator_Expecter {
return &mockTimestampAllocator_Expecter{mock: &_m.Mock}
}
// AllocTimestamp provides a mock function with given fields: ctx, req
func (_m *mockTimestampAllocator) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
ret := _m.Called(ctx, req)
var r0 *rootcoordpb.AllocTimestampResponse
if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.AllocTimestampRequest) *rootcoordpb.AllocTimestampResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*rootcoordpb.AllocTimestampResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.AllocTimestampRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// mockTimestampAllocator_AllocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocTimestamp'
type mockTimestampAllocator_AllocTimestamp_Call struct {
*mock.Call
}
// AllocTimestamp is a helper method to define mock.On call
// - ctx context.Context
// - req *rootcoordpb.AllocTimestampRequest
func (_e *mockTimestampAllocator_Expecter) AllocTimestamp(ctx interface{}, req interface{}) *mockTimestampAllocator_AllocTimestamp_Call {
return &mockTimestampAllocator_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, req)}
}
func (_c *mockTimestampAllocator_AllocTimestamp_Call) Run(run func(ctx context.Context, req *rootcoordpb.AllocTimestampRequest)) *mockTimestampAllocator_AllocTimestamp_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*rootcoordpb.AllocTimestampRequest))
})
return _c
}
func (_c *mockTimestampAllocator_AllocTimestamp_Call) Return(_a0 *rootcoordpb.AllocTimestampResponse, _a1 error) *mockTimestampAllocator_AllocTimestamp_Call {
_c.Call.Return(_a0, _a1)
return _c
}
type mockConstructorTestingTnewMockTimestampAllocator interface {
mock.TestingT
Cleanup(func())
}
// newMockTimestampAllocator creates a new instance of mockTimestampAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockTimestampAllocator(t mockConstructorTestingTnewMockTimestampAllocator) *mockTimestampAllocator {
mock := &mockTimestampAllocator{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -440,6 +440,8 @@ func (node *Proxy) Stop() error {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
GetConnectionManager().stop()
return nil
}

View File

@ -909,6 +909,8 @@ type Proxy interface {
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
GetProxyMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error)
ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error)
}
// ProxyComponent defines the interface of proxy component.
@ -1410,6 +1412,8 @@ type ProxyComponent interface {
TransferReplica(ctx context.Context, req *milvuspb.TransferReplicaRequest) (*commonpb.Status, error)
ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error)
DescribeResourceGroup(ctx context.Context, req *milvuspb.DescribeResourceGroupRequest) (*milvuspb.DescribeResourceGroupResponse, error)
Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error)
}
// QueryNode is the interface `querynode` package implements

View File

@ -58,6 +58,8 @@ const (
AnyWord = "*"
HeaderDatabase = "dbname"
IdentifierKey = "identifier"
)
const (

View File

@ -68,3 +68,7 @@ func (m *GrpcProxyClient) GetProxyMetrics(ctx context.Context, in *milvuspb.GetM
func (m *GrpcProxyClient) SetRates(ctx context.Context, in *proxypb.SetRatesRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcProxyClient) ListClientInfos(ctx context.Context, in *proxypb.ListClientInfosRequest, opts ...grpc.CallOption) (*proxypb.ListClientInfosResponse, error) {
return &proxypb.ListClientInfosResponse{}, m.Err
}