From fce792b8bf4eb480267e7cf71cbdffcb969588d9 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Fri, 28 May 2021 10:26:30 +0800 Subject: [PATCH] Add historical and streaming module in querynode (#5469) * add historical and streaming Signed-off-by: bigsheeper * fix GetSegmentInfo Signed-off-by: bigsheeper * pass regression test Signed-off-by: xige-16 Co-authored-by: bigsheeper --- internal/msgstream/mq_msgstream.go | 2 + internal/msgstream/mq_msgstream_test.go | 2 +- internal/proto/internal.proto | 3 + internal/proto/internalpb/internal.pb.go | 259 ++++++++++-------- internal/querynode/collection_replica.go | 15 + internal/querynode/collection_replica_test.go | 78 +++--- internal/querynode/data_sync_service_test.go | 4 +- internal/querynode/historical.go | 54 ++++ internal/querynode/impl.go | 26 +- internal/querynode/load_service_test.go | 16 +- internal/querynode/meta_service.go | 252 ----------------- internal/querynode/meta_service_test.go | 227 --------------- internal/querynode/query_node.go | 92 ++----- internal/querynode/query_node_test.go | 12 +- internal/querynode/search_collection.go | 95 +++++-- internal/querynode/search_service.go | 17 +- internal/querynode/search_service_test.go | 32 +-- internal/querynode/stats_service_test.go | 12 +- internal/querynode/streaming.go | 75 +++++ internal/querynode/task.go | 89 ++++-- 20 files changed, 560 insertions(+), 802 deletions(-) create mode 100644 internal/querynode/historical.go delete mode 100644 internal/querynode/meta_service.go delete mode 100644 internal/querynode/meta_service_test.go create mode 100644 internal/querynode/streaming.go diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 9921149e67..83cec0afec 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -542,6 +542,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { ChannelName: tempBuffer[0].Position().ChannelName, MsgID: tempBuffer[0].Position().MsgID, Timestamp: timeStamp, + MsgGroup: consumer.Subscription(), } endMsgPositions = append(endMsgPositions, newPos) } else { @@ -549,6 +550,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { ChannelName: timeTickMsg.Position().ChannelName, MsgID: timeTickMsg.Position().MsgID, Timestamp: timeStamp, + MsgGroup: consumer.Subscription(), } endMsgPositions = append(endMsgPositions, newPos) } diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index adcfbc152a..b1d52b3fb9 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -247,11 +247,11 @@ func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPositi pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) //outputStream.AsConsumer(consumerChannels, consumerSubName) - outputStream.Start() for _, pos := range positions { pos.MsgGroup = funcutil.RandomString(4) outputStream.Seek(pos) } + outputStream.Start() //outputStream.Start() return outputStream } diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index 096287901e..9232c773b7 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -140,6 +140,9 @@ message SearchResults { string result_channelID = 3; string metric_type = 4; repeated bytes hits = 5; + repeated int64 sealed_segmentIDs_searched = 6; + repeated string channelIDs_searched = 7; + repeated int64 global_sealed_segmentIDs = 8; } message RetrieveRequest { diff --git a/internal/proto/internalpb/internal.pb.go b/internal/proto/internalpb/internal.pb.go index 47fa1cbdae..83a4206771 100644 --- a/internal/proto/internalpb/internal.pb.go +++ b/internal/proto/internalpb/internal.pb.go @@ -1140,14 +1140,17 @@ func (m *SearchRequest) GetSerializedExprPlan() []byte { } type SearchResults struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"` - MetricType string `protobuf:"bytes,4,opt,name=metric_type,json=metricType,proto3" json:"metric_type,omitempty"` - Hits [][]byte `protobuf:"bytes,5,rep,name=hits,proto3" json:"hits,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"` + MetricType string `protobuf:"bytes,4,opt,name=metric_type,json=metricType,proto3" json:"metric_type,omitempty"` + Hits [][]byte `protobuf:"bytes,5,rep,name=hits,proto3" json:"hits,omitempty"` + SealedSegmentIDsSearched []int64 `protobuf:"varint,6,rep,packed,name=sealed_segmentIDs_searched,json=sealedSegmentIDsSearched,proto3" json:"sealed_segmentIDs_searched,omitempty"` + ChannelIDsSearched []string `protobuf:"bytes,7,rep,name=channelIDs_searched,json=channelIDsSearched,proto3" json:"channelIDs_searched,omitempty"` + GlobalSealedSegmentIDs []int64 `protobuf:"varint,8,rep,packed,name=global_sealed_segmentIDs,json=globalSealedSegmentIDs,proto3" json:"global_sealed_segmentIDs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SearchResults) Reset() { *m = SearchResults{} } @@ -1210,6 +1213,27 @@ func (m *SearchResults) GetHits() [][]byte { return nil } +func (m *SearchResults) GetSealedSegmentIDsSearched() []int64 { + if m != nil { + return m.SealedSegmentIDsSearched + } + return nil +} + +func (m *SearchResults) GetChannelIDsSearched() []string { + if m != nil { + return m.ChannelIDsSearched + } + return nil +} + +func (m *SearchResults) GetGlobalSealedSegmentIDs() []int64 { + if m != nil { + return m.GlobalSealedSegmentIDs + } + return nil +} + type RetrieveRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` ResultChannelID string `protobuf:"bytes,2,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"` @@ -2075,112 +2099,115 @@ func init() { func init() { proto.RegisterFile("internal.proto", fileDescriptor_41f4a519b878ee3b) } var fileDescriptor_41f4a519b878ee3b = []byte{ - // 1697 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x58, 0xcd, 0x8f, 0x1c, 0x47, - 0x15, 0xa7, 0x67, 0x66, 0x77, 0x66, 0x5e, 0xf7, 0xae, 0xc7, 0x65, 0x3b, 0x69, 0x7f, 0x24, 0x99, - 0x74, 0xf8, 0x58, 0x12, 0x61, 0x5b, 0x1b, 0x20, 0x88, 0x8b, 0x13, 0xef, 0x24, 0x66, 0xe4, 0xac, - 0xb5, 0xf4, 0x38, 0x91, 0xe0, 0xd2, 0xaa, 0xe9, 0x2e, 0xcf, 0x74, 0xd2, 0x5f, 0x54, 0x55, 0xdb, - 0x3b, 0x39, 0x71, 0xe0, 0x04, 0x02, 0x21, 0x24, 0x4e, 0xfc, 0x0f, 0xfc, 0x09, 0x80, 0x72, 0x42, - 0xe2, 0x8e, 0xc4, 0xbf, 0xc1, 0x91, 0x13, 0xaa, 0x57, 0xd5, 0x3d, 0x1f, 0xee, 0x5d, 0x26, 0x6b, - 0x21, 0x88, 0xe0, 0xd6, 0xf5, 0xde, 0xab, 0xaa, 0xf7, 0xfb, 0xbd, 0x57, 0xef, 0x55, 0x35, 0xec, - 0xc7, 0x99, 0x64, 0x3c, 0xa3, 0xc9, 0xed, 0x82, 0xe7, 0x32, 0x27, 0xd7, 0xd2, 0x38, 0x79, 0x5a, - 0x0a, 0x3d, 0xba, 0x5d, 0x29, 0x6f, 0x38, 0x61, 0x9e, 0xa6, 0x79, 0xa6, 0xc5, 0x37, 0x1c, 0x11, - 0xce, 0x59, 0x4a, 0xf5, 0xc8, 0xfb, 0x83, 0x05, 0x7b, 0x47, 0x79, 0x5a, 0xe4, 0x19, 0xcb, 0xe4, - 0x38, 0x7b, 0x92, 0x93, 0x97, 0x60, 0x37, 0xcb, 0x23, 0x36, 0x1e, 0xb9, 0xd6, 0xd0, 0x3a, 0x68, - 0xfb, 0x66, 0x44, 0x08, 0x74, 0x78, 0x9e, 0x30, 0xb7, 0x35, 0xb4, 0x0e, 0xfa, 0x3e, 0x7e, 0x93, - 0x7b, 0x00, 0x42, 0x52, 0xc9, 0x82, 0x30, 0x8f, 0x98, 0xdb, 0x1e, 0x5a, 0x07, 0xfb, 0x87, 0xc3, - 0xdb, 0x8d, 0x5e, 0xdc, 0x9e, 0x28, 0xc3, 0xa3, 0x3c, 0x62, 0x7e, 0x5f, 0x54, 0x9f, 0xe4, 0x5d, - 0x00, 0x76, 0x2a, 0x39, 0x0d, 0xe2, 0xec, 0x49, 0xee, 0x76, 0x86, 0xed, 0x03, 0xfb, 0xf0, 0xf5, - 0xf5, 0x05, 0x8c, 0xf3, 0x0f, 0xd9, 0xe2, 0x63, 0x9a, 0x94, 0xec, 0x84, 0xc6, 0xdc, 0xef, 0xe3, - 0x24, 0xe5, 0xae, 0xf7, 0x37, 0x0b, 0x2e, 0xd5, 0x00, 0x70, 0x0f, 0x41, 0xbe, 0x0f, 0x3b, 0xb8, - 0x05, 0x22, 0xb0, 0x0f, 0xbf, 0x7a, 0x86, 0x47, 0x6b, 0xb8, 0x7d, 0x3d, 0x85, 0x7c, 0x04, 0x57, - 0x44, 0x39, 0x0d, 0x2b, 0x55, 0x80, 0x52, 0xe1, 0xb6, 0xd0, 0xb5, 0xed, 0x56, 0x22, 0xab, 0x0b, - 0x18, 0x97, 0xde, 0x86, 0x5d, 0xb5, 0x52, 0x29, 0x90, 0x25, 0xfb, 0xf0, 0x66, 0x23, 0xc8, 0x09, - 0x9a, 0xf8, 0xc6, 0xd4, 0xbb, 0x09, 0xd7, 0x1f, 0x30, 0xb9, 0x81, 0xce, 0x67, 0x3f, 0x29, 0x99, - 0x90, 0x46, 0xf9, 0x38, 0x4e, 0xd9, 0xe3, 0x38, 0xfc, 0xf4, 0x68, 0x4e, 0xb3, 0x8c, 0x25, 0x95, - 0xf2, 0x15, 0xb8, 0xf9, 0x80, 0xe1, 0x84, 0x58, 0xc8, 0x38, 0x14, 0x1b, 0xea, 0x6b, 0x70, 0xe5, - 0x01, 0x93, 0xa3, 0x68, 0x43, 0xfc, 0x31, 0xf4, 0x1e, 0xa9, 0x60, 0xab, 0x34, 0xf8, 0x2e, 0x74, - 0x69, 0x14, 0x71, 0x26, 0x84, 0x61, 0xf1, 0x56, 0xa3, 0xc7, 0xef, 0x69, 0x1b, 0xbf, 0x32, 0x6e, - 0x4a, 0x13, 0xef, 0x13, 0x80, 0x71, 0x16, 0xcb, 0x13, 0xca, 0x69, 0x2a, 0xce, 0x4c, 0xb0, 0x11, - 0x38, 0x42, 0x52, 0x2e, 0x83, 0x02, 0xed, 0x0c, 0xe5, 0x5b, 0x64, 0x83, 0x8d, 0xd3, 0xf4, 0xea, - 0xde, 0x8f, 0x00, 0x26, 0x92, 0xc7, 0xd9, 0xec, 0xc3, 0x58, 0x48, 0xb5, 0xd7, 0x53, 0x65, 0xa7, - 0x40, 0xb4, 0x0f, 0xfa, 0xbe, 0x19, 0xad, 0x84, 0xa3, 0xb5, 0x7d, 0x38, 0xee, 0x81, 0x5d, 0xd1, - 0x7d, 0x2c, 0x66, 0xe4, 0x2e, 0x74, 0xa6, 0x54, 0xb0, 0x73, 0xe9, 0x39, 0x16, 0xb3, 0xfb, 0x54, - 0x30, 0x1f, 0x2d, 0xbd, 0xcf, 0x5b, 0xf0, 0xf2, 0x11, 0x67, 0x98, 0xfc, 0x49, 0xc2, 0x42, 0x19, - 0xe7, 0x99, 0xe1, 0xfe, 0x8b, 0xaf, 0x46, 0x5e, 0x86, 0x6e, 0x34, 0x0d, 0x32, 0x9a, 0x56, 0x64, - 0xef, 0x46, 0xd3, 0x47, 0x34, 0x65, 0xe4, 0xeb, 0xb0, 0x1f, 0xd6, 0xeb, 0x2b, 0x09, 0xe6, 0x5c, - 0xdf, 0xdf, 0x90, 0xaa, 0x50, 0x45, 0xd3, 0xf1, 0xc8, 0xed, 0x60, 0x18, 0xf0, 0x9b, 0x78, 0xe0, - 0x2c, 0xad, 0xc6, 0x23, 0x77, 0x07, 0x75, 0x6b, 0x32, 0x45, 0xaa, 0xae, 0x21, 0xee, 0xee, 0xd0, - 0x3a, 0x70, 0x7c, 0x33, 0x22, 0x77, 0xe1, 0xca, 0xd3, 0x98, 0xcb, 0x92, 0x26, 0x26, 0xaf, 0xd4, - 0x2e, 0xc2, 0xed, 0x22, 0xf3, 0x4d, 0x2a, 0x72, 0x08, 0x57, 0x8b, 0xf9, 0x42, 0xc4, 0xe1, 0xc6, - 0x94, 0x1e, 0x4e, 0x69, 0xd4, 0x79, 0x9f, 0x5b, 0x70, 0x6d, 0xc4, 0xf3, 0xe2, 0xcb, 0x4c, 0xa1, - 0xf7, 0xcb, 0x16, 0xbc, 0xa4, 0x33, 0xe1, 0x84, 0x72, 0x19, 0xff, 0x9b, 0x50, 0x7c, 0x03, 0x2e, - 0x2d, 0x77, 0xd5, 0x06, 0xcd, 0x30, 0xbe, 0x06, 0xfb, 0x45, 0xe5, 0x87, 0xb6, 0xeb, 0xa0, 0xdd, - 0x5e, 0x2d, 0x5d, 0x43, 0xbb, 0x73, 0x0e, 0xda, 0xdd, 0x86, 0x84, 0x19, 0x82, 0x5d, 0x2f, 0x34, - 0x1e, 0xb9, 0x5d, 0x34, 0x59, 0x15, 0x79, 0xbf, 0x68, 0xc1, 0x55, 0x15, 0xd4, 0xff, 0xb3, 0xa1, - 0xd8, 0xf8, 0x63, 0x0b, 0x88, 0xce, 0x8e, 0x71, 0x16, 0xb1, 0xd3, 0xff, 0x24, 0x17, 0xaf, 0x00, - 0x3c, 0x89, 0x59, 0x12, 0xad, 0xf2, 0xd0, 0x47, 0xc9, 0x0b, 0x71, 0xe0, 0x42, 0x17, 0x17, 0xa9, - 0xf1, 0x57, 0x43, 0xd5, 0x05, 0xf4, 0x8d, 0xc0, 0x74, 0x81, 0xde, 0xd6, 0x5d, 0x00, 0xa7, 0x99, - 0x2e, 0xf0, 0xfb, 0x36, 0xec, 0x8d, 0x33, 0xc1, 0xb8, 0xfc, 0x5f, 0x4e, 0x24, 0x72, 0x0b, 0xfa, - 0x82, 0xcd, 0x52, 0x75, 0x31, 0x19, 0xb9, 0x3d, 0xd4, 0x2f, 0x05, 0x4a, 0x1b, 0xea, 0xca, 0x3a, - 0x1e, 0xb9, 0x7d, 0x1d, 0xda, 0x5a, 0x40, 0x5e, 0x05, 0x90, 0x71, 0xca, 0x84, 0xa4, 0x69, 0x21, - 0x5c, 0x18, 0xb6, 0x0f, 0x3a, 0xfe, 0x8a, 0x44, 0x75, 0x01, 0x9e, 0x3f, 0x1b, 0x8f, 0x84, 0x6b, - 0x0f, 0xdb, 0xaa, 0x8d, 0xeb, 0x11, 0xf9, 0x36, 0xf4, 0x78, 0xfe, 0x2c, 0x88, 0xa8, 0xa4, 0xae, - 0x83, 0xc1, 0xbb, 0xde, 0x48, 0xf6, 0xfd, 0x24, 0x9f, 0xfa, 0x5d, 0x9e, 0x3f, 0x1b, 0x51, 0x49, - 0xbd, 0xbf, 0xb7, 0x60, 0x6f, 0xc2, 0x28, 0x0f, 0xe7, 0x17, 0x0f, 0xd8, 0x37, 0x61, 0xc0, 0x99, - 0x28, 0x13, 0x19, 0x2c, 0x61, 0xe9, 0xc8, 0x5d, 0xd2, 0xf2, 0xa3, 0x1a, 0x5c, 0x45, 0x79, 0xfb, - 0x1c, 0xca, 0x3b, 0x0d, 0x94, 0x7b, 0xe0, 0xac, 0xf0, 0x2b, 0xdc, 0x1d, 0x84, 0xbe, 0x26, 0x23, - 0x03, 0x68, 0x47, 0x22, 0xc1, 0x88, 0xf5, 0x7d, 0xf5, 0x49, 0xde, 0x82, 0xcb, 0x45, 0x42, 0x43, - 0x36, 0xcf, 0x93, 0x88, 0xf1, 0x60, 0xc6, 0xf3, 0xb2, 0xc0, 0x70, 0x39, 0xfe, 0x60, 0x45, 0xf1, - 0x40, 0xc9, 0xc9, 0x3b, 0xd0, 0x8b, 0x44, 0x12, 0xc8, 0x45, 0xc1, 0x30, 0x64, 0xfb, 0x67, 0x60, - 0x1f, 0x89, 0xe4, 0xf1, 0xa2, 0x60, 0x7e, 0x37, 0xd2, 0x1f, 0xe4, 0x2e, 0x5c, 0x15, 0x8c, 0xc7, - 0x34, 0x89, 0x3f, 0x63, 0x51, 0xc0, 0x4e, 0x0b, 0x1e, 0x14, 0x09, 0xcd, 0x30, 0xb2, 0x8e, 0x4f, - 0x96, 0xba, 0xf7, 0x4f, 0x0b, 0x7e, 0x92, 0xd0, 0xcc, 0xfb, 0xab, 0xb5, 0x24, 0x5d, 0xf1, 0x23, - 0x2e, 0x40, 0xfa, 0x45, 0x6e, 0x52, 0x8d, 0x91, 0x6a, 0x37, 0x47, 0xea, 0x35, 0xb0, 0x53, 0x26, - 0x79, 0x1c, 0x6a, 0x46, 0xf4, 0x01, 0x02, 0x2d, 0x42, 0xd8, 0x04, 0x3a, 0xf3, 0x58, 0xea, 0x50, - 0x38, 0x3e, 0x7e, 0x7b, 0xbf, 0x6b, 0xc1, 0x25, 0x5f, 0x99, 0xb0, 0xa7, 0xec, 0x4b, 0x9f, 0x4f, - 0x6f, 0x42, 0x3b, 0x8e, 0x04, 0xe6, 0x93, 0x7d, 0xe8, 0xae, 0xfb, 0x6d, 0xde, 0x72, 0xe3, 0x91, - 0xf0, 0x95, 0x11, 0x79, 0x03, 0xf6, 0xf2, 0x52, 0x16, 0xa5, 0x0c, 0xb0, 0x9e, 0x56, 0x97, 0x2f, - 0x47, 0x0b, 0x3f, 0x40, 0x99, 0xf7, 0xeb, 0x35, 0x76, 0xfe, 0x5b, 0x03, 0x6f, 0x60, 0x77, 0xb6, - 0x81, 0x7d, 0x0f, 0x6c, 0x8d, 0x57, 0x97, 0x9d, 0x1d, 0x2c, 0x3b, 0xaf, 0x36, 0xce, 0x41, 0x0e, - 0x54, 0xc9, 0xf1, 0x75, 0x63, 0x13, 0x58, 0x7e, 0xfe, 0x62, 0xc1, 0xde, 0x88, 0x25, 0x4c, 0xbe, - 0x40, 0xba, 0x34, 0xb4, 0x85, 0x56, 0x63, 0x5b, 0x58, 0xab, 0xbb, 0xed, 0xf3, 0xeb, 0x6e, 0xe7, - 0xb9, 0xba, 0xfb, 0x3a, 0x38, 0x05, 0x8f, 0x53, 0xca, 0x17, 0xc1, 0xa7, 0x6c, 0x51, 0xa5, 0x8c, - 0x6d, 0x64, 0x0f, 0xd9, 0x42, 0x78, 0xff, 0xb0, 0xa0, 0xff, 0x61, 0x4e, 0x23, 0xbc, 0x3d, 0x5c, - 0x00, 0xc9, 0x5a, 0xdb, 0x68, 0x35, 0xb4, 0x8d, 0xfa, 0x02, 0x50, 0xb9, 0xbf, 0xbc, 0x11, 0xac, - 0x74, 0xf6, 0xce, 0x7a, 0x67, 0x7f, 0x0d, 0xec, 0x58, 0x39, 0x14, 0x14, 0x54, 0xce, 0xb5, 0xdf, - 0x7d, 0x1f, 0x50, 0x74, 0xa2, 0x24, 0xaa, 0xf5, 0x57, 0x06, 0xd8, 0xfa, 0x77, 0xb7, 0x6e, 0xfd, - 0x66, 0x11, 0x6c, 0xfd, 0x7f, 0x6a, 0x81, 0x3b, 0xd1, 0xce, 0x2e, 0xdf, 0xbf, 0x1f, 0x15, 0x11, - 0x3e, 0xc3, 0x6f, 0x41, 0x7f, 0x52, 0x23, 0xd3, 0xcf, 0xcf, 0xa5, 0x40, 0x51, 0x7f, 0xcc, 0xd2, - 0x9c, 0x2f, 0x26, 0xf1, 0x67, 0xcc, 0x00, 0x5f, 0x91, 0x28, 0x6c, 0x8f, 0xca, 0xd4, 0xcf, 0x9f, - 0x09, 0x73, 0xd0, 0xab, 0xa1, 0xc2, 0x16, 0xe2, 0x85, 0x2d, 0x50, 0x91, 0x42, 0xe4, 0x1d, 0x1f, - 0xb4, 0x48, 0xbd, 0x19, 0xc9, 0x75, 0xe8, 0xb1, 0x2c, 0xd2, 0xda, 0x1d, 0xd4, 0x76, 0x59, 0x16, - 0xa1, 0x6a, 0x0c, 0xfb, 0xe6, 0xdd, 0x9b, 0x0b, 0x3c, 0xf4, 0xe6, 0xa8, 0x7b, 0x67, 0xfc, 0x6c, - 0x38, 0x16, 0xb3, 0x13, 0x63, 0xe9, 0xef, 0xe9, 0xa7, 0xaf, 0x19, 0x92, 0xf7, 0xc1, 0x51, 0xbb, - 0xd4, 0x0b, 0x75, 0xb7, 0x5e, 0xc8, 0x66, 0x59, 0x54, 0x0d, 0xbc, 0xdf, 0x58, 0x70, 0xf9, 0x39, - 0x0a, 0x2f, 0x90, 0x47, 0x0f, 0xa1, 0x37, 0x61, 0x33, 0xb5, 0x44, 0xf5, 0x9a, 0xbf, 0x73, 0xd6, - 0xcf, 0xa1, 0x33, 0x02, 0xe6, 0xd7, 0x0b, 0x78, 0x9f, 0xd4, 0x61, 0xfd, 0x20, 0x29, 0xc5, 0xfc, - 0x28, 0x4f, 0x0b, 0x75, 0x5e, 0xa3, 0x0b, 0x3d, 0xc5, 0xcf, 0x4f, 0x71, 0xef, 0x67, 0x16, 0x00, - 0x1e, 0x1e, 0xdc, 0xfa, 0xb9, 0xc4, 0xb4, 0x2e, 0x92, 0x98, 0xaa, 0x3f, 0x67, 0x65, 0x1a, 0x70, - 0x96, 0x50, 0xc9, 0xa2, 0xc0, 0xec, 0x26, 0xcc, 0xee, 0x24, 0x2b, 0x53, 0x5f, 0xab, 0x0c, 0x4c, - 0xe1, 0xfd, 0xca, 0x02, 0xc0, 0x7a, 0xa5, 0xdd, 0xd8, 0x6c, 0x28, 0xd6, 0xf9, 0x17, 0xeb, 0xd6, - 0xfa, 0xf1, 0xbb, 0x5f, 0x1d, 0x3f, 0x81, 0xf1, 0x68, 0x37, 0x61, 0xa8, 0xe3, 0xb1, 0x04, 0x6f, - 0x4e, 0xa8, 0x8e, 0xc1, 0x6f, 0x2d, 0x70, 0x56, 0x42, 0x25, 0xd6, 0x69, 0xb4, 0x36, 0x2b, 0x05, - 0xf6, 0x6e, 0x75, 0x7a, 0x02, 0xb1, 0x72, 0xa0, 0xd2, 0xe5, 0x81, 0xba, 0x0e, 0x3d, 0xa4, 0x64, - 0xe5, 0x44, 0x65, 0xe6, 0x44, 0xbd, 0x05, 0x97, 0x39, 0x0b, 0x59, 0x26, 0x93, 0x45, 0x90, 0xe6, - 0x51, 0xfc, 0x24, 0x66, 0x11, 0x9e, 0xab, 0x9e, 0x3f, 0xa8, 0x14, 0xc7, 0x46, 0xee, 0xfd, 0xd9, - 0x82, 0xfd, 0x1f, 0x96, 0x8c, 0x2f, 0x1e, 0xe5, 0x11, 0xd3, 0x9e, 0x7d, 0xf1, 0x94, 0x78, 0x17, - 0xb1, 0x18, 0x7a, 0x74, 0xba, 0xbe, 0xf1, 0xaf, 0xd3, 0x55, 0xf8, 0x3d, 0x61, 0x52, 0x54, 0x51, - 0xac, 0x1f, 0x4b, 0xdb, 0x50, 0xbc, 0x0c, 0xac, 0xe9, 0x44, 0x9a, 0xe2, 0x9f, 0x5a, 0x60, 0xaf, - 0x1c, 0x4c, 0x55, 0xee, 0x4d, 0x6f, 0xd0, 0x2d, 0xc5, 0xc2, 0x82, 0x6b, 0x87, 0xcb, 0x5f, 0x22, - 0xe4, 0x2a, 0xec, 0xa4, 0x62, 0x66, 0x22, 0xee, 0xf8, 0x7a, 0x40, 0x6e, 0x40, 0x2f, 0x15, 0x33, - 0xbc, 0x53, 0x9a, 0x2a, 0x5d, 0x8f, 0x55, 0xd8, 0xea, 0x8e, 0x62, 0x8a, 0xd5, 0x52, 0xe0, 0xfd, - 0xdc, 0x02, 0x62, 0xfa, 0xf0, 0x0b, 0xfd, 0xef, 0xc2, 0x84, 0x5d, 0xfd, 0xad, 0xd3, 0xd2, 0x97, - 0x91, 0x55, 0xd9, 0x46, 0xbb, 0x6b, 0x6f, 0xb6, 0xbb, 0x37, 0xbf, 0x07, 0xfd, 0xfa, 0xcf, 0x31, - 0x19, 0x80, 0x33, 0xce, 0x62, 0x89, 0xf7, 0xd8, 0x38, 0x9b, 0x0d, 0xbe, 0x42, 0x6c, 0xe8, 0xfe, - 0x80, 0xd1, 0x44, 0xce, 0x17, 0x03, 0x8b, 0x38, 0xd0, 0x7b, 0x6f, 0x9a, 0xe5, 0x3c, 0xa5, 0xc9, - 0xa0, 0x75, 0xff, 0x9d, 0x1f, 0x7f, 0x67, 0x16, 0xcb, 0x79, 0x39, 0x55, 0xce, 0xdd, 0xd1, 0xde, - 0x7e, 0x2b, 0xce, 0xcd, 0xd7, 0x9d, 0x2a, 0x10, 0x77, 0x10, 0x40, 0x3d, 0x2c, 0xa6, 0xd3, 0x5d, - 0x94, 0xbc, 0xfd, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x45, 0x0a, 0xb0, 0xba, 0x5f, 0x17, 0x00, - 0x00, + // 1755 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x58, 0x5b, 0x8f, 0x23, 0x47, + 0x15, 0xa6, 0x6d, 0xcf, 0xd8, 0x3e, 0xee, 0x99, 0x9d, 0xad, 0xbd, 0xa4, 0xf7, 0x92, 0xc4, 0xe9, + 0x70, 0x19, 0x12, 0xb1, 0xb3, 0x9a, 0x00, 0x89, 0x10, 0xd2, 0x26, 0x3b, 0x4e, 0x16, 0x6b, 0x33, + 0xab, 0xa1, 0xbc, 0x89, 0x04, 0x2f, 0xad, 0xb2, 0xbb, 0xd6, 0xee, 0xa4, 0x6f, 0x54, 0x95, 0x77, + 0xd7, 0x79, 0xe2, 0x81, 0x27, 0x10, 0x08, 0x21, 0xf1, 0xc4, 0x7f, 0xe0, 0x27, 0x00, 0xca, 0x13, + 0x12, 0xbf, 0x80, 0xbf, 0xc1, 0x23, 0x2f, 0xa0, 0x3a, 0x55, 0xdd, 0x6e, 0x7b, 0x7a, 0x06, 0x67, + 0x56, 0x08, 0x22, 0x78, 0xeb, 0x3a, 0xe7, 0x54, 0xd5, 0x39, 0xdf, 0xb9, 0x76, 0xc1, 0x6e, 0x94, + 0x2a, 0x2e, 0x52, 0x16, 0xdf, 0xc9, 0x45, 0xa6, 0x32, 0x72, 0x2d, 0x89, 0xe2, 0xa7, 0x73, 0x69, + 0x56, 0x77, 0x0a, 0xe6, 0x4d, 0x77, 0x92, 0x25, 0x49, 0x96, 0x1a, 0xf2, 0x4d, 0x57, 0x4e, 0x66, + 0x3c, 0x61, 0x66, 0xe5, 0xff, 0xc1, 0x81, 0x9d, 0xa3, 0x2c, 0xc9, 0xb3, 0x94, 0xa7, 0x6a, 0x98, + 0x3e, 0xc9, 0xc8, 0x75, 0xd8, 0x4e, 0xb3, 0x90, 0x0f, 0x07, 0x9e, 0xd3, 0x77, 0xf6, 0x9b, 0xd4, + 0xae, 0x08, 0x81, 0x96, 0xc8, 0x62, 0xee, 0x35, 0xfa, 0xce, 0x7e, 0x97, 0xe2, 0x37, 0xb9, 0x07, + 0x20, 0x15, 0x53, 0x3c, 0x98, 0x64, 0x21, 0xf7, 0x9a, 0x7d, 0x67, 0x7f, 0xf7, 0xb0, 0x7f, 0xa7, + 0x56, 0x8b, 0x3b, 0x23, 0x2d, 0x78, 0x94, 0x85, 0x9c, 0x76, 0x65, 0xf1, 0x49, 0xde, 0x05, 0xe0, + 0xcf, 0x95, 0x60, 0x41, 0x94, 0x3e, 0xc9, 0xbc, 0x56, 0xbf, 0xb9, 0xdf, 0x3b, 0x7c, 0x6d, 0xf5, + 0x00, 0xab, 0xfc, 0x43, 0xbe, 0xf8, 0x98, 0xc5, 0x73, 0x7e, 0xc2, 0x22, 0x41, 0xbb, 0xb8, 0x49, + 0xab, 0xeb, 0xff, 0xd5, 0x81, 0x4b, 0xa5, 0x01, 0x78, 0x87, 0x24, 0xdf, 0x83, 0x2d, 0xbc, 0x02, + 0x2d, 0xe8, 0x1d, 0x7e, 0xf5, 0x0c, 0x8d, 0x56, 0xec, 0xa6, 0x66, 0x0b, 0xf9, 0x08, 0xae, 0xc8, + 0xf9, 0x78, 0x52, 0xb0, 0x02, 0xa4, 0x4a, 0xaf, 0x81, 0xaa, 0x6d, 0x76, 0x12, 0xa9, 0x1e, 0x60, + 0x55, 0x7a, 0x0b, 0xb6, 0xf5, 0x49, 0x73, 0x89, 0x28, 0xf5, 0x0e, 0x6f, 0xd5, 0x1a, 0x39, 0x42, + 0x11, 0x6a, 0x45, 0xfd, 0x5b, 0x70, 0xe3, 0x01, 0x57, 0x6b, 0xd6, 0x51, 0xfe, 0x93, 0x39, 0x97, + 0xca, 0x32, 0x1f, 0x47, 0x09, 0x7f, 0x1c, 0x4d, 0x3e, 0x3d, 0x9a, 0xb1, 0x34, 0xe5, 0x71, 0xc1, + 0x7c, 0x19, 0x6e, 0x3d, 0xe0, 0xb8, 0x21, 0x92, 0x2a, 0x9a, 0xc8, 0x35, 0xf6, 0x35, 0xb8, 0xf2, + 0x80, 0xab, 0x41, 0xb8, 0x46, 0xfe, 0x18, 0x3a, 0x8f, 0xb4, 0xb3, 0x75, 0x18, 0x7c, 0x17, 0xda, + 0x2c, 0x0c, 0x05, 0x97, 0xd2, 0xa2, 0x78, 0xbb, 0x56, 0xe3, 0xf7, 0x8c, 0x0c, 0x2d, 0x84, 0xeb, + 0xc2, 0xc4, 0xff, 0x04, 0x60, 0x98, 0x46, 0xea, 0x84, 0x09, 0x96, 0xc8, 0x33, 0x03, 0x6c, 0x00, + 0xae, 0x54, 0x4c, 0xa8, 0x20, 0x47, 0x39, 0x0b, 0xf9, 0x06, 0xd1, 0xd0, 0xc3, 0x6d, 0xe6, 0x74, + 0xff, 0x47, 0x00, 0x23, 0x25, 0xa2, 0x74, 0xfa, 0x61, 0x24, 0x95, 0xbe, 0xeb, 0xa9, 0x96, 0xd3, + 0x46, 0x34, 0xf7, 0xbb, 0xd4, 0xae, 0x2a, 0xee, 0x68, 0x6c, 0xee, 0x8e, 0x7b, 0xd0, 0x2b, 0xe0, + 0x3e, 0x96, 0x53, 0x72, 0x17, 0x5a, 0x63, 0x26, 0xf9, 0xb9, 0xf0, 0x1c, 0xcb, 0xe9, 0x7d, 0x26, + 0x39, 0x45, 0x49, 0xff, 0xf3, 0x06, 0xbc, 0x74, 0x24, 0x38, 0x06, 0x7f, 0x1c, 0xf3, 0x89, 0x8a, + 0xb2, 0xd4, 0x62, 0xff, 0xc5, 0x4f, 0x23, 0x2f, 0x41, 0x3b, 0x1c, 0x07, 0x29, 0x4b, 0x0a, 0xb0, + 0xb7, 0xc3, 0xf1, 0x23, 0x96, 0x70, 0xf2, 0x75, 0xd8, 0x9d, 0x94, 0xe7, 0x6b, 0x0a, 0xc6, 0x5c, + 0x97, 0xae, 0x51, 0xb5, 0xab, 0xc2, 0xf1, 0x70, 0xe0, 0xb5, 0xd0, 0x0d, 0xf8, 0x4d, 0x7c, 0x70, + 0x97, 0x52, 0xc3, 0x81, 0xb7, 0x85, 0xbc, 0x15, 0x9a, 0x06, 0xd5, 0xd4, 0x10, 0x6f, 0xbb, 0xef, + 0xec, 0xbb, 0xd4, 0xae, 0xc8, 0x5d, 0xb8, 0xf2, 0x34, 0x12, 0x6a, 0xce, 0x62, 0x1b, 0x57, 0xfa, + 0x16, 0xe9, 0xb5, 0x11, 0xf9, 0x3a, 0x16, 0x39, 0x84, 0xab, 0xf9, 0x6c, 0x21, 0xa3, 0xc9, 0xda, + 0x96, 0x0e, 0x6e, 0xa9, 0xe5, 0xf9, 0x9f, 0x3b, 0x70, 0x6d, 0x20, 0xb2, 0xfc, 0xcb, 0x0c, 0xa1, + 0xff, 0xcb, 0x06, 0x5c, 0x37, 0x91, 0x70, 0xc2, 0x84, 0x8a, 0xfe, 0x4d, 0x56, 0x7c, 0x03, 0x2e, + 0x2d, 0x6f, 0x35, 0x02, 0xf5, 0x66, 0x7c, 0x0d, 0x76, 0xf3, 0x42, 0x0f, 0x23, 0xd7, 0x42, 0xb9, + 0x9d, 0x92, 0xba, 0x62, 0xed, 0xd6, 0x39, 0xd6, 0x6e, 0xd7, 0x04, 0x4c, 0x1f, 0x7a, 0xe5, 0x41, + 0xc3, 0x81, 0xd7, 0x46, 0x91, 0x2a, 0xc9, 0xff, 0x45, 0x03, 0xae, 0x6a, 0xa7, 0xfe, 0x1f, 0x0d, + 0x8d, 0xc6, 0x1f, 0x1b, 0x40, 0x4c, 0x74, 0x0c, 0xd3, 0x90, 0x3f, 0xff, 0x4f, 0x62, 0xf1, 0x32, + 0xc0, 0x93, 0x88, 0xc7, 0x61, 0x15, 0x87, 0x2e, 0x52, 0x5e, 0x08, 0x03, 0x0f, 0xda, 0x78, 0x48, + 0x69, 0x7f, 0xb1, 0xd4, 0x5d, 0xc0, 0x4c, 0x04, 0xb6, 0x0b, 0x74, 0x36, 0xee, 0x02, 0xb8, 0xcd, + 0x76, 0x81, 0xdf, 0x37, 0x61, 0x67, 0x98, 0x4a, 0x2e, 0xd4, 0xff, 0x72, 0x20, 0x91, 0xdb, 0xd0, + 0x95, 0x7c, 0x9a, 0xe8, 0xc1, 0x64, 0xe0, 0x75, 0x90, 0xbf, 0x24, 0x68, 0xee, 0xc4, 0x54, 0xd6, + 0xe1, 0xc0, 0xeb, 0x1a, 0xd7, 0x96, 0x04, 0xf2, 0x0a, 0x80, 0x8a, 0x12, 0x2e, 0x15, 0x4b, 0x72, + 0xe9, 0x41, 0xbf, 0xb9, 0xdf, 0xa2, 0x15, 0x8a, 0xee, 0x02, 0x22, 0x7b, 0x36, 0x1c, 0x48, 0xaf, + 0xd7, 0x6f, 0xea, 0x36, 0x6e, 0x56, 0xe4, 0xdb, 0xd0, 0x11, 0xd9, 0xb3, 0x20, 0x64, 0x8a, 0x79, + 0x2e, 0x3a, 0xef, 0x46, 0x2d, 0xd8, 0xf7, 0xe3, 0x6c, 0x4c, 0xdb, 0x22, 0x7b, 0x36, 0x60, 0x8a, + 0xf9, 0x7f, 0x6b, 0xc0, 0xce, 0x88, 0x33, 0x31, 0x99, 0x5d, 0xdc, 0x61, 0xdf, 0x84, 0x3d, 0xc1, + 0xe5, 0x3c, 0x56, 0xc1, 0xd2, 0x2c, 0xe3, 0xb9, 0x4b, 0x86, 0x7e, 0x54, 0x1a, 0x57, 0x40, 0xde, + 0x3c, 0x07, 0xf2, 0x56, 0x0d, 0xe4, 0x3e, 0xb8, 0x15, 0x7c, 0xa5, 0xb7, 0x85, 0xa6, 0xaf, 0xd0, + 0xc8, 0x1e, 0x34, 0x43, 0x19, 0xa3, 0xc7, 0xba, 0x54, 0x7f, 0x92, 0x37, 0xe1, 0x72, 0x1e, 0xb3, + 0x09, 0x9f, 0x65, 0x71, 0xc8, 0x45, 0x30, 0x15, 0xd9, 0x3c, 0x47, 0x77, 0xb9, 0x74, 0xaf, 0xc2, + 0x78, 0xa0, 0xe9, 0xe4, 0x6d, 0xe8, 0x84, 0x32, 0x0e, 0xd4, 0x22, 0xe7, 0xe8, 0xb2, 0xdd, 0x33, + 0x6c, 0x1f, 0xc8, 0xf8, 0xf1, 0x22, 0xe7, 0xb4, 0x1d, 0x9a, 0x0f, 0x72, 0x17, 0xae, 0x4a, 0x2e, + 0x22, 0x16, 0x47, 0x9f, 0xf1, 0x30, 0xe0, 0xcf, 0x73, 0x11, 0xe4, 0x31, 0x4b, 0xd1, 0xb3, 0x2e, + 0x25, 0x4b, 0xde, 0xfb, 0xcf, 0x73, 0x71, 0x12, 0xb3, 0xd4, 0xff, 0x47, 0x05, 0x74, 0x8d, 0x8f, + 0xbc, 0x00, 0xe8, 0x17, 0x99, 0xa4, 0x6a, 0x3d, 0xd5, 0xac, 0xf7, 0xd4, 0xab, 0xd0, 0x4b, 0xb8, + 0x12, 0xd1, 0xc4, 0x20, 0x62, 0x12, 0x08, 0x0c, 0x09, 0xcd, 0x26, 0xd0, 0x9a, 0x45, 0xca, 0xb8, + 0xc2, 0xa5, 0xf8, 0x4d, 0xbe, 0x0f, 0x37, 0x25, 0x67, 0x31, 0x0f, 0x83, 0x32, 0xda, 0x65, 0x20, + 0xd1, 0x52, 0x1e, 0x7a, 0xdb, 0xe8, 0x34, 0xcf, 0x48, 0x8c, 0x4a, 0x81, 0x91, 0xe5, 0x93, 0x03, + 0xb8, 0x52, 0xaa, 0x55, 0xd9, 0x66, 0xe6, 0x18, 0xb2, 0x64, 0x95, 0x1b, 0xde, 0x01, 0x6f, 0x1a, + 0x67, 0x63, 0x16, 0x07, 0xa7, 0x6e, 0xc5, 0xfa, 0xd5, 0xa4, 0xd7, 0x0d, 0x7f, 0xb4, 0x76, 0xa5, + 0xff, 0xbb, 0x06, 0x5c, 0xa2, 0xda, 0x16, 0xfe, 0x94, 0x7f, 0xe9, 0x03, 0xff, 0x0d, 0x68, 0x46, + 0xa1, 0xc4, 0xc0, 0xef, 0x1d, 0x7a, 0xab, 0x7a, 0xdb, 0x9f, 0xce, 0xe1, 0x40, 0x52, 0x2d, 0x44, + 0x5e, 0x87, 0x9d, 0x6c, 0xae, 0xf2, 0xb9, 0x0a, 0xb0, 0xf0, 0x17, 0x53, 0xa2, 0x6b, 0x88, 0x1f, + 0x20, 0xcd, 0xff, 0xf5, 0x0a, 0x3a, 0xff, 0xad, 0x11, 0x6a, 0xcd, 0x6e, 0x6d, 0x62, 0xf6, 0x3d, + 0xe8, 0x19, 0x7b, 0x4d, 0x7d, 0xdc, 0xc2, 0xfa, 0xf8, 0x4a, 0xed, 0x1e, 0xc4, 0x40, 0xd7, 0x46, + 0x6a, 0x3a, 0xb0, 0xc4, 0x3a, 0xf9, 0x17, 0x07, 0x76, 0x06, 0x3c, 0xe6, 0xea, 0x05, 0xc2, 0xa5, + 0xa6, 0x7f, 0x35, 0x6a, 0xfb, 0xd7, 0x4a, 0x83, 0x68, 0x9e, 0xdf, 0x20, 0x5a, 0xa7, 0x1a, 0xc4, + 0x6b, 0xe0, 0xe6, 0x22, 0x4a, 0x98, 0x58, 0x04, 0x9f, 0xf2, 0x45, 0x11, 0x32, 0x3d, 0x4b, 0x7b, + 0xc8, 0x17, 0xd2, 0xff, 0xbb, 0x03, 0xdd, 0x0f, 0x33, 0x16, 0xe2, 0x98, 0x73, 0x01, 0x4b, 0x56, + 0xfa, 0x5b, 0xa3, 0xa6, 0xbf, 0x95, 0x93, 0x4a, 0xa1, 0xfe, 0x72, 0x74, 0xa9, 0x8c, 0x20, 0xad, + 0xd5, 0x11, 0xe4, 0x55, 0xe8, 0x45, 0x5a, 0xa1, 0x20, 0x67, 0x6a, 0x66, 0xf4, 0xee, 0x52, 0x40, + 0xd2, 0x89, 0xa6, 0xe8, 0x19, 0xa5, 0x10, 0xc0, 0x19, 0x65, 0x7b, 0xe3, 0x19, 0xc5, 0x1e, 0x82, + 0x33, 0xca, 0x9f, 0x1a, 0xe0, 0xd9, 0x52, 0xb0, 0xfc, 0x51, 0xff, 0x28, 0x0f, 0xf1, 0xbd, 0xe0, + 0x36, 0x74, 0xcb, 0x32, 0x61, 0xff, 0x93, 0x97, 0x04, 0x0d, 0xfd, 0x31, 0x4f, 0x32, 0xb1, 0x18, + 0x45, 0x9f, 0x71, 0x6b, 0x78, 0x85, 0xa2, 0x6d, 0x7b, 0x34, 0x4f, 0x68, 0xf6, 0x4c, 0xda, 0x44, + 0x2f, 0x96, 0xda, 0xb6, 0x09, 0x4e, 0x96, 0x81, 0xf6, 0x14, 0x5a, 0xde, 0xa2, 0x60, 0x48, 0xfa, + 0xe7, 0x96, 0xdc, 0x80, 0x0e, 0x4f, 0x43, 0xc3, 0xdd, 0x42, 0x6e, 0x9b, 0xa7, 0x21, 0xb2, 0x86, + 0xb0, 0x6b, 0x7f, 0xd0, 0x33, 0x89, 0x49, 0x6f, 0x53, 0xdd, 0x3f, 0xe3, 0x55, 0xe4, 0x58, 0x4e, + 0x4f, 0xac, 0x24, 0xdd, 0x31, 0xff, 0xe8, 0x76, 0x49, 0xde, 0x07, 0x57, 0xdf, 0x52, 0x1e, 0xd4, + 0xde, 0xf8, 0xa0, 0x1e, 0x4f, 0xc3, 0x62, 0xe1, 0xff, 0xc6, 0x81, 0xcb, 0xa7, 0x20, 0xbc, 0x40, + 0x1c, 0x3d, 0x84, 0xce, 0x88, 0x4f, 0xf5, 0x11, 0xc5, 0xb3, 0xc3, 0xc1, 0x59, 0xaf, 0x58, 0x67, + 0x38, 0x8c, 0x96, 0x07, 0xf8, 0x9f, 0x94, 0x6e, 0xfd, 0x20, 0x9e, 0xcb, 0xd9, 0x51, 0x96, 0xe4, + 0x3a, 0x5f, 0xc3, 0x0b, 0xbd, 0x19, 0x9c, 0x1f, 0xe2, 0xfe, 0xcf, 0x1c, 0x00, 0x4c, 0x1e, 0xbc, + 0xfa, 0x54, 0x60, 0x3a, 0x17, 0x09, 0x4c, 0x3d, 0x48, 0xa4, 0xf3, 0x24, 0x10, 0x3c, 0x66, 0x6a, + 0xd9, 0xcc, 0xa4, 0xbd, 0x9d, 0xa4, 0xf3, 0x84, 0x1a, 0x96, 0x35, 0x53, 0xfa, 0xbf, 0x72, 0x00, + 0xb0, 0x5e, 0x19, 0x35, 0xd6, 0x1b, 0x8a, 0x73, 0xfe, 0x1f, 0x40, 0x63, 0x35, 0xfd, 0xee, 0x17, + 0xe9, 0x27, 0xd1, 0x1f, 0xcd, 0x3a, 0x1b, 0x4a, 0x7f, 0x2c, 0x8d, 0xb7, 0x19, 0x6a, 0x7c, 0xf0, + 0x5b, 0x07, 0xdc, 0x8a, 0xab, 0xe4, 0x2a, 0x8c, 0xce, 0x7a, 0xa5, 0xc0, 0x21, 0x43, 0x67, 0x4f, + 0x20, 0x2b, 0x09, 0x95, 0x2c, 0x13, 0xea, 0x06, 0x74, 0x10, 0x92, 0x4a, 0x46, 0xa5, 0x36, 0xa3, + 0xde, 0x84, 0xcb, 0x82, 0x4f, 0x78, 0xaa, 0xe2, 0x45, 0x90, 0x64, 0x61, 0xf4, 0x24, 0xe2, 0x21, + 0xe6, 0x55, 0x87, 0xee, 0x15, 0x8c, 0x63, 0x4b, 0xf7, 0xff, 0xec, 0xc0, 0xee, 0x0f, 0xe7, 0x5c, + 0x2c, 0x1e, 0x65, 0x21, 0x37, 0x9a, 0x7d, 0xf1, 0x90, 0x78, 0x17, 0x6d, 0xb1, 0xf0, 0x98, 0x70, + 0x7d, 0xfd, 0x5f, 0x87, 0xab, 0xa4, 0x1d, 0x69, 0x43, 0x54, 0x43, 0x6c, 0xfe, 0xea, 0x36, 0x81, + 0x78, 0xe9, 0x58, 0xdb, 0x89, 0x0c, 0xc4, 0x3f, 0x75, 0xa0, 0x57, 0x49, 0x4c, 0x5d, 0xee, 0x6d, + 0x6f, 0x30, 0x2d, 0xc5, 0xc1, 0x82, 0xdb, 0x9b, 0x2c, 0xdf, 0x6e, 0xc8, 0x55, 0xd8, 0x4a, 0xe4, + 0xd4, 0x7a, 0xdc, 0xa5, 0x66, 0x41, 0x6e, 0x42, 0x27, 0x91, 0x53, 0x1c, 0x7e, 0x6d, 0x95, 0x2e, + 0xd7, 0xda, 0x6d, 0x65, 0x47, 0xb1, 0xc5, 0x6a, 0x49, 0xf0, 0x7f, 0xee, 0x00, 0xb1, 0x7d, 0xf8, + 0x85, 0x1e, 0xe6, 0x30, 0x60, 0xab, 0xef, 0x4f, 0x0d, 0x33, 0x8c, 0x54, 0x69, 0x6b, 0xed, 0xae, + 0xb9, 0xde, 0xee, 0xde, 0x78, 0x07, 0xba, 0xe5, 0x13, 0x37, 0xd9, 0x03, 0x77, 0x98, 0x46, 0x0a, + 0x07, 0xee, 0x28, 0x9d, 0xee, 0x7d, 0x85, 0xf4, 0xa0, 0xfd, 0x03, 0xce, 0x62, 0x35, 0x5b, 0xec, + 0x39, 0xc4, 0x85, 0xce, 0x7b, 0xe3, 0x34, 0x13, 0x09, 0x8b, 0xf7, 0x1a, 0xf7, 0xdf, 0xfe, 0xf1, + 0x77, 0xa6, 0x91, 0x9a, 0xcd, 0xc7, 0x5a, 0xb9, 0x03, 0xa3, 0xed, 0xb7, 0xa2, 0xcc, 0x7e, 0x1d, + 0x14, 0x8e, 0x38, 0x40, 0x03, 0xca, 0x65, 0x3e, 0x1e, 0x6f, 0x23, 0xe5, 0xad, 0x7f, 0x06, 0x00, + 0x00, 0xff, 0xff, 0xa3, 0xc1, 0xe7, 0x20, 0x08, 0x18, 0x00, 0x00, } diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 3e48754802..50426a2d8b 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -90,6 +90,10 @@ type ReplicaInterface interface { getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) replaceGrowingSegmentBySealedSegment(segment *Segment) error + //channels + addWatchedDmChannels(channels []string) + getWatchedDmChannels() []string + getTSafe(collectionID UniqueID) tSafer addTSafe(collectionID UniqueID) removeTSafe(collectionID UniqueID) @@ -105,6 +109,7 @@ type collectionReplica struct { segments map[UniqueID]*Segment excludedSegments map[UniqueID][]UniqueID // map[collectionID]segmentIDs + watchedChannels []string } //----------------------------------------------------------------------------------------------------- collection @@ -707,11 +712,20 @@ func (colReplica *collectionReplica) getSegmentsToLoadBySegmentType(segType segm return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs } +func (colReplica *collectionReplica) addWatchedDmChannels(channels []string) { + colReplica.watchedChannels = append(colReplica.watchedChannels, channels...) +} + +func (colReplica *collectionReplica) getWatchedDmChannels() []string { + return colReplica.watchedChannels +} + func newCollectionReplica() ReplicaInterface { collections := make(map[UniqueID]*Collection) partitions := make(map[UniqueID]*Partition) segments := make(map[UniqueID]*Segment) excludedSegments := make(map[UniqueID][]UniqueID) + watchedChannels := make([]string, 0) var replica ReplicaInterface = &collectionReplica{ collections: collections, @@ -719,6 +733,7 @@ func newCollectionReplica() ReplicaInterface { segments: segments, excludedSegments: excludedSegments, + watchedChannels: watchedChannels, tSafes: make(map[UniqueID]tSafer), } diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index 11514b463a..8e9d1f04a9 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -21,7 +21,7 @@ import ( func TestCollectionReplica_getCollectionNum(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) - assert.Equal(t, node.replica.getCollectionNum(), 1) + assert.Equal(t, node.historical.replica.getCollectionNum(), 1) err := node.Stop() assert.NoError(t, err) } @@ -36,11 +36,11 @@ func TestCollectionReplica_addCollection(t *testing.T) { func TestCollectionReplica_removeCollection(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) - assert.Equal(t, node.replica.getCollectionNum(), 1) + assert.Equal(t, node.historical.replica.getCollectionNum(), 1) - err := node.replica.removeCollection(0) + err := node.historical.replica.removeCollection(0) assert.NoError(t, err) - assert.Equal(t, node.replica.getCollectionNum(), 0) + assert.Equal(t, node.historical.replica.getCollectionNum(), 0) err = node.Stop() assert.NoError(t, err) } @@ -49,7 +49,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) { node := newQueryNodeMock() collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - targetCollection, err := node.replica.getCollectionByID(collectionID) + targetCollection, err := node.historical.replica.getCollectionByID(collectionID) assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.ID(), collectionID) @@ -62,9 +62,9 @@ func TestCollectionReplica_hasCollection(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - hasCollection := node.replica.hasCollection(collectionID) + hasCollection := node.historical.replica.hasCollection(collectionID) assert.Equal(t, hasCollection, true) - hasCollection = node.replica.hasCollection(UniqueID(1)) + hasCollection = node.historical.replica.hasCollection(UniqueID(1)) assert.Equal(t, hasCollection, false) err := node.Stop() @@ -79,14 +79,14 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { partitionIDs := []UniqueID{1, 2, 3} for _, id := range partitionIDs { - err := node.replica.addPartition(collectionID, id) + err := node.historical.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(id) + partition, err := node.historical.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) } - partitionNum := node.replica.getPartitionNum() + partitionNum := node.historical.replica.getPartitionNum() assert.Equal(t, partitionNum, len(partitionIDs)+1) err := node.Stop() assert.NoError(t, err) @@ -99,9 +99,9 @@ func TestCollectionReplica_addPartition(t *testing.T) { partitionIDs := []UniqueID{1, 2, 3} for _, id := range partitionIDs { - err := node.replica.addPartition(collectionID, id) + err := node.historical.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(id) + partition, err := node.historical.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) } @@ -117,12 +117,12 @@ func TestCollectionReplica_removePartition(t *testing.T) { partitionIDs := []UniqueID{1, 2, 3} for _, id := range partitionIDs { - err := node.replica.addPartition(collectionID, id) + err := node.historical.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(id) + partition, err := node.historical.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) - err = node.replica.removePartition(id) + err = node.historical.replica.removePartition(id) assert.NoError(t, err) } err := node.Stop() @@ -137,9 +137,9 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { collectionMeta := genTestCollectionMeta(collectionID, false) for _, id := range collectionMeta.PartitionIDs { - err := node.replica.addPartition(collectionID, id) + err := node.historical.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(id) + partition, err := node.historical.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) assert.NotNil(t, partition) @@ -154,11 +154,11 @@ func TestCollectionReplica_hasPartition(t *testing.T) { initTestMeta(t, node, collectionID, 0) collectionMeta := genTestCollectionMeta(collectionID, false) - err := node.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0]) + err := node.historical.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0]) assert.NoError(t, err) - hasPartition := node.replica.hasPartition(defaultPartitionID) + hasPartition := node.historical.replica.hasPartition(defaultPartitionID) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(defaultPartitionID + 1) + hasPartition = node.historical.replica.hasPartition(defaultPartitionID + 1) assert.Equal(t, hasPartition, false) err = node.Stop() assert.NoError(t, err) @@ -172,9 +172,9 @@ func TestCollectionReplica_addSegment(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) + err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) assert.NoError(t, err) - targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) + targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -191,12 +191,12 @@ func TestCollectionReplica_removeSegment(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) + err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) assert.NoError(t, err) - targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) + targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - err = node.replica.removeSegment(UniqueID(i)) + err = node.historical.replica.removeSegment(UniqueID(i)) assert.NoError(t, err) } @@ -212,9 +212,9 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) + err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) assert.NoError(t, err) - targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) + targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } @@ -231,14 +231,14 @@ func TestCollectionReplica_hasSegment(t *testing.T) { const segmentNum = 3 for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) + err := node.historical.replica.addSegment(UniqueID(i), defaultPartitionID, collectionID, segmentTypeGrowing) assert.NoError(t, err) - targetSeg, err := node.replica.getSegmentByID(UniqueID(i)) + targetSeg, err := node.historical.replica.getSegmentByID(UniqueID(i)) assert.NoError(t, err) assert.Equal(t, targetSeg.segmentID, UniqueID(i)) - hasSeg := node.replica.hasSegment(UniqueID(i)) + hasSeg := node.historical.replica.hasSegment(UniqueID(i)) assert.Equal(t, hasSeg, true) - hasSeg = node.replica.hasSegment(UniqueID(i + 100)) + hasSeg = node.historical.replica.hasSegment(UniqueID(i + 100)) assert.Equal(t, hasSeg, false) } @@ -261,28 +261,28 @@ func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) { segmentID := UniqueID(520) initTestMeta(t, node, collectionID, segmentID) - _, _, segIDs := node.replica.getSegmentsBySegmentType(segmentTypeGrowing) + _, _, segIDs := node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing) assert.Equal(t, len(segIDs), 1) - collection, err := node.replica.getCollectionByID(collectionID) + collection, err := node.historical.replica.getCollectionByID(collectionID) assert.NoError(t, err) ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, segmentTypeSealed) - err = node.replica.replaceGrowingSegmentBySealedSegment(ns) + err = node.historical.replica.replaceGrowingSegmentBySealedSegment(ns) assert.NoError(t, err) - err = node.replica.setSegmentEnableIndex(segmentID, true) + err = node.historical.replica.setSegmentEnableIndex(segmentID, true) assert.NoError(t, err) - segmentNums := node.replica.getSegmentNum() + segmentNums := node.historical.replica.getSegmentNum() assert.Equal(t, segmentNums, 1) - segment, err := node.replica.getSegmentByID(segmentID) + segment, err := node.historical.replica.getSegmentByID(segmentID) assert.NoError(t, err) assert.Equal(t, segment.getType(), segmentTypeSealed) - _, _, segIDs = node.replica.getSegmentsBySegmentType(segmentTypeGrowing) + _, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing) assert.Equal(t, len(segIDs), 0) - _, _, segIDs = node.replica.getSegmentsBySegmentType(segmentTypeSealed) + _, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeSealed) assert.Equal(t, len(segIDs), 1) err = node.Stop() diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 3efd7439df..95381c374a 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -116,8 +116,8 @@ func TestDataSyncService_Start(t *testing.T) { assert.Nil(t, err) // dataSync - node.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID) - go node.dataSyncServices[collectionID].start() + node.streaming.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID) + go node.streaming.dataSyncServices[collectionID].start() <-node.queryNodeLoopCtx.Done() node.Stop() diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go new file mode 100644 index 0000000000..b13682e7f7 --- /dev/null +++ b/internal/querynode/historical.go @@ -0,0 +1,54 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "context" + + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/types" +) + +type historical struct { + replica ReplicaInterface + loadService *loadService + statsService *statsService +} + +func newHistorical(ctx context.Context, + masterService types.MasterService, + dataService types.DataService, + indexService types.IndexService, + factory msgstream.Factory) *historical { + replica := newCollectionReplica() + ls := newLoadService(ctx, masterService, dataService, indexService, replica) + ss := newStatsService(ctx, replica, ls.segLoader.indexLoader.fieldStatsChan, factory) + + return &historical{ + replica: replica, + loadService: ls, + statsService: ss, + } +} + +func (h *historical) start() { + h.loadService.start() + h.statsService.start() +} + +func (h *historical) close() { + h.loadService.close() + h.statsService.close() + + // free collectionReplica + h.replica.freeAll() +} diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 661c337209..94b68cc271 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -298,7 +298,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS ErrorCode: commonpb.ErrorCode_Success, } for _, id := range in.SegmentIDs { - err2 := node.loadService.segLoader.replica.removeSegment(id) + err2 := node.historical.loadService.segLoader.replica.removeSegment(id) if err2 != nil { // not return, try to release all segments status.ErrorCode = commonpb.ErrorCode_UnexpectedError @@ -310,11 +310,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) { infos := make([]*queryPb.SegmentInfo, 0) - for _, id := range in.SegmentIDs { - segment, err := node.replica.getSegmentByID(id) - if err != nil { - continue - } + getSegmentInfo := func(segment *Segment) *queryPb.SegmentInfo { var indexName string var indexID int64 // TODO:: segment has multi vec column @@ -334,6 +330,24 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen IndexName: indexName, IndexID: indexID, } + return info + } + // get info from historical + for _, id := range in.SegmentIDs { + segment, err := node.historical.replica.getSegmentByID(id) + if err != nil { + continue + } + info := getSegmentInfo(segment) + infos = append(infos, info) + } + // get info from streaming + for _, id := range in.SegmentIDs { + segment, err := node.streaming.replica.getSegmentByID(id) + if err != nil { + continue + } + info := getSegmentInfo(segment) infos = append(infos, info) } return &queryPb.GetSegmentInfoResponse{ diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 1f01863845..8f2a2d5062 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -1038,26 +1038,26 @@ func TestSegmentLoad_Search_Vector(t *testing.T) { defer node.Stop() ctx := node.queryNodeLoopCtx - node.loadService = newLoadService(ctx, nil, nil, nil, node.replica) + node.historical.loadService = newLoadService(ctx, nil, nil, nil, node.historical.replica) initTestMeta(t, node, collectionID, 0) - err := node.replica.addPartition(collectionID, partitionID) + err := node.historical.replica.addPartition(collectionID, partitionID) assert.NoError(t, err) - err = node.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeSealed) + err = node.historical.replica.addSegment(segmentID, partitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) assert.NoError(t, err) - fieldsMap, _ := node.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs) + fieldsMap, _ := node.historical.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs) assert.Equal(t, len(fieldsMap), 4) - segment, err := node.replica.getSegmentByID(segmentID) + segment, err := node.historical.replica.getSegmentByID(segmentID) assert.NoError(t, err) - err = node.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap) + err = node.historical.loadService.segLoader.loadSegmentFieldsData(segment, fieldsMap) assert.NoError(t, err) indexPaths, err := generateIndex(segmentID) @@ -1070,7 +1070,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) { err = segment.setIndexInfo(100, indexInfo) assert.NoError(t, err) - err = node.loadService.segLoader.indexLoader.loadIndex(segment, 100) + err = node.historical.loadService.segLoader.indexLoader.loadIndex(segment, 100) assert.NoError(t, err) // do search @@ -1098,7 +1098,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) { assert.NoError(t, err) searchTimestamp := Timestamp(1020) - collection, err := node.replica.getCollectionByID(collectionID) + collection, err := node.historical.replica.getCollectionByID(collectionID) assert.NoError(t, err) plan, err := createPlan(*collection, dslString) assert.NoError(t, err) diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go deleted file mode 100644 index e5e65b1461..0000000000 --- a/internal/querynode/meta_service.go +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package querynode - -import ( - "context" - "fmt" - "path" - "reflect" - "strings" - "time" - - "github.com/golang/protobuf/proto" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" - - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/etcdpb" - "github.com/milvus-io/milvus/internal/util/retry" -) - -const ( - CollectionPrefix = "/collection/" - SegmentPrefix = "/segment/" -) - -type metaService struct { - ctx context.Context - kvBase *etcdkv.EtcdKV - replica ReplicaInterface -} - -func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService { - ETCDAddr := Params.EtcdAddress - MetaRootPath := Params.MetaRootPath - var cli *clientv3.Client - var err error - - connectEtcdFn := func() error { - cli, err = clientv3.New(clientv3.Config{ - Endpoints: []string{ETCDAddr}, - DialTimeout: 5 * time.Second, - }) - if err != nil { - return err - } - return nil - } - err = retry.Retry(100000, time.Millisecond*200, connectEtcdFn) - if err != nil { - panic(err) - } - - return &metaService{ - ctx: ctx, - kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath), - replica: replica, - } -} - -func (mService *metaService) start() { - // init from meta - err := mService.loadCollections() - if err != nil { - log.Error("metaService loadCollections failed") - } - err = mService.loadSegments() - if err != nil { - log.Error("metaService loadSegments failed") - } -} - -func GetCollectionObjID(key string) string { - ETCDRootPath := Params.MetaRootPath - - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - return strings.TrimPrefix(key, prefix) -} - -func GetSegmentObjID(key string) string { - ETCDRootPath := Params.MetaRootPath - - prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" - return strings.TrimPrefix(key, prefix) -} - -func isCollectionObj(key string) bool { - ETCDRootPath := Params.MetaRootPath - - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) - - return index == 0 -} - -func isSegmentObj(key string) bool { - ETCDRootPath := Params.MetaRootPath - - prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) - - return index == 0 -} - -func printCollectionStruct(obj *etcdpb.CollectionInfo) { - v := reflect.ValueOf(obj) - v = reflect.Indirect(v) - typeOfS := v.Type() - - for i := 0; i < v.NumField(); i++ { - if typeOfS.Field(i).Name == "GrpcMarshalString" { - continue - } - log.Debug("Field", zap.String("field name", typeOfS.Field(i).Name), zap.String("field", fmt.Sprintln(v.Field(i).Interface()))) - } -} - -func printSegmentStruct(obj *datapb.SegmentInfo) { - v := reflect.ValueOf(obj) - v = reflect.Indirect(v) - typeOfS := v.Type() - - for i := 0; i < v.NumField(); i++ { - log.Debug("Field", zap.String("field name", typeOfS.Field(i).Name), zap.String("field", fmt.Sprintln(v.Field(i).Interface()))) - } -} - -func (mService *metaService) processCollectionCreate(id string, value string) { - col := mService.collectionUnmarshal(value) - if col != nil { - schema := col.Schema - err := mService.replica.addCollection(col.ID, schema) - if err != nil { - log.Error(err.Error()) - } - for _, partitionID := range col.PartitionIDs { - err = mService.replica.addPartition(col.ID, partitionID) - if err != nil { - log.Error(err.Error()) - } - } - } -} - -func (mService *metaService) processSegmentCreate(id string, value string) { - //println("Create Segment: ", id) - - seg := mService.segmentUnmarshal(value) - - // TODO: what if seg == nil? We need to notify master and return rpc request failed - if seg != nil { - // TODO: get partition id from segment meta - err := mService.replica.addSegment(seg.ID, seg.PartitionID, seg.CollectionID, segmentTypeGrowing) - if err != nil { - log.Error(err.Error()) - return - } - } -} - -func (mService *metaService) processCreate(key string, msg string) { - //println("process create", key) - if isCollectionObj(key) { - objID := GetCollectionObjID(key) - mService.processCollectionCreate(objID, msg) - } else if isSegmentObj(key) { - objID := GetSegmentObjID(key) - mService.processSegmentCreate(objID, msg) - } else { - println("can not process create msg:", key) - } -} - -func (mService *metaService) loadCollections() error { - keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix) - if err != nil { - return err - } - - for i := range keys { - objID := GetCollectionObjID(keys[i]) - mService.processCollectionCreate(objID, values[i]) - } - - return nil -} - -func (mService *metaService) loadSegments() error { - keys, values, err := mService.kvBase.LoadWithPrefix(SegmentPrefix) - if err != nil { - return err - } - - for i := range keys { - objID := GetSegmentObjID(keys[i]) - mService.processSegmentCreate(objID, values[i]) - } - - return nil -} - -//----------------------------------------------------------------------- Unmarshal and Marshal -func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionInfo { - col := etcdpb.CollectionInfo{} - err := proto.UnmarshalText(value, &col) - if err != nil { - log.Error(fmt.Errorf("QueryNode metaService UnmarshalText etcdpb.CollectionInfo err:%w", err).Error()) - return nil - } - return &col -} - -func (mService *metaService) collectionMarshal(col *etcdpb.CollectionInfo) string { - value := proto.MarshalTextString(col) - if value == "" { - log.Error("marshal collection failed") - return "" - } - return value -} - -func (mService *metaService) segmentUnmarshal(value string) *datapb.SegmentInfo { - seg := datapb.SegmentInfo{} - err := proto.UnmarshalText(value, &seg) - if err != nil { - log.Error(fmt.Errorf("QueryNode metaService UnmarshalText datapb.SegmentInfo err:%w", err).Error()) - return nil - } - return &seg -} - -func (mService *metaService) segmentMarshal(seg *etcdpb.SegmentMeta) string { - value := proto.MarshalTextString(seg) - if value == "" { - log.Error("marshal segment failed") - return "" - } - return value -} diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go deleted file mode 100644 index efda2da7ef..0000000000 --- a/internal/querynode/meta_service_test.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package querynode - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/internal/proto/datapb" -) - -func TestMetaService_start(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - node.metaService.start() - node.Stop() -} - -func TestMetaService_getCollectionObjId(t *testing.T) { - var key = "/collection/collection0" - var collectionObjID1 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID1, "/collection/collection0") - - key = "fakeKey" - var collectionObjID2 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID2, "fakeKey") -} - -func TestMetaService_getSegmentObjId(t *testing.T) { - var key = "/segment/segment0" - var segmentObjID1 = GetSegmentObjID(key) - - assert.Equal(t, segmentObjID1, "/segment/segment0") - - key = "fakeKey" - var segmentObjID2 = GetSegmentObjID(key) - - assert.Equal(t, segmentObjID2, "fakeKey") -} - -func TestMetaService_isCollectionObj(t *testing.T) { - var key = Params.MetaRootPath + "/collection/collection0" - var b1 = isCollectionObj(key) - - assert.Equal(t, b1, true) - - key = Params.MetaRootPath + "/segment/segment0" - var b2 = isCollectionObj(key) - - assert.Equal(t, b2, false) -} - -func TestMetaService_isSegmentObj(t *testing.T) { - var key = Params.MetaRootPath + "/segment/segment0" - var b1 = isSegmentObj(key) - - assert.Equal(t, b1, true) - - key = Params.MetaRootPath + "/collection/collection0" - var b2 = isSegmentObj(key) - - assert.Equal(t, b2, false) -} - -func TestMetaService_printCollectionStruct(t *testing.T) { - collectionID := UniqueID(0) - collectionMeta := genTestCollectionMeta(collectionID, false) - printCollectionStruct(collectionMeta) -} - -func TestMetaService_printSegmentStruct(t *testing.T) { - var s = datapb.SegmentInfo{ - ID: UniqueID(0), - CollectionID: UniqueID(0), - PartitionID: defaultPartitionID, - NumRows: UniqueID(0), - } - - printSegmentStruct(&s) -} - -func TestMetaService_processCollectionCreate(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `schema: < - name: "test" - autoID: true - fields: < - fieldID:100 - name: "vec" - data_type: FloatVector - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: Int32 - type_params: < - key: "dim" - value: "1" - > - > - > - partitionIDs: 2021 - ` - - node.metaService.processCollectionCreate(id, value) - - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - node.Stop() -} - -func TestMetaService_processSegmentCreate(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - id := "0" - value := `partitionID: 2021 - ` - - (*node.metaService).processSegmentCreate(id, value) - - s, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, s.segmentID, UniqueID(0)) - node.Stop() -} - -func TestMetaService_processCreate(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - key1 := Params.MetaRootPath + "/collection/0" - msg1 := `schema: < - name: "test" - autoID: true - fields: < - fieldID:100 - name: "vec" - data_type: FloatVector - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: Int32 - type_params: < - key: "dim" - value: "1" - > - > - > - partitionIDs: 2021 - ` - - (*node.metaService).processCreate(key1, msg1) - collectionNum := node.replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := node.replica.getCollectionByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) - - key2 := Params.MetaRootPath + "/segment/0" - msg2 := `partitionID: 2021 - ` - - (*node.metaService).processCreate(key2, msg2) - s, err := node.replica.getSegmentByID(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, s.segmentID, UniqueID(0)) - node.Stop() -} - -func TestMetaService_loadCollections(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - err2 := (*node.metaService).loadCollections() - assert.Nil(t, err2) - node.Stop() -} - -func TestMetaService_loadSegments(t *testing.T) { - node := newQueryNodeMock() - node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) - - err2 := (*node.metaService).loadSegments() - assert.Nil(t, err2) - node.Stop() -} diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index da8ea14051..e3fce2412a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -30,7 +30,6 @@ import ( "fmt" "math/rand" "strconv" - "sync" "sync/atomic" "time" @@ -53,15 +52,12 @@ type QueryNode struct { QueryNodeID UniqueID stateCode atomic.Value - replica ReplicaInterface + // internal components + historical *historical + streaming *streaming // internal services - metaService *metaService - searchService *searchService - loadService *loadService - statsService *statsService - dsServicesMu sync.Mutex // guards dataSyncServices - dataSyncServices map[UniqueID]*dataSyncService + searchService *searchService // clients masterService types.MasterService @@ -82,18 +78,13 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F queryNodeLoopCtx: ctx1, queryNodeLoopCancel: cancel, QueryNodeID: queryNodeID, - - dataSyncServices: make(map[UniqueID]*dataSyncService), - metaService: nil, - searchService: nil, - statsService: nil, - - msFactory: factory, + searchService: nil, + msFactory: factory, } node.scheduler = newTaskScheduler(ctx1) - node.replica = newCollectionReplica() node.UpdateStateCode(internalpb.StateCode_Abnormal) + return node } @@ -102,17 +93,11 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer node := &QueryNode{ queryNodeLoopCtx: ctx1, queryNodeLoopCancel: cancel, - - dataSyncServices: make(map[UniqueID]*dataSyncService), - metaService: nil, - searchService: nil, - statsService: nil, - - msFactory: factory, + searchService: nil, + msFactory: factory, } node.scheduler = newTaskScheduler(ctx1) - node.replica = newCollectionReplica() node.UpdateStateCode(internalpb.StateCode_Abnormal) return node @@ -129,6 +114,13 @@ func (node *QueryNode) Register() error { func (node *QueryNode) Init() error { ctx := context.Background() + node.historical = newHistorical(node.queryNodeLoopCtx, + node.masterService, + node.dataService, + node.indexService, + node.msFactory) + node.streaming = newStreaming() + C.SegcoreInit() registerReq := &queryPb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ @@ -192,17 +184,15 @@ func (node *QueryNode) Start() error { } // init services and manager - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory) - node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory) + // TODO: pass node.streaming.replica to search service + node.searchService = newSearchService(node.queryNodeLoopCtx, node.historical.replica, node.streaming.replica, node.msFactory) // start task scheduler go node.scheduler.Start() // start services go node.searchService.start() - go node.loadService.start() - go node.statsService.start() + go node.historical.start() node.UpdateStateCode(internalpb.StateCode_Healthy) return nil } @@ -211,24 +201,16 @@ func (node *QueryNode) Stop() error { node.UpdateStateCode(internalpb.StateCode_Abnormal) node.queryNodeLoopCancel() - // free collectionReplica - node.replica.freeAll() - // close services - for _, dsService := range node.dataSyncServices { - if dsService != nil { - dsService.close() - } + if node.historical != nil { + node.historical.close() + } + if node.streaming != nil { + node.streaming.close() } if node.searchService != nil { node.searchService.close() } - if node.loadService != nil { - node.loadService.close() - } - if node.statsService != nil { - node.statsService.close() - } return nil } @@ -267,29 +249,3 @@ func (node *QueryNode) SetDataService(data types.DataService) error { node.dataService = data return nil } - -func (node *QueryNode) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) { - node.dsServicesMu.Lock() - defer node.dsServicesMu.Unlock() - ds, ok := node.dataSyncServices[collectionID] - if !ok { - return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID)) - } - return ds, nil -} - -func (node *QueryNode) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error { - node.dsServicesMu.Lock() - defer node.dsServicesMu.Unlock() - if _, ok := node.dataSyncServices[collectionID]; ok { - return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID)) - } - node.dataSyncServices[collectionID] = ds - return nil -} - -func (node *QueryNode) removeDataSyncService(collectionID UniqueID) { - node.dsServicesMu.Lock() - defer node.dsServicesMu.Unlock() - delete(node.dataSyncServices, collectionID) -} diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 1fab8b028d..bf3ed33961 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -121,18 +121,18 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI } collectionMeta := genTestCollectionMeta(collectionID, isBinary) - var err = node.replica.addCollection(collectionMeta.ID, collectionMeta.Schema) + var err = node.historical.replica.addCollection(collectionMeta.ID, collectionMeta.Schema) assert.NoError(t, err) - collection, err := node.replica.getCollectionByID(collectionID) + collection, err := node.historical.replica.getCollectionByID(collectionID) assert.NoError(t, err) assert.Equal(t, collection.ID(), collectionID) - assert.Equal(t, node.replica.getCollectionNum(), 1) + assert.Equal(t, node.historical.replica.getCollectionNum(), 1) - err = node.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0]) + err = node.historical.replica.addPartition(collection.ID(), collectionMeta.PartitionIDs[0]) assert.NoError(t, err) - err = node.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing) + err = node.historical.replica.addSegment(segmentID, collectionMeta.PartitionIDs[0], collectionID, segmentTypeGrowing) assert.NoError(t, err) } @@ -179,6 +179,8 @@ func newQueryNodeMock() *QueryNode { if err != nil { panic(err) } + svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory) + svr.streaming = newStreaming() return svr } diff --git a/internal/querynode/search_collection.go b/internal/querynode/search_collection.go index 8826de5c09..9fb719fb0a 100644 --- a/internal/querynode/search_collection.go +++ b/internal/querynode/search_collection.go @@ -34,8 +34,9 @@ type searchCollection struct { releaseCtx context.Context cancel context.CancelFunc - collectionID UniqueID - replica ReplicaInterface + collectionID UniqueID + historicalReplica ReplicaInterface + streamingReplica ReplicaInterface msgBuffer chan *msgstream.SearchMsg unsolvedMSgMu sync.Mutex // guards unsolvedMsg @@ -52,7 +53,12 @@ type searchCollection struct { type ResultEntityIds []UniqueID -func newSearchCollection(releaseCtx context.Context, cancel context.CancelFunc, collectionID UniqueID, replica ReplicaInterface, searchResultStream msgstream.MsgStream) *searchCollection { +func newSearchCollection(releaseCtx context.Context, + cancel context.CancelFunc, + collectionID UniqueID, + historicalReplica ReplicaInterface, + streamingReplica ReplicaInterface, + searchResultStream msgstream.MsgStream) *searchCollection { receiveBufSize := Params.SearchReceiveBufSize msgBuffer := make(chan *msgstream.SearchMsg, receiveBufSize) unsolvedMsg := make([]*msgstream.SearchMsg, 0) @@ -61,8 +67,9 @@ func newSearchCollection(releaseCtx context.Context, cancel context.CancelFunc, releaseCtx: releaseCtx, cancel: cancel, - collectionID: collectionID, - replica: replica, + collectionID: collectionID, + historicalReplica: historicalReplica, + streamingReplica: streamingReplica, msgBuffer: msgBuffer, unsolvedMsg: unsolvedMsg, @@ -80,8 +87,8 @@ func (s *searchCollection) start() { } func (s *searchCollection) register(collectionID UniqueID) { - s.replica.addTSafe(collectionID) - tSafe := s.replica.getTSafe(collectionID) + s.streamingReplica.addTSafe(collectionID) + tSafe := s.streamingReplica.getTSafe(collectionID) s.tSafeMutex.Lock() s.tSafeWatcher = newTSafeWatcher() s.tSafeMutex.Unlock() @@ -105,7 +112,7 @@ func (s *searchCollection) popAllUnsolvedMsg() []*msgstream.SearchMsg { func (s *searchCollection) waitNewTSafe() (Timestamp, error) { // block until dataSyncService updating tSafe s.tSafeWatcher.hasUpdate() - ts := s.replica.getTSafe(s.collectionID) + ts := s.streamingReplica.getTSafe(s.collectionID) if ts != nil { return ts.get(), nil } @@ -271,7 +278,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { searchTimestamp := searchMsg.Base.Timestamp collectionID := searchMsg.CollectionID - collection, err := s.replica.getCollectionByID(collectionID) + collection, err := s.historicalReplica.getCollectionByID(collectionID) if err != nil { return err } @@ -302,25 +309,35 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { matchedSegments := make([]*Segment, 0) //log.Debug("search msg's partitionID = ", partitionIDsInQuery) - partitionIDsInCol, err := s.replica.getPartitionIDs(collectionID) - if err != nil { - return err + partitionIDsInHistoricalCol, err1 := s.historicalReplica.getPartitionIDs(collectionID) + partitionIDsInStreamingCol, err2 := s.streamingReplica.getPartitionIDs(collectionID) + + if err1 != nil && err2 != nil { + return err2 } - var searchPartitionIDs []UniqueID + var searchPartitionIDsInHistorical []UniqueID + var searchPartitionIDsInStreaming []UniqueID partitionIDsInQuery := searchMsg.PartitionIDs if len(partitionIDsInQuery) == 0 { - if len(partitionIDsInCol) == 0 { + if len(partitionIDsInHistoricalCol) == 0 { return errors.New("none of this collection's partition has been loaded") } - searchPartitionIDs = partitionIDsInCol + searchPartitionIDsInHistorical = partitionIDsInHistoricalCol + searchPartitionIDsInStreaming = partitionIDsInStreamingCol } else { for _, id := range partitionIDsInQuery { - _, err2 := s.replica.getPartitionByID(id) - if err2 != nil { + _, err1 = s.historicalReplica.getPartitionByID(id) + if err1 == nil { + searchPartitionIDsInHistorical = append(searchPartitionIDsInHistorical, id) + } + _, err2 = s.streamingReplica.getPartitionByID(id) + if err2 == nil { + searchPartitionIDsInStreaming = append(searchPartitionIDsInStreaming, id) + } + if err1 != nil && err2 != nil { return err2 } } - searchPartitionIDs = partitionIDsInQuery } if searchMsg.GetDslType() == commonpb.DslType_BoolExprV1 { @@ -332,13 +349,37 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { oplog.Object("nq", queryNum), oplog.Object("dsl", searchMsg.Dsl)) } - for _, partitionID := range searchPartitionIDs { - segmentIDs, err := s.replica.getSegmentIDs(partitionID) + + sealedSegmentSearched := make([]UniqueID, 0) + for _, partitionID := range searchPartitionIDsInHistorical { + segmentIDs, err := s.historicalReplica.getSegmentIDs(partitionID) if err != nil { return err } for _, segmentID := range segmentIDs { - segment, err := s.replica.getSegmentByID(segmentID) + segment, err := s.historicalReplica.getSegmentByID(segmentID) + if err != nil { + return err + } + searchResult, err := segment.segmentSearch(plan, searchRequests, []Timestamp{searchTimestamp}) + + if err != nil { + return err + } + searchResults = append(searchResults, searchResult) + matchedSegments = append(matchedSegments, segment) + sealedSegmentSearched = append(sealedSegmentSearched, segmentID) + } + } + + //TODO:: get searched channels + for _, partitionID := range searchPartitionIDsInStreaming { + segmentIDs, err := s.streamingReplica.getSegmentIDs(partitionID) + if err != nil { + return err + } + for _, segmentID := range segmentIDs { + segment, err := s.streamingReplica.getSegmentByID(segmentID) if err != nil { return err } @@ -456,10 +497,14 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { Timestamp: searchTimestamp, SourceID: searchMsg.Base.SourceID, }, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - ResultChannelID: searchMsg.ResultChannelID, - Hits: hits, - MetricType: plan.getMetricType(), + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + ResultChannelID: searchMsg.ResultChannelID, + Hits: hits, + MetricType: plan.getMetricType(), + SealedSegmentIDsSearched: sealedSegmentSearched, + ChannelIDsSearched: s.streamingReplica.getWatchedDmChannels(), + //TODO:: get global sealed segment from etcd + GlobalSealedSegmentIDs: sealedSegmentSearched, }, } diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 8359ae8e15..3bbee0e397 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -26,7 +26,8 @@ type searchService struct { ctx context.Context cancel context.CancelFunc - replica ReplicaInterface + historicalReplica ReplicaInterface + streamingReplica ReplicaInterface searchMsgStream msgstream.MsgStream searchResultMsgStream msgstream.MsgStream @@ -36,7 +37,10 @@ type searchService struct { emptySearchCollection *searchCollection } -func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService { +func newSearchService(ctx context.Context, + historicalReplica ReplicaInterface, + streamingReplica ReplicaInterface, + factory msgstream.Factory) *searchService { searchStream, _ := factory.NewQueryMsgStream(ctx) searchResultStream, _ := factory.NewQueryMsgStream(ctx) @@ -56,7 +60,8 @@ func newSearchService(ctx context.Context, replica ReplicaInterface, factory msg ctx: searchServiceCtx, cancel: searchServiceCancel, - replica: replica, + historicalReplica: historicalReplica, + streamingReplica: streamingReplica, searchMsgStream: searchStream, searchResultMsgStream: searchResultStream, @@ -75,7 +80,7 @@ func (s *searchService) start() { func (s *searchService) collectionCheck(collectionID UniqueID) error { // check if collection exists - if ok := s.replica.hasCollection(collectionID); !ok { + if ok := s.historicalReplica.hasCollection(collectionID); !ok { err := errors.New("no collection found, collectionID = " + strconv.FormatInt(collectionID, 10)) log.Error(err.Error()) return err @@ -139,14 +144,14 @@ func (s *searchService) close() { func (s *searchService) startSearchCollection(collectionID UniqueID) { ctx1, cancel := context.WithCancel(s.ctx) - sc := newSearchCollection(ctx1, cancel, collectionID, s.replica, s.searchResultMsgStream) + sc := newSearchCollection(ctx1, cancel, collectionID, s.historicalReplica, s.streamingReplica, s.searchResultMsgStream) s.searchCollections[collectionID] = sc sc.start() } func (s *searchService) startEmptySearchCollection() { ctx1, cancel := context.WithCancel(s.ctx) - sc := newSearchCollection(ctx1, cancel, UniqueID(-1), s.replica, s.searchResultMsgStream) + sc := newSearchCollection(ctx1, cancel, UniqueID(-1), s.historicalReplica, s.streamingReplica, s.searchResultMsgStream) s.emptySearchCollection = sc sc.start() } diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index c7dc89b543..17ad5241f8 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -141,26 +141,26 @@ func TestSearch_Search(t *testing.T) { assert.NoError(t, err) // start dataSync - newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID) - err = node.addDataSyncService(collectionID, newDS) + newDS := newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID) + err = node.streaming.addDataSyncService(collectionID, newDS) assert.NoError(t, err) - ds, err := node.getDataSyncService(collectionID) + ds, err := node.streaming.getDataSyncService(collectionID) assert.NoError(t, err) go ds.start() // start search service - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) + node.searchService = newSearchService(node.queryNodeLoopCtx, node.historical.replica, node.streaming.replica, msFactory) go node.searchService.start() node.searchService.startSearchCollection(collectionID) - tSafe := node.replica.getTSafe(collectionID) + tSafe := node.streaming.replica.getTSafe(collectionID) assert.NotNil(t, tSafe) tSafe.set(1000) // load segment - err = node.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed) + err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) - segment, err := node.replica.getSegmentByID(segmentID) + segment, err := node.historical.replica.getSegmentByID(segmentID) assert.NoError(t, err) err = loadFields(segment, DIM, N) assert.NoError(t, err) @@ -189,33 +189,33 @@ func TestSearch_SearchMultiSegments(t *testing.T) { assert.NoError(t, err) // start dataSync - newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory, collectionID) - err = node.addDataSyncService(collectionID, newDS) + newDS := newDataSyncService(node.queryNodeLoopCtx, node.streaming.replica, msFactory, collectionID) + err = node.streaming.addDataSyncService(collectionID, newDS) assert.NoError(t, err) - ds, err := node.getDataSyncService(collectionID) + ds, err := node.streaming.getDataSyncService(collectionID) assert.NoError(t, err) go ds.start() // start search service - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) + node.searchService = newSearchService(node.queryNodeLoopCtx, node.streaming.replica, node.streaming.replica, msFactory) go node.searchService.start() node.searchService.startSearchCollection(collectionID) - tSafe := node.replica.getTSafe(collectionID) + tSafe := node.streaming.replica.getTSafe(collectionID) assert.NotNil(t, tSafe) tSafe.set(1000) // load segments - err = node.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed) + err = node.historical.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) - segment1, err := node.replica.getSegmentByID(segmentID1) + segment1, err := node.historical.replica.getSegmentByID(segmentID1) assert.NoError(t, err) err = loadFields(segment1, DIM, N) assert.NoError(t, err) - err = node.replica.addSegment(segmentID2, defaultPartitionID, collectionID, segmentTypeSealed) + err = node.historical.replica.addSegment(segmentID2, defaultPartitionID, collectionID, segmentTypeSealed) assert.NoError(t, err) - segment2, err := node.replica.getSegmentByID(segmentID2) + segment2, err := node.historical.replica.getSegmentByID(segmentID2) assert.NoError(t, err) err = loadFields(segment2, DIM, N) assert.NoError(t, err) diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index d714373e03..f69883ca6d 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -29,8 +29,8 @@ func TestStatsService_start(t *testing.T) { "ReceiveBufSize": 1024, "PulsarBufSize": 1024} msFactory.SetParams(m) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory) - node.statsService.start() + node.historical.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, nil, msFactory) + node.historical.statsService.start() node.Stop() } @@ -57,11 +57,11 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { var statsMsgStream msgstream.MsgStream = statsStream - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory) - node.statsService.statsStream = statsMsgStream - node.statsService.statsStream.Start() + node.historical.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, nil, msFactory) + node.historical.statsService.statsStream = statsMsgStream + node.historical.statsService.statsStream.Start() // send stats - node.statsService.publicStatistic(nil) + node.historical.statsService.publicStatistic(nil) node.Stop() } diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go new file mode 100644 index 0000000000..8bc9a23985 --- /dev/null +++ b/internal/querynode/streaming.go @@ -0,0 +1,75 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "errors" + "fmt" + "sync" +) + +type streaming struct { + replica ReplicaInterface + + dsServicesMu sync.Mutex // guards dataSyncServices + dataSyncServices map[UniqueID]*dataSyncService +} + +func newStreaming() *streaming { + replica := newCollectionReplica() + ds := make(map[UniqueID]*dataSyncService) + return &streaming{ + replica: replica, + dataSyncServices: ds, + } +} + +func (s *streaming) start() { + // TODO: start stats +} + +func (s *streaming) close() { + // TODO: stop stats + for _, ds := range s.dataSyncServices { + ds.close() + } + s.dataSyncServices = make(map[UniqueID]*dataSyncService) + + // free collectionReplica + s.replica.freeAll() +} + +func (s *streaming) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) { + s.dsServicesMu.Lock() + defer s.dsServicesMu.Unlock() + ds, ok := s.dataSyncServices[collectionID] + if !ok { + return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID)) + } + return ds, nil +} + +func (s *streaming) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error { + s.dsServicesMu.Lock() + defer s.dsServicesMu.Unlock() + if _, ok := s.dataSyncServices[collectionID]; ok { + return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID)) + } + s.dataSyncServices[collectionID] = ds + return nil +} + +func (s *streaming) removeDataSyncService(collectionID UniqueID) { + s.dsServicesMu.Lock() + defer s.dsServicesMu.Unlock() + delete(s.dataSyncServices, collectionID) +} diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 7ede7ed8df..8c0c51b27c 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -111,7 +111,7 @@ func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error { func (w *watchDmChannelsTask) Execute(ctx context.Context) error { log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs))) collectionID := w.req.CollectionID - ds, err := w.node.getDataSyncService(collectionID) + ds, err := w.node.streaming.getDataSyncService(collectionID) if err != nil || ds.dmStream == nil { errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID) log.Error(errMsg) @@ -148,7 +148,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { toSeekInfo = append(toSeekInfo, info.Pos) log.Debug("prevent inserting segments", zap.String("segmentIDs", fmt.Sprintln(info.ExcludedSegments))) - err := w.node.replica.addExcludedSegments(collectionID, info.ExcludedSegments) + err := w.node.streaming.replica.addExcludedSegments(collectionID, info.ExcludedSegments) if err != nil { log.Error(err.Error()) return err @@ -164,6 +164,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error { return errors.New(errMsg) } } + w.node.streaming.replica.addWatchedDmChannels(w.req.ChannelIDs) log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName) log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs))) return nil @@ -205,32 +206,48 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(l.req))) - hasCollection := l.node.replica.hasCollection(collectionID) - hasPartition := l.node.replica.hasPartition(partitionID) - if !hasCollection { + hasCollectionInHistorical := l.node.historical.replica.hasCollection(collectionID) + hasPartitionInHistorical := l.node.historical.replica.hasPartition(partitionID) + if !hasCollectionInHistorical { // loading init - err := l.node.replica.addCollection(collectionID, schema) + err := l.node.historical.replica.addCollection(collectionID, schema) if err != nil { return err } - l.node.replica.initExcludedSegments(collectionID) - newDS := newDataSyncService(l.node.queryNodeLoopCtx, l.node.replica, l.node.msFactory, collectionID) + + hasCollectionInStreaming := l.node.streaming.replica.hasCollection(collectionID) + if !hasCollectionInStreaming { + err = l.node.streaming.replica.addCollection(collectionID, schema) + if err != nil { + return err + } + } + l.node.streaming.replica.initExcludedSegments(collectionID) + newDS := newDataSyncService(l.node.queryNodeLoopCtx, l.node.streaming.replica, l.node.msFactory, collectionID) // ignore duplicated dataSyncService error - _ = l.node.addDataSyncService(collectionID, newDS) - ds, err := l.node.getDataSyncService(collectionID) + _ = l.node.streaming.addDataSyncService(collectionID, newDS) + ds, err := l.node.streaming.getDataSyncService(collectionID) if err != nil { return err } go ds.start() l.node.searchService.startSearchCollection(collectionID) } - if !hasPartition { - err := l.node.replica.addPartition(collectionID, partitionID) + if !hasPartitionInHistorical { + err := l.node.historical.replica.addPartition(collectionID, partitionID) if err != nil { return err } + + hasPartitionInStreaming := l.node.streaming.replica.hasPartition(partitionID) + if !hasPartitionInStreaming { + err = l.node.streaming.replica.addPartition(collectionID, partitionID) + if err != nil { + return err + } + } } - err := l.node.replica.enablePartition(partitionID) + err := l.node.streaming.replica.enablePartition(partitionID) if err != nil { return err } @@ -239,7 +256,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error { return nil } - err = l.node.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs) + err = l.node.historical.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs) if err != nil { return err } @@ -275,21 +292,32 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error { } func (r *releaseCollectionTask) Execute(ctx context.Context) error { - ds, err := r.node.getDataSyncService(r.req.CollectionID) + ds, err := r.node.streaming.getDataSyncService(r.req.CollectionID) if err == nil && ds != nil { ds.close() - r.node.removeDataSyncService(r.req.CollectionID) - r.node.replica.removeTSafe(r.req.CollectionID) - r.node.replica.removeExcludedSegments(r.req.CollectionID) + r.node.streaming.removeDataSyncService(r.req.CollectionID) + r.node.streaming.replica.removeTSafe(r.req.CollectionID) + r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID) } if r.node.searchService.hasSearchCollection(r.req.CollectionID) { r.node.searchService.stopSearchCollection(r.req.CollectionID) } - err = r.node.replica.removeCollection(r.req.CollectionID) - if err != nil { - return err + hasCollectionInHistorical := r.node.historical.replica.hasCollection(r.req.CollectionID) + if hasCollectionInHistorical { + err := r.node.historical.replica.removeCollection(r.req.CollectionID) + if err != nil { + return err + } + } + + hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID) + if hasCollectionInStreaming { + err := r.node.streaming.replica.removePartition(r.req.CollectionID) + if err != nil { + return err + } } log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID)) @@ -324,10 +352,21 @@ func (r *releasePartitionsTask) PreExecute(ctx context.Context) error { func (r *releasePartitionsTask) Execute(ctx context.Context) error { for _, id := range r.req.PartitionIDs { - err := r.node.loadService.segLoader.replica.removePartition(id) - if err != nil { - // not return, try to release all partitions - log.Error(err.Error()) + hasPartitionInHistorical := r.node.historical.replica.hasPartition(id) + if hasPartitionInHistorical { + err := r.node.historical.replica.removePartition(id) + if err != nil { + // not return, try to release all partitions + log.Error(err.Error()) + } + } + + hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id) + if hasPartitionInStreaming { + err := r.node.streaming.replica.removePartition(id) + if err != nil { + log.Error(err.Error()) + } } } return nil