From 4e4ff54d984549e2956355fdf5c6e479bbf1eec1 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 21 Dec 2021 13:50:54 +0800 Subject: [PATCH] Watch query channel to near the latest position (#13682) Signed-off-by: xige-16 --- internal/proto/query_coord.proto | 5 +- internal/proto/querypb/query_coord.pb.go | 303 +++++++++--------- internal/querycoord/cluster.go | 17 +- internal/querycoord/cluster_test.go | 20 +- internal/querycoord/impl.go | 14 +- internal/querycoord/impl_test.go | 19 -- internal/querycoord/meta.go | 246 +++++--------- internal/querycoord/meta_test.go | 49 +-- internal/querycoord/segment_allocator_test.go | 2 +- internal/querycoord/task.go | 7 +- internal/querycoord/task_scheduler.go | 7 +- internal/querycoord/util.go | 26 ++ 12 files changed, 297 insertions(+), 418 deletions(-) create mode 100644 internal/querycoord/util.go diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 46bf5f3ff3..a479a46c08 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -150,9 +150,8 @@ message AddQueryChannelRequest { int64 collectionID = 3; string query_channel = 4; string query_result_channel = 5; - repeated int64 global_sealed_segmentID = 6; - internal.MsgPosition seek_position = 7; - repeated SegmentInfo global_sealed_segments = 8; + internal.MsgPosition seek_position = 6; + repeated SegmentInfo global_sealed_segments = 7; } message RemoveQueryChannelRequest { diff --git a/internal/proto/querypb/query_coord.pb.go b/internal/proto/querypb/query_coord.pb.go index 5c5b85ebdc..a3c8606f8c 100644 --- a/internal/proto/querypb/query_coord.pb.go +++ b/internal/proto/querypb/query_coord.pb.go @@ -947,17 +947,16 @@ func (m *GetSegmentInfoResponse) GetInfos() []*SegmentInfo { //-----------------query node grpc request and response proto---------------- type AddQueryChannelRequest struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - NodeID int64 `protobuf:"varint,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"` - CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` - QueryChannel string `protobuf:"bytes,4,opt,name=query_channel,json=queryChannel,proto3" json:"query_channel,omitempty"` - QueryResultChannel string `protobuf:"bytes,5,opt,name=query_result_channel,json=queryResultChannel,proto3" json:"query_result_channel,omitempty"` - GlobalSealedSegmentID []int64 `protobuf:"varint,6,rep,packed,name=global_sealed_segmentID,json=globalSealedSegmentID,proto3" json:"global_sealed_segmentID,omitempty"` - SeekPosition *internalpb.MsgPosition `protobuf:"bytes,7,opt,name=seek_position,json=seekPosition,proto3" json:"seek_position,omitempty"` - GlobalSealedSegments []*SegmentInfo `protobuf:"bytes,8,rep,name=global_sealed_segments,json=globalSealedSegments,proto3" json:"global_sealed_segments,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + NodeID int64 `protobuf:"varint,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"` + CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + QueryChannel string `protobuf:"bytes,4,opt,name=query_channel,json=queryChannel,proto3" json:"query_channel,omitempty"` + QueryResultChannel string `protobuf:"bytes,5,opt,name=query_result_channel,json=queryResultChannel,proto3" json:"query_result_channel,omitempty"` + SeekPosition *internalpb.MsgPosition `protobuf:"bytes,6,opt,name=seek_position,json=seekPosition,proto3" json:"seek_position,omitempty"` + GlobalSealedSegments []*SegmentInfo `protobuf:"bytes,7,rep,name=global_sealed_segments,json=globalSealedSegments,proto3" json:"global_sealed_segments,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *AddQueryChannelRequest) Reset() { *m = AddQueryChannelRequest{} } @@ -1020,13 +1019,6 @@ func (m *AddQueryChannelRequest) GetQueryResultChannel() string { return "" } -func (m *AddQueryChannelRequest) GetGlobalSealedSegmentID() []int64 { - if m != nil { - return m.GlobalSealedSegmentID - } - return nil -} - func (m *AddQueryChannelRequest) GetSeekPosition() *internalpb.MsgPosition { if m != nil { return m.SeekPosition @@ -2228,145 +2220,144 @@ func init() { func init() { proto.RegisterFile("query_coord.proto", fileDescriptor_aab7cc9a69ed26e8) } var fileDescriptor_aab7cc9a69ed26e8 = []byte{ - // 2209 bytes of a gzipped FileDescriptorProto + // 2193 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x19, 0x4d, 0x73, 0xdc, 0x48, - 0xd5, 0x9a, 0x2f, 0x7b, 0xde, 0x8c, 0x67, 0xe4, 0xf6, 0xc7, 0x8e, 0x87, 0x24, 0xeb, 0xd5, 0xae, - 0xb3, 0xc1, 0xcb, 0xda, 0xc1, 0x01, 0x8a, 0x2d, 0xe0, 0x10, 0xdb, 0xc4, 0x6b, 0x76, 0xe3, 0x98, - 0xb1, 0xb3, 0x14, 0xa9, 0x54, 0x09, 0xcd, 0xa8, 0x3d, 0x56, 0x45, 0x52, 0x4f, 0xd4, 0x9a, 0xb5, - 0x1d, 0xae, 0x1c, 0x96, 0x03, 0xc5, 0x1f, 0xa0, 0xb8, 0x00, 0x05, 0xa9, 0x62, 0x8f, 0xdc, 0x73, - 0xe1, 0x6f, 0x50, 0x1c, 0xe0, 0x27, 0x70, 0xa7, 0xba, 0xd5, 0xd2, 0x48, 0x9a, 0x96, 0x2d, 0xdb, - 0xe5, 0x24, 0x45, 0x71, 0x93, 0x5e, 0xbf, 0xd7, 0xef, 0xf5, 0xfb, 0xee, 0x7e, 0x30, 0xf3, 0x7c, - 0x88, 0xbd, 0x53, 0xbd, 0x47, 0x88, 0x67, 0xae, 0x0e, 0x3c, 0xe2, 0x13, 0x84, 0x1c, 0xcb, 0xfe, - 0x72, 0x48, 0x83, 0xbf, 0x55, 0xbe, 0xde, 0xae, 0xf7, 0x88, 0xe3, 0x10, 0x37, 0x80, 0xb5, 0xeb, - 0x71, 0x8c, 0x76, 0xc3, 0x72, 0x7d, 0xec, 0xb9, 0x86, 0x1d, 0xae, 0xd2, 0xde, 0x11, 0x76, 0x0c, - 0xf1, 0xa7, 0x9a, 0x86, 0x6f, 0xc4, 0xf7, 0x6f, 0xcf, 0x58, 0xae, 0x89, 0x4f, 0xe2, 0x20, 0xed, - 0x57, 0x0a, 0x2c, 0xec, 0x1f, 0x91, 0xe3, 0x4d, 0x62, 0xdb, 0xb8, 0xe7, 0x5b, 0xc4, 0xa5, 0x1d, - 0xfc, 0x7c, 0x88, 0xa9, 0x8f, 0xee, 0x42, 0xa9, 0x6b, 0x50, 0xdc, 0x52, 0x96, 0x94, 0x3b, 0xb5, - 0xf5, 0x1b, 0xab, 0x09, 0xe1, 0x84, 0x54, 0x0f, 0x69, 0x7f, 0xc3, 0xa0, 0xb8, 0xc3, 0x31, 0x11, - 0x82, 0x92, 0xd9, 0xdd, 0xd9, 0x6a, 0x15, 0x96, 0x94, 0x3b, 0xc5, 0x0e, 0xff, 0x46, 0x1f, 0xc0, - 0x74, 0x2f, 0xda, 0x7b, 0x67, 0x8b, 0xb6, 0x8a, 0x4b, 0xc5, 0x3b, 0xc5, 0x4e, 0x12, 0xa8, 0xfd, - 0x59, 0x81, 0x77, 0xc6, 0xc4, 0xa0, 0x03, 0xe2, 0x52, 0x8c, 0xee, 0x41, 0x85, 0xfa, 0x86, 0x3f, - 0xa4, 0x42, 0x92, 0x6f, 0x48, 0x25, 0xd9, 0xe7, 0x28, 0x1d, 0x81, 0x3a, 0xce, 0xb6, 0x20, 0x61, - 0x8b, 0xbe, 0x0d, 0x73, 0x96, 0xfb, 0x10, 0x3b, 0xc4, 0x3b, 0xd5, 0x07, 0xd8, 0xeb, 0x61, 0xd7, - 0x37, 0xfa, 0x38, 0x94, 0x71, 0x36, 0x5c, 0xdb, 0x1b, 0x2d, 0x69, 0x7f, 0x52, 0x60, 0x9e, 0x49, - 0xba, 0x67, 0x78, 0xbe, 0x75, 0x0d, 0xfa, 0xd2, 0xa0, 0x1e, 0x97, 0xb1, 0x55, 0xe4, 0x6b, 0x09, - 0x18, 0xc3, 0x19, 0x84, 0xec, 0xd9, 0xd9, 0x4a, 0x5c, 0xdc, 0x04, 0x4c, 0xfb, 0xa3, 0x30, 0x6c, - 0x5c, 0xce, 0xab, 0x28, 0x34, 0xcd, 0xb3, 0x30, 0xce, 0xf3, 0x32, 0xea, 0x7c, 0xa5, 0xc0, 0xfc, - 0xe7, 0xc4, 0x30, 0x47, 0x86, 0x7f, 0xfd, 0xea, 0xfc, 0x11, 0x54, 0x82, 0xc0, 0x69, 0x95, 0x38, - 0xaf, 0xe5, 0x24, 0x2f, 0x11, 0x54, 0x23, 0x09, 0xf7, 0x39, 0xa0, 0x23, 0x88, 0xb4, 0xdf, 0x29, - 0xd0, 0xea, 0x60, 0x1b, 0x1b, 0x14, 0xbf, 0xc9, 0x53, 0x2c, 0x40, 0xc5, 0x25, 0x26, 0xde, 0xd9, - 0xe2, 0xa7, 0x28, 0x76, 0xc4, 0x9f, 0xf6, 0x2f, 0xa1, 0xe1, 0xb7, 0xdc, 0x61, 0x63, 0x56, 0x28, - 0x5f, 0xc6, 0x0a, 0xaf, 0x46, 0x56, 0x78, 0xdb, 0x4f, 0x3a, 0xb2, 0x54, 0x39, 0x61, 0xa9, 0x9f, - 0xc3, 0xe2, 0xa6, 0x87, 0x0d, 0x1f, 0xff, 0x94, 0x65, 0xfe, 0xcd, 0x23, 0xc3, 0x75, 0xb1, 0x1d, - 0x1e, 0x21, 0xcd, 0x5c, 0x91, 0x30, 0x6f, 0xc1, 0xe4, 0xc0, 0x23, 0x27, 0xa7, 0x91, 0xdc, 0xe1, - 0xaf, 0xf6, 0x17, 0x05, 0xda, 0xb2, 0xbd, 0xaf, 0x92, 0x11, 0xde, 0x87, 0x69, 0x51, 0xc2, 0x82, - 0xdd, 0x38, 0xcf, 0x6a, 0xa7, 0xfe, 0x3c, 0xc6, 0x01, 0xdd, 0x85, 0xb9, 0x00, 0xc9, 0xc3, 0x74, - 0x68, 0xfb, 0x11, 0x6e, 0x91, 0xe3, 0x22, 0xbe, 0xd6, 0xe1, 0x4b, 0x82, 0x42, 0x7b, 0xa9, 0xc0, - 0xe2, 0x36, 0xf6, 0x23, 0x23, 0x32, 0xae, 0xf8, 0x2d, 0x4d, 0xb2, 0x5f, 0x2b, 0xd0, 0x96, 0xc9, - 0x7a, 0x15, 0xb5, 0x3e, 0x81, 0x85, 0x88, 0x87, 0x6e, 0x62, 0xda, 0xf3, 0xac, 0x01, 0x77, 0x66, - 0x9e, 0x72, 0x6b, 0xeb, 0xef, 0xaf, 0x8e, 0x77, 0x09, 0xab, 0x69, 0x09, 0xe6, 0xa3, 0x2d, 0xb6, - 0x62, 0x3b, 0x68, 0xbf, 0x51, 0x60, 0x7e, 0x1b, 0xfb, 0xfb, 0xb8, 0xef, 0x60, 0xd7, 0xdf, 0x71, - 0x0f, 0xc9, 0xe5, 0xf5, 0x7a, 0x0b, 0x80, 0x8a, 0x7d, 0xa2, 0x72, 0x10, 0x83, 0xe4, 0xd1, 0x31, - 0xef, 0x3e, 0xd2, 0xf2, 0x5c, 0x45, 0x77, 0xdf, 0x85, 0xb2, 0xe5, 0x1e, 0x92, 0x50, 0x55, 0xef, - 0xca, 0x54, 0x15, 0x67, 0x16, 0x60, 0x6b, 0x7f, 0x2b, 0xc2, 0xc2, 0x7d, 0xd3, 0x94, 0x85, 0xdd, - 0xc5, 0xf5, 0x32, 0x8a, 0xee, 0x42, 0x3c, 0xba, 0x73, 0xf9, 0xdc, 0x58, 0x48, 0x95, 0x2e, 0x10, - 0x52, 0xe5, 0xac, 0x90, 0x42, 0xdf, 0x83, 0x77, 0xfa, 0x36, 0xe9, 0x1a, 0xb6, 0x4e, 0xb1, 0x61, - 0x63, 0x53, 0x8f, 0xcc, 0xd4, 0xaa, 0x70, 0xbb, 0xcd, 0x07, 0xcb, 0xfb, 0x7c, 0x35, 0x54, 0xd0, - 0x16, 0xda, 0x86, 0x69, 0x8a, 0xf1, 0x33, 0x7d, 0x40, 0x28, 0xf7, 0xa5, 0xd6, 0x24, 0xd7, 0x82, - 0x96, 0xd4, 0x42, 0xd4, 0x84, 0x3e, 0xa4, 0xfd, 0x3d, 0x81, 0xd9, 0xa9, 0x33, 0xc2, 0xf0, 0x0f, - 0x3d, 0x86, 0x05, 0xa9, 0x00, 0xb4, 0x35, 0x95, 0xcf, 0x50, 0x73, 0x12, 0x01, 0xa9, 0xf6, 0x4f, - 0x05, 0x16, 0x3b, 0xd8, 0x21, 0x5f, 0xe2, 0xff, 0x55, 0xd3, 0x69, 0xff, 0x2e, 0xc0, 0xc2, 0xcf, - 0x0c, 0xbf, 0x77, 0xb4, 0xe5, 0x08, 0x10, 0x7d, 0x33, 0xe7, 0xcb, 0x53, 0xd8, 0xa2, 0xf0, 0x2b, - 0xcb, 0xac, 0xca, 0xae, 0x23, 0xab, 0x5f, 0x88, 0x23, 0xc7, 0xc2, 0x2f, 0x56, 0xf9, 0x2b, 0x97, - 0xa8, 0xfc, 0x68, 0x13, 0xa6, 0xf1, 0x49, 0xcf, 0x1e, 0x9a, 0x58, 0x0f, 0xb8, 0x4f, 0x72, 0xee, - 0xb7, 0x24, 0xdc, 0xe3, 0x2e, 0x55, 0x17, 0x44, 0x3b, 0x3c, 0x05, 0xbc, 0x52, 0x60, 0x31, 0xd0, - 0x33, 0xb6, 0x7d, 0xe3, 0xcd, 0xaa, 0x3a, 0x52, 0x63, 0xe9, 0x22, 0x6a, 0xd4, 0xfe, 0x50, 0x82, - 0xa6, 0x38, 0x20, 0xeb, 0xf7, 0xd8, 0x12, 0xba, 0x01, 0xd5, 0x51, 0xac, 0x07, 0x2d, 0xc3, 0x08, - 0x80, 0x96, 0xa0, 0x16, 0xb3, 0x9f, 0x90, 0x34, 0x0e, 0xca, 0x25, 0x6e, 0x58, 0x60, 0x4b, 0xb1, - 0x02, 0x7b, 0x13, 0xe0, 0xd0, 0x1e, 0xd2, 0x23, 0xdd, 0xb7, 0x1c, 0x2c, 0xda, 0x9c, 0x2a, 0x87, - 0x1c, 0x58, 0x0e, 0x46, 0xf7, 0xa1, 0xde, 0xb5, 0x5c, 0x9b, 0xf4, 0xf5, 0x81, 0xe1, 0x1f, 0x51, - 0x9e, 0x85, 0xe4, 0x16, 0x7b, 0x60, 0x61, 0xdb, 0xdc, 0xe0, 0xb8, 0x9d, 0x5a, 0x40, 0xb3, 0xc7, - 0x48, 0xd0, 0x2d, 0xa8, 0xb9, 0x43, 0x47, 0x27, 0x87, 0xba, 0x47, 0x8e, 0x29, 0xcf, 0x4c, 0xc5, - 0x4e, 0xd5, 0x1d, 0x3a, 0x8f, 0x0e, 0x3b, 0xe4, 0x98, 0xa2, 0x1f, 0x42, 0x95, 0x15, 0x05, 0x6a, - 0x93, 0x7e, 0x98, 0x65, 0xce, 0xdb, 0x7f, 0x44, 0xc0, 0xa8, 0x4d, 0xe6, 0x08, 0x9c, 0xba, 0x9a, - 0x8f, 0x3a, 0x22, 0x40, 0xb7, 0xa1, 0xd1, 0x23, 0xce, 0xc0, 0xe0, 0x1a, 0x7a, 0xe0, 0x11, 0xa7, - 0x05, 0x3c, 0x5a, 0x52, 0x50, 0xf4, 0x1e, 0xd4, 0xb1, 0x6b, 0x74, 0x6d, 0xe6, 0xb8, 0x26, 0x3e, - 0x69, 0xd5, 0x96, 0x94, 0x3b, 0x53, 0x9d, 0x5a, 0x00, 0xdb, 0x61, 0x20, 0xf4, 0x08, 0xd4, 0xe0, - 0xd2, 0xce, 0x14, 0x25, 0xfc, 0xbb, 0xce, 0xe5, 0x59, 0x4e, 0x67, 0x61, 0x13, 0x9f, 0xac, 0x72, - 0xa2, 0x07, 0x96, 0x8d, 0x99, 0x92, 0xb8, 0x73, 0x34, 0xf8, 0x42, 0xf8, 0x4b, 0xb5, 0x97, 0x05, - 0x98, 0x65, 0xee, 0x11, 0x26, 0xd1, 0xcb, 0xbb, 0xf8, 0x4d, 0x00, 0x93, 0xfa, 0x7a, 0xc2, 0xcd, - 0xab, 0x26, 0xf5, 0x77, 0x03, 0x4f, 0xff, 0x24, 0xf4, 0xe2, 0x62, 0x76, 0xdb, 0x92, 0x72, 0xd7, - 0xf1, 0x84, 0x70, 0x99, 0x0b, 0x19, 0x4b, 0xc5, 0x94, 0x0c, 0xbd, 0x1e, 0xd6, 0x13, 0x6d, 0x76, - 0x3d, 0x00, 0xee, 0xca, 0x03, 0xb1, 0x22, 0x69, 0x4f, 0xfe, 0xa1, 0xc0, 0x82, 0xb8, 0x53, 0x5c, - 0x5d, 0x5d, 0x59, 0x19, 0x21, 0x0c, 0x9f, 0xe2, 0x19, 0xfd, 0x69, 0x29, 0x47, 0x42, 0x2e, 0x4b, - 0x12, 0x72, 0xb2, 0x47, 0xab, 0xa4, 0x7b, 0x34, 0xed, 0xb7, 0x0a, 0x2c, 0x7c, 0x6a, 0xb8, 0x26, - 0x39, 0x3c, 0xbc, 0xfa, 0x01, 0x37, 0xa1, 0x4e, 0x47, 0xf9, 0x35, 0x77, 0x0f, 0x96, 0x20, 0xd2, - 0xbe, 0x2a, 0x00, 0x62, 0xee, 0xb0, 0x61, 0xd8, 0x86, 0xdb, 0xc3, 0x97, 0x97, 0x66, 0x19, 0x1a, - 0x09, 0x27, 0x88, 0x5e, 0x80, 0xe2, 0x5e, 0x40, 0xd1, 0x67, 0xd0, 0xe8, 0x06, 0xac, 0x74, 0x0f, - 0x1b, 0x94, 0xb8, 0xdc, 0x0e, 0x8d, 0xf5, 0x0f, 0x64, 0x62, 0x1f, 0x78, 0x56, 0xbf, 0x8f, 0xbd, - 0x4d, 0xe2, 0x9a, 0x41, 0x97, 0x33, 0xdd, 0x0d, 0xc5, 0x64, 0xa4, 0xe8, 0x5d, 0xa8, 0x8d, 0x22, - 0x22, 0x2c, 0x91, 0x10, 0x85, 0x04, 0x45, 0x1f, 0xc1, 0x4c, 0xba, 0x03, 0x0b, 0x0d, 0xa7, 0xd2, - 0x64, 0xf3, 0x45, 0xb5, 0x5f, 0x02, 0x8a, 0x8a, 0x3e, 0x2f, 0x4d, 0x3c, 0xa3, 0xe7, 0xb9, 0x07, - 0xde, 0x80, 0xaa, 0x19, 0x52, 0x8a, 0x5b, 0xd9, 0x08, 0xc0, 0xc2, 0x23, 0x90, 0x50, 0xb7, 0x89, - 0x61, 0x62, 0x33, 0x4c, 0xea, 0x01, 0xf0, 0x73, 0x0e, 0xd3, 0xbe, 0x2e, 0x80, 0x1a, 0x6f, 0xaa, - 0x72, 0xf3, 0xbe, 0x9e, 0x5b, 0xe1, 0x19, 0x1d, 0x64, 0xe9, 0x0a, 0x1d, 0xe4, 0x78, 0x87, 0x5b, - 0xbe, 0x5c, 0x87, 0xab, 0xfd, 0x5e, 0x81, 0x66, 0xea, 0x12, 0x96, 0x2e, 0xaf, 0xca, 0x78, 0x79, - 0xfd, 0x3e, 0x94, 0x59, 0xcd, 0xc1, 0x5c, 0x49, 0x8d, 0x34, 0x5b, 0xd9, 0xd5, 0xae, 0x13, 0x10, - 0xa0, 0x35, 0x98, 0x95, 0x3c, 0xb5, 0x09, 0x53, 0xa2, 0xf1, 0x97, 0x36, 0xed, 0xaf, 0x25, 0xa8, - 0xc5, 0xf4, 0x71, 0x4e, 0x67, 0x90, 0xb6, 0x74, 0x41, 0x62, 0xe9, 0xd4, 0xf1, 0x8a, 0xe3, 0xc7, - 0xcb, 0x78, 0x92, 0x42, 0x8b, 0x30, 0xe5, 0x60, 0x47, 0xa7, 0xd6, 0x8b, 0xb0, 0x37, 0x98, 0x74, - 0xb0, 0xb3, 0x6f, 0xbd, 0xc0, 0x6c, 0x89, 0x95, 0x75, 0x5e, 0xd3, 0x83, 0x94, 0x3c, 0xe9, 0x0e, - 0x1d, 0x5e, 0xd1, 0x6f, 0x02, 0x04, 0xa5, 0xd0, 0x35, 0x1c, 0xcc, 0x0b, 0x7e, 0xb5, 0x53, 0xe5, - 0x90, 0x5d, 0xc3, 0xc1, 0xa8, 0x05, 0x93, 0xfc, 0x67, 0x67, 0xab, 0x35, 0x15, 0x10, 0x8a, 0xdf, - 0x64, 0x38, 0x54, 0xd3, 0xe1, 0x90, 0xb7, 0x58, 0xdf, 0x85, 0xd9, 0x1e, 0x7f, 0x41, 0x31, 0x37, - 0x4e, 0x37, 0xa3, 0x25, 0x51, 0xb3, 0x65, 0x4b, 0xe8, 0x01, 0x73, 0x2e, 0xae, 0x51, 0x3d, 0xb0, - 0x72, 0x9d, 0x5b, 0xf9, 0x3d, 0xf9, 0x4d, 0x36, 0xc0, 0x0c, 0x8c, 0x1c, 0xe6, 0x44, 0xfe, 0x37, - 0xd6, 0x26, 0x4c, 0xe7, 0x6b, 0x13, 0x1a, 0x57, 0x69, 0x13, 0xbe, 0x2a, 0x42, 0x63, 0x54, 0x60, - 0x73, 0x47, 0x7f, 0x9e, 0x57, 0xe2, 0x5d, 0x50, 0x47, 0x0f, 0x1c, 0x5c, 0x31, 0x67, 0xf6, 0x08, - 0xe9, 0xa7, 0x8d, 0xe6, 0x20, 0x15, 0x66, 0x9f, 0x40, 0x95, 0x25, 0x32, 0xdd, 0x3f, 0x1d, 0x60, - 0xee, 0x68, 0x8d, 0x74, 0x81, 0x08, 0x36, 0x62, 0x99, 0xed, 0xe0, 0x74, 0x80, 0x3b, 0x53, 0xb6, - 0xf8, 0xba, 0xe2, 0x9b, 0x23, 0xba, 0x07, 0xf3, 0x5e, 0xd0, 0x1e, 0x98, 0x7a, 0xe2, 0xd8, 0x41, - 0xa5, 0x9d, 0x0b, 0x17, 0xf7, 0xe2, 0xc7, 0xcf, 0x88, 0xdc, 0xc9, 0xcc, 0xc8, 0xfd, 0x8f, 0x02, - 0x33, 0xc2, 0x3b, 0x98, 0xcf, 0xf6, 0xf9, 0x8d, 0x85, 0xe5, 0x59, 0xe2, 0xda, 0x96, 0x1b, 0x35, - 0x39, 0xc2, 0x1c, 0x01, 0x50, 0x34, 0x39, 0x9f, 0x42, 0x53, 0x20, 0x45, 0xe9, 0x32, 0x67, 0x55, - 0x6e, 0x04, 0x74, 0x51, 0xa2, 0x5c, 0x86, 0x06, 0x39, 0x3c, 0x8c, 0xf3, 0x0b, 0xe2, 0x7d, 0x5a, - 0x40, 0x05, 0xc3, 0x9f, 0x80, 0x1a, 0xa2, 0x5d, 0x34, 0x41, 0x37, 0x05, 0x61, 0x74, 0xbb, 0xff, - 0xb5, 0x02, 0xad, 0x64, 0xba, 0x8e, 0x1d, 0xff, 0xe2, 0x0d, 0xc1, 0x0f, 0x92, 0x6f, 0x43, 0xcb, - 0x67, 0xc8, 0x33, 0xe2, 0x23, 0x3a, 0xd2, 0x95, 0x17, 0xd0, 0x48, 0xfa, 0x21, 0xaa, 0xc3, 0xd4, - 0x2e, 0xf1, 0x7f, 0x7c, 0x62, 0x51, 0x5f, 0x9d, 0x40, 0x0d, 0x80, 0x5d, 0xe2, 0xef, 0x79, 0x98, - 0x62, 0xd7, 0x57, 0x15, 0x04, 0x50, 0x79, 0xe4, 0x6e, 0x59, 0xf4, 0x99, 0x5a, 0x40, 0xb3, 0xa2, - 0x32, 0x18, 0xf6, 0x8e, 0x30, 0xae, 0x5a, 0x64, 0xe4, 0xd1, 0x5f, 0x09, 0xa9, 0x50, 0x8f, 0x50, - 0xb6, 0xf7, 0x1e, 0xab, 0x65, 0x54, 0x85, 0x72, 0xf0, 0x59, 0x59, 0x31, 0x41, 0x4d, 0x37, 0x1e, - 0x6c, 0xcf, 0xc7, 0xee, 0x67, 0x2e, 0x39, 0x8e, 0x40, 0xea, 0x04, 0xaa, 0xc1, 0xa4, 0x68, 0xe6, - 0x54, 0x05, 0x35, 0xa1, 0x16, 0xeb, 0xa3, 0xd4, 0x02, 0x03, 0x6c, 0x7b, 0x83, 0x9e, 0xe8, 0xa8, - 0x02, 0x11, 0x98, 0xd5, 0xb6, 0xc8, 0xb1, 0xab, 0x96, 0x56, 0xee, 0xc3, 0x54, 0x18, 0x20, 0xec, - 0x34, 0xc1, 0xee, 0xec, 0x4f, 0x9d, 0x40, 0x33, 0x30, 0x9d, 0x98, 0x20, 0xa8, 0x0a, 0x42, 0xd0, - 0xb0, 0x13, 0x63, 0x1b, 0xb5, 0xb0, 0xfe, 0xf7, 0x1a, 0x40, 0xd0, 0x33, 0x10, 0xe2, 0x99, 0x68, - 0x00, 0x68, 0x1b, 0xfb, 0x2c, 0x1f, 0x12, 0x37, 0xcc, 0x65, 0x14, 0xdd, 0xcd, 0x28, 0xad, 0xe3, - 0xa8, 0x42, 0xd2, 0xf6, 0xed, 0x0c, 0x8a, 0x14, 0xba, 0x36, 0x81, 0x1c, 0xce, 0x91, 0xdd, 0x30, - 0x0f, 0xac, 0xde, 0xb3, 0xa8, 0xd9, 0xc8, 0xe6, 0x98, 0x42, 0x0d, 0x39, 0xa6, 0xf2, 0x90, 0xf8, - 0xd9, 0xf7, 0x3d, 0xcb, 0xed, 0x87, 0x0f, 0x94, 0xda, 0x04, 0x7a, 0x0e, 0x73, 0xdb, 0x98, 0x73, - 0xb7, 0xa8, 0x6f, 0xf5, 0x68, 0xc8, 0x70, 0x3d, 0x9b, 0xe1, 0x18, 0xf2, 0x05, 0x59, 0xda, 0xd0, - 0x4c, 0x8d, 0x49, 0xd1, 0x8a, 0xd4, 0x91, 0xa5, 0x23, 0xdd, 0xf6, 0x47, 0xb9, 0x70, 0x23, 0x6e, - 0x16, 0x34, 0x92, 0x23, 0x44, 0xf4, 0xcd, 0xac, 0x0d, 0xc6, 0x66, 0x2e, 0xed, 0x95, 0x3c, 0xa8, - 0x11, 0xab, 0x27, 0xd0, 0x48, 0x0e, 0xa9, 0xe4, 0xac, 0xa4, 0x83, 0xac, 0xf6, 0x59, 0x6f, 0xc3, - 0xda, 0x04, 0xfa, 0x05, 0xcc, 0x8c, 0x4d, 0x86, 0xd0, 0xb7, 0x64, 0xdb, 0x67, 0x0d, 0x90, 0xce, - 0xe3, 0x20, 0xa4, 0x1f, 0x69, 0x31, 0x5b, 0xfa, 0xb1, 0x11, 0x61, 0x7e, 0xe9, 0x63, 0xdb, 0x9f, - 0x25, 0xfd, 0x85, 0x39, 0x0c, 0x01, 0x8d, 0xcf, 0x86, 0xd0, 0xc7, 0x32, 0x16, 0x99, 0xf3, 0xa9, - 0xf6, 0x6a, 0x5e, 0xf4, 0xc8, 0xe4, 0x43, 0x1e, 0xad, 0xe9, 0xa6, 0x59, 0xca, 0x36, 0x73, 0x1e, - 0x24, 0x67, 0x9b, 0x3d, 0x92, 0x09, 0x9c, 0x3a, 0x39, 0x72, 0x90, 0xdb, 0x4a, 0x3a, 0x26, 0x91, - 0x3b, 0xb5, 0x7c, 0x82, 0xa1, 0x4d, 0xa0, 0x83, 0x44, 0x0e, 0x46, 0xb7, 0xb3, 0x7c, 0x22, 0x79, - 0xd9, 0x3d, 0xcf, 0x5c, 0x3a, 0xc0, 0x36, 0xf6, 0x1f, 0x62, 0xdf, 0xb3, 0x7a, 0x34, 0xbd, 0xa9, - 0xf8, 0x19, 0x21, 0x84, 0x9b, 0x7e, 0x78, 0x2e, 0x5e, 0x28, 0xf6, 0xfa, 0x4b, 0x80, 0x2a, 0xb7, - 0x19, 0x2b, 0x0f, 0xff, 0x4f, 0xe3, 0xd7, 0x90, 0xc6, 0x9f, 0x42, 0x33, 0x35, 0x6f, 0x92, 0xa7, - 0x71, 0xf9, 0x50, 0xea, 0x3c, 0x07, 0xe9, 0x02, 0x1a, 0x9f, 0x8a, 0xc8, 0x03, 0x2b, 0x73, 0x7a, - 0x72, 0x1e, 0x8f, 0xa7, 0xd0, 0x4c, 0x8d, 0x25, 0xe4, 0x27, 0x90, 0xcf, 0x2e, 0x72, 0x9c, 0x60, - 0xfc, 0x31, 0x5e, 0x7e, 0x82, 0xcc, 0x47, 0xfb, 0xf3, 0x78, 0x7c, 0x01, 0xf5, 0xf8, 0x3b, 0x28, - 0xfa, 0x30, 0x2b, 0x3a, 0x53, 0x2f, 0x63, 0x6f, 0x3e, 0x5f, 0x5f, 0x7f, 0x3d, 0x7b, 0x0a, 0xcd, - 0xd4, 0xbb, 0xa7, 0xdc, 0xba, 0xf2, 0xc7, 0xd1, 0xf3, 0x76, 0x7f, 0x8d, 0x19, 0xf8, 0xba, 0x73, - 0xe5, 0xc6, 0x77, 0x9e, 0xac, 0xf7, 0x2d, 0xff, 0x68, 0xd8, 0x65, 0xa7, 0x5c, 0x0b, 0x30, 0x3f, - 0xb6, 0x88, 0xf8, 0x5a, 0x0b, 0x93, 0xc6, 0x1a, 0xdf, 0x69, 0x8d, 0x4b, 0x3b, 0xe8, 0x76, 0x2b, - 0xfc, 0xf7, 0xde, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x78, 0x64, 0xb3, 0xad, 0x04, 0x28, 0x00, + 0xd5, 0x9a, 0x2f, 0x7b, 0xde, 0x8c, 0x67, 0xe4, 0x4e, 0x62, 0xc6, 0x43, 0x92, 0xf5, 0x6a, 0xd7, + 0xd9, 0xe0, 0x65, 0xed, 0xe0, 0x40, 0x15, 0x5b, 0xc0, 0x21, 0xb6, 0x89, 0xd7, 0xec, 0xc6, 0x31, + 0x63, 0x67, 0x29, 0x52, 0xa9, 0x12, 0x9a, 0x51, 0x7b, 0xac, 0x8a, 0xa4, 0x9e, 0xa8, 0x35, 0x6b, + 0x3b, 0x5c, 0x39, 0x2c, 0x07, 0x8a, 0x3f, 0x40, 0x71, 0x01, 0x0a, 0x52, 0xc5, 0xfe, 0x87, 0x5c, + 0xf8, 0x1b, 0x14, 0x07, 0xf8, 0x09, 0x1c, 0xa9, 0xa2, 0xba, 0xd5, 0xd2, 0xe8, 0xa3, 0x65, 0xcb, + 0x76, 0x79, 0x93, 0xa2, 0xb8, 0x49, 0xaf, 0xdf, 0xeb, 0xf7, 0xfa, 0x7d, 0x77, 0x3f, 0x98, 0x7b, + 0x31, 0xc6, 0xde, 0x89, 0x3e, 0x20, 0xc4, 0x33, 0x57, 0x46, 0x1e, 0xf1, 0x09, 0x42, 0x8e, 0x65, + 0x7f, 0x31, 0xa6, 0xc1, 0xdf, 0x0a, 0x5f, 0xef, 0x36, 0x07, 0xc4, 0x71, 0x88, 0x1b, 0xc0, 0xba, + 0xcd, 0x38, 0x46, 0xb7, 0x65, 0xb9, 0x3e, 0xf6, 0x5c, 0xc3, 0x0e, 0x57, 0xe9, 0xe0, 0x10, 0x3b, + 0x86, 0xf8, 0x53, 0x4d, 0xc3, 0x37, 0xe2, 0xfb, 0x77, 0xe7, 0x2c, 0xd7, 0xc4, 0xc7, 0x71, 0x90, + 0xf6, 0x2b, 0x05, 0xe6, 0xf7, 0x0e, 0xc9, 0xd1, 0x06, 0xb1, 0x6d, 0x3c, 0xf0, 0x2d, 0xe2, 0xd2, + 0x1e, 0x7e, 0x31, 0xc6, 0xd4, 0x47, 0xf7, 0xa0, 0xd2, 0x37, 0x28, 0xee, 0x28, 0x8b, 0xca, 0xdd, + 0xc6, 0xda, 0xcd, 0x95, 0x84, 0x70, 0x42, 0xaa, 0x47, 0x74, 0xb8, 0x6e, 0x50, 0xdc, 0xe3, 0x98, + 0x08, 0x41, 0xc5, 0xec, 0x6f, 0x6f, 0x76, 0x4a, 0x8b, 0xca, 0xdd, 0x72, 0x8f, 0x7f, 0xa3, 0xf7, + 0x61, 0x76, 0x10, 0xed, 0xbd, 0xbd, 0x49, 0x3b, 0xe5, 0xc5, 0xf2, 0xdd, 0x72, 0x2f, 0x09, 0xd4, + 0xfe, 0xac, 0xc0, 0x37, 0x32, 0x62, 0xd0, 0x11, 0x71, 0x29, 0x46, 0xf7, 0xa1, 0x46, 0x7d, 0xc3, + 0x1f, 0x53, 0x21, 0xc9, 0x37, 0xa5, 0x92, 0xec, 0x71, 0x94, 0x9e, 0x40, 0xcd, 0xb2, 0x2d, 0x49, + 0xd8, 0xa2, 0xef, 0xc0, 0x75, 0xcb, 0x7d, 0x84, 0x1d, 0xe2, 0x9d, 0xe8, 0x23, 0xec, 0x0d, 0xb0, + 0xeb, 0x1b, 0x43, 0x1c, 0xca, 0x78, 0x2d, 0x5c, 0xdb, 0x9d, 0x2c, 0x69, 0x7f, 0x52, 0xe0, 0x06, + 0x93, 0x74, 0xd7, 0xf0, 0x7c, 0xeb, 0x0a, 0xf4, 0xa5, 0x41, 0x33, 0x2e, 0x63, 0xa7, 0xcc, 0xd7, + 0x12, 0x30, 0x86, 0x33, 0x0a, 0xd9, 0xb3, 0xb3, 0x55, 0xb8, 0xb8, 0x09, 0x98, 0xf6, 0x47, 0x61, + 0xd8, 0xb8, 0x9c, 0x97, 0x51, 0x68, 0x9a, 0x67, 0x29, 0xcb, 0xf3, 0x22, 0xea, 0x7c, 0xad, 0xc0, + 0x8d, 0xcf, 0x88, 0x61, 0x4e, 0x0c, 0xff, 0xf5, 0xab, 0xf3, 0x47, 0x50, 0x0b, 0x02, 0xa7, 0x53, + 0xe1, 0xbc, 0x96, 0x92, 0xbc, 0x44, 0x50, 0x4d, 0x24, 0xdc, 0xe3, 0x80, 0x9e, 0x20, 0xd2, 0x7e, + 0xa7, 0x40, 0xa7, 0x87, 0x6d, 0x6c, 0x50, 0xfc, 0x26, 0x4f, 0x31, 0x0f, 0x35, 0x97, 0x98, 0x78, + 0x7b, 0x93, 0x9f, 0xa2, 0xdc, 0x13, 0x7f, 0xda, 0x3f, 0x85, 0x86, 0xdf, 0x72, 0x87, 0x8d, 0x59, + 0xa1, 0x7a, 0x11, 0x2b, 0xbc, 0x9e, 0x58, 0xe1, 0x6d, 0x3f, 0xe9, 0xc4, 0x52, 0xd5, 0x84, 0xa5, + 0x7e, 0x0e, 0x0b, 0x1b, 0x1e, 0x36, 0x7c, 0xfc, 0x53, 0x96, 0xf9, 0x37, 0x0e, 0x0d, 0xd7, 0xc5, + 0x76, 0x78, 0x84, 0x34, 0x73, 0x45, 0xc2, 0xbc, 0x03, 0xd3, 0x23, 0x8f, 0x1c, 0x9f, 0x44, 0x72, + 0x87, 0xbf, 0xda, 0x5f, 0x14, 0xe8, 0xca, 0xf6, 0xbe, 0x4c, 0x46, 0x78, 0x0f, 0x66, 0x45, 0x09, + 0x0b, 0x76, 0xe3, 0x3c, 0xeb, 0xbd, 0xe6, 0x8b, 0x18, 0x07, 0x74, 0x0f, 0xae, 0x07, 0x48, 0x1e, + 0xa6, 0x63, 0xdb, 0x8f, 0x70, 0xcb, 0x1c, 0x17, 0xf1, 0xb5, 0x1e, 0x5f, 0x12, 0x14, 0xda, 0x2b, + 0x05, 0x16, 0xb6, 0xb0, 0x1f, 0x19, 0x91, 0x71, 0xc5, 0x6f, 0x69, 0x92, 0xfd, 0x4a, 0x81, 0xae, + 0x4c, 0xd6, 0xcb, 0xa8, 0xf5, 0x29, 0xcc, 0x47, 0x3c, 0x74, 0x13, 0xd3, 0x81, 0x67, 0x8d, 0xb8, + 0x33, 0xf3, 0x94, 0xdb, 0x58, 0x7b, 0x6f, 0x25, 0xdb, 0x25, 0xac, 0xa4, 0x25, 0xb8, 0x11, 0x6d, + 0xb1, 0x19, 0xdb, 0x41, 0xfb, 0x8d, 0x02, 0x37, 0xb6, 0xb0, 0xbf, 0x87, 0x87, 0x0e, 0x76, 0xfd, + 0x6d, 0xf7, 0x80, 0x5c, 0x5c, 0xaf, 0xb7, 0x01, 0xa8, 0xd8, 0x27, 0x2a, 0x07, 0x31, 0x48, 0x11, + 0x1d, 0xf3, 0xee, 0x23, 0x2d, 0xcf, 0x65, 0x74, 0xf7, 0x3d, 0xa8, 0x5a, 0xee, 0x01, 0x09, 0x55, + 0xf5, 0x8e, 0x4c, 0x55, 0x71, 0x66, 0x01, 0xb6, 0xf6, 0x9f, 0x12, 0xcc, 0x3f, 0x30, 0x4d, 0x59, + 0xd8, 0x9d, 0x5f, 0x2f, 0x93, 0xe8, 0x2e, 0xc5, 0xa3, 0xbb, 0x90, 0xcf, 0x65, 0x42, 0xaa, 0x72, + 0x8e, 0x90, 0xaa, 0xe6, 0x85, 0x14, 0xda, 0x82, 0x59, 0x8a, 0xf1, 0x73, 0x7d, 0x44, 0x28, 0xf7, + 0x89, 0x4e, 0x8d, 0x9f, 0x46, 0x4b, 0x9e, 0x26, 0x6a, 0x26, 0x1f, 0xd1, 0xe1, 0xae, 0xc0, 0xec, + 0x35, 0x19, 0x61, 0xf8, 0x87, 0x9e, 0xc0, 0xfc, 0xd0, 0x26, 0x7d, 0xc3, 0xd6, 0x29, 0x36, 0x6c, + 0x6c, 0xea, 0xc2, 0xde, 0xb4, 0x33, 0x5d, 0x4c, 0xe1, 0xd7, 0x03, 0xf2, 0x3d, 0x4e, 0x2d, 0x16, + 0xa8, 0xf6, 0x0f, 0x05, 0x16, 0x7a, 0xd8, 0x21, 0x5f, 0xe0, 0xff, 0x55, 0x13, 0x68, 0xff, 0x2a, + 0xc1, 0xfc, 0xcf, 0x0c, 0x7f, 0x70, 0xb8, 0xe9, 0x08, 0x10, 0x7d, 0x33, 0xe7, 0x2b, 0x52, 0xa0, + 0xa2, 0x30, 0xaa, 0xca, 0xac, 0xca, 0xae, 0x15, 0x2b, 0x9f, 0x8b, 0x23, 0xc7, 0xc2, 0x28, 0x56, + 0xc1, 0x6b, 0x17, 0xa8, 0xe0, 0x68, 0x03, 0x66, 0xf1, 0xf1, 0xc0, 0x1e, 0x9b, 0x58, 0x0f, 0xb8, + 0x07, 0x3e, 0x75, 0x5b, 0xc2, 0x3d, 0xee, 0x52, 0x4d, 0x41, 0xb4, 0xcd, 0x43, 0xf9, 0xb5, 0x02, + 0x0b, 0x81, 0x9e, 0xb1, 0xed, 0x1b, 0x6f, 0x56, 0xd5, 0x91, 0x1a, 0x2b, 0xe7, 0x51, 0xa3, 0xf6, + 0x87, 0x0a, 0xb4, 0xc5, 0x01, 0x59, 0xdf, 0xc6, 0x96, 0xd0, 0x4d, 0xa8, 0x47, 0xa9, 0x55, 0x94, + 0xfe, 0x09, 0x00, 0x2d, 0x42, 0x23, 0x66, 0x3f, 0x21, 0x69, 0x1c, 0x54, 0x48, 0xdc, 0xb0, 0x50, + 0x56, 0x62, 0x85, 0xf2, 0x16, 0xc0, 0x81, 0x3d, 0xa6, 0x87, 0xba, 0x6f, 0x39, 0x58, 0xb4, 0x2b, + 0x75, 0x0e, 0xd9, 0xb7, 0x1c, 0x8c, 0x1e, 0x40, 0xb3, 0x6f, 0xb9, 0x36, 0x19, 0xea, 0x23, 0xc3, + 0x3f, 0xa4, 0x9d, 0x5a, 0xae, 0xc5, 0x1e, 0x5a, 0xd8, 0x36, 0xd7, 0x39, 0x6e, 0xaf, 0x11, 0xd0, + 0xec, 0x32, 0x12, 0x74, 0x1b, 0x1a, 0xee, 0xd8, 0xd1, 0xc9, 0x81, 0xee, 0x91, 0x23, 0x66, 0x73, + 0xce, 0xc2, 0x1d, 0x3b, 0x8f, 0x0f, 0x7a, 0xe4, 0x88, 0xa2, 0x1f, 0x42, 0x9d, 0x25, 0x77, 0x6a, + 0x93, 0x21, 0xed, 0xcc, 0x14, 0xda, 0x7f, 0x42, 0xc0, 0xa8, 0x4d, 0xe6, 0x08, 0x9c, 0xba, 0x5e, + 0x8c, 0x3a, 0x22, 0x40, 0x77, 0xa0, 0x35, 0x20, 0xce, 0xc8, 0xe0, 0x1a, 0x7a, 0xe8, 0x11, 0xa7, + 0x03, 0x3c, 0x5a, 0x52, 0x50, 0xf4, 0x2e, 0x34, 0xb1, 0x6b, 0xf4, 0x6d, 0xe6, 0xb8, 0x26, 0x3e, + 0xee, 0x34, 0x16, 0x95, 0xbb, 0x33, 0xbd, 0x46, 0x00, 0xdb, 0x66, 0x20, 0xf4, 0x18, 0xd4, 0xe0, + 0xf2, 0xcd, 0x14, 0x25, 0xfc, 0xbb, 0xc9, 0xe5, 0x59, 0x4a, 0x67, 0x61, 0x13, 0x1f, 0xaf, 0x70, + 0xa2, 0x87, 0x96, 0x8d, 0x99, 0x92, 0xb8, 0x73, 0xb4, 0xf8, 0x42, 0xf8, 0x4b, 0xb5, 0x57, 0x25, + 0xb8, 0xc6, 0xdc, 0x23, 0x4c, 0xa2, 0x17, 0x77, 0xf1, 0x5b, 0x00, 0x26, 0xf5, 0xf5, 0x84, 0x9b, + 0xd7, 0x4d, 0xea, 0xef, 0x04, 0x9e, 0xfe, 0x71, 0xe8, 0xc5, 0xe5, 0xfc, 0xf6, 0x23, 0xe5, 0xae, + 0xd9, 0x84, 0x70, 0x91, 0x8b, 0x15, 0x4b, 0xc5, 0x94, 0x8c, 0xbd, 0x01, 0xd6, 0x13, 0xed, 0x72, + 0x33, 0x00, 0xee, 0xc8, 0x03, 0xb1, 0x26, 0x69, 0x33, 0xfe, 0xae, 0xc0, 0xbc, 0xb8, 0x1b, 0x5c, + 0x5e, 0x5d, 0x79, 0x19, 0x21, 0x0c, 0x9f, 0xf2, 0x29, 0x7d, 0x66, 0xa5, 0x40, 0x42, 0xae, 0x4a, + 0x12, 0x72, 0xb2, 0xd7, 0xaa, 0xa5, 0x7b, 0x2d, 0xed, 0xb7, 0x0a, 0xcc, 0x7f, 0x62, 0xb8, 0x26, + 0x39, 0x38, 0xb8, 0xfc, 0x01, 0x37, 0xa0, 0x49, 0x27, 0xf9, 0xb5, 0x70, 0x2f, 0x95, 0x20, 0xd2, + 0xbe, 0x2c, 0x01, 0x62, 0xee, 0xb0, 0x6e, 0xd8, 0x86, 0x3b, 0xc0, 0x17, 0x97, 0x66, 0x09, 0x5a, + 0x09, 0x27, 0x88, 0x5e, 0x72, 0xe2, 0x5e, 0x40, 0xd1, 0xa7, 0xd0, 0xea, 0x07, 0xac, 0x74, 0x0f, + 0x1b, 0x94, 0xb8, 0xdc, 0x0e, 0xad, 0xb5, 0xf7, 0x65, 0x62, 0xef, 0x7b, 0xd6, 0x70, 0x88, 0xbd, + 0x0d, 0xe2, 0x9a, 0x41, 0x97, 0x33, 0xdb, 0x0f, 0xc5, 0x64, 0xa4, 0xe8, 0x1d, 0x68, 0x4c, 0x22, + 0x22, 0x2c, 0x91, 0x10, 0x85, 0x04, 0x45, 0x1f, 0xc2, 0x5c, 0xb2, 0x01, 0x9a, 0x18, 0x4e, 0xa5, + 0xf1, 0xde, 0x86, 0x19, 0xe7, 0x97, 0x80, 0xa2, 0xa2, 0xcf, 0x4b, 0x13, 0xcf, 0xe8, 0x45, 0xee, + 0x73, 0x37, 0xa1, 0x6e, 0x86, 0x94, 0xe2, 0x76, 0x35, 0x01, 0xb0, 0xf0, 0x08, 0x24, 0xd4, 0x6d, + 0x62, 0x98, 0xd8, 0x0c, 0x93, 0x7a, 0x00, 0xfc, 0x8c, 0xc3, 0xb4, 0xaf, 0x4a, 0xa0, 0xc6, 0x9b, + 0xaa, 0xc2, 0xbc, 0xaf, 0xe6, 0x76, 0x77, 0x4a, 0x07, 0x59, 0xb9, 0x44, 0x07, 0x99, 0xed, 0x70, + 0xab, 0x17, 0xeb, 0x70, 0xb5, 0xdf, 0x2b, 0xd0, 0x4e, 0x5d, 0xa6, 0xd2, 0xe5, 0x55, 0xc9, 0x96, + 0xd7, 0xef, 0x43, 0x95, 0xd5, 0x1c, 0xcc, 0x95, 0xd4, 0x4a, 0xb3, 0x95, 0x5d, 0xd1, 0x7a, 0x01, + 0x01, 0x5a, 0x85, 0x6b, 0x92, 0x27, 0x33, 0x61, 0x4a, 0x94, 0x7d, 0x31, 0xd3, 0xfe, 0x5a, 0x81, + 0x46, 0x4c, 0x1f, 0x67, 0x74, 0x06, 0x69, 0x4b, 0x97, 0x24, 0x96, 0x4e, 0x1d, 0xaf, 0x9c, 0x3d, + 0x5e, 0xce, 0xd3, 0x12, 0x5a, 0x80, 0x19, 0x07, 0x3b, 0x3a, 0xb5, 0x5e, 0x86, 0xbd, 0xc1, 0xb4, + 0x83, 0x9d, 0x3d, 0xeb, 0x25, 0x66, 0x4b, 0xac, 0xac, 0xf3, 0x9a, 0x1e, 0xa4, 0xe4, 0x69, 0x77, + 0xec, 0xf0, 0x8a, 0x7e, 0x0b, 0x20, 0x28, 0x85, 0xae, 0xe1, 0x60, 0x5e, 0xf0, 0xeb, 0xbd, 0x3a, + 0x87, 0xec, 0x18, 0x0e, 0x46, 0x1d, 0x98, 0xe6, 0x3f, 0xdb, 0x9b, 0x9d, 0x99, 0x80, 0x50, 0xfc, + 0x26, 0xc3, 0xa1, 0x9e, 0x0e, 0x87, 0xa2, 0xc5, 0xfa, 0x1e, 0x5c, 0x1b, 0xf0, 0x97, 0x10, 0x73, + 0xfd, 0x64, 0x23, 0x5a, 0x12, 0x35, 0x5b, 0xb6, 0x84, 0x1e, 0x32, 0xe7, 0xe2, 0x1a, 0xd5, 0x03, + 0x2b, 0x37, 0xb9, 0x95, 0xdf, 0x95, 0xdf, 0x48, 0x03, 0xcc, 0xc0, 0xc8, 0x61, 0x4e, 0xe4, 0x7f, + 0x99, 0x36, 0x61, 0xb6, 0x58, 0x9b, 0xd0, 0xba, 0x4c, 0x9b, 0xf0, 0x65, 0x19, 0x5a, 0x93, 0x02, + 0x5b, 0x38, 0xfa, 0x8b, 0xbc, 0xf6, 0xee, 0x80, 0x3a, 0x79, 0xa8, 0xe0, 0x8a, 0x39, 0xb5, 0x47, + 0x48, 0x3f, 0x51, 0xb4, 0x47, 0xa9, 0x30, 0xfb, 0x18, 0xea, 0x2c, 0x91, 0xe9, 0xfe, 0xc9, 0x08, + 0x73, 0x47, 0x6b, 0xa5, 0x0b, 0x44, 0xb0, 0x11, 0xcb, 0x6c, 0xfb, 0x27, 0x23, 0xdc, 0x9b, 0xb1, + 0xc5, 0xd7, 0x25, 0xdf, 0x0e, 0xd1, 0x7d, 0xb8, 0xe1, 0x05, 0xed, 0x81, 0xa9, 0x27, 0x8e, 0x1d, + 0x54, 0xda, 0xeb, 0xe1, 0xe2, 0x6e, 0xfc, 0xf8, 0x39, 0x91, 0x3b, 0x9d, 0x1b, 0xb9, 0xff, 0x56, + 0x60, 0x4e, 0x78, 0x07, 0xf3, 0xd9, 0x21, 0xbf, 0xb1, 0xb0, 0x3c, 0x4b, 0x5c, 0xdb, 0x72, 0xa3, + 0x26, 0x47, 0x98, 0x23, 0x00, 0x8a, 0x26, 0xe7, 0x13, 0x68, 0x0b, 0xa4, 0x28, 0x5d, 0x16, 0xac, + 0xca, 0xad, 0x80, 0x2e, 0x4a, 0x94, 0x4b, 0xd0, 0x22, 0x07, 0x07, 0x71, 0x7e, 0x41, 0xbc, 0xcf, + 0x0a, 0xa8, 0x60, 0xf8, 0x13, 0x50, 0x43, 0xb4, 0xf3, 0x26, 0xe8, 0xb6, 0x20, 0x8c, 0x6e, 0xf7, + 0xbf, 0x56, 0xa0, 0x93, 0x4c, 0xd7, 0xb1, 0xe3, 0x9f, 0xbf, 0x21, 0xf8, 0x41, 0xf2, 0x8d, 0x67, + 0xe9, 0x14, 0x79, 0x26, 0x7c, 0x44, 0x47, 0xba, 0xfc, 0x12, 0x5a, 0x49, 0x3f, 0x44, 0x4d, 0x98, + 0xd9, 0x21, 0xfe, 0x8f, 0x8f, 0x2d, 0xea, 0xab, 0x53, 0xa8, 0x05, 0xb0, 0x43, 0xfc, 0x5d, 0x0f, + 0x53, 0xec, 0xfa, 0xaa, 0x82, 0x00, 0x6a, 0x8f, 0xdd, 0x4d, 0x8b, 0x3e, 0x57, 0x4b, 0xe8, 0x9a, + 0xa8, 0x0c, 0x86, 0xbd, 0x2d, 0x8c, 0xab, 0x96, 0x19, 0x79, 0xf4, 0x57, 0x41, 0x2a, 0x34, 0x23, + 0x94, 0xad, 0xdd, 0x27, 0x6a, 0x15, 0xd5, 0xa1, 0x1a, 0x7c, 0xd6, 0x96, 0x4d, 0x50, 0xd3, 0x8d, + 0x07, 0xdb, 0xf3, 0x89, 0xfb, 0xa9, 0x4b, 0x8e, 0x22, 0x90, 0x3a, 0x85, 0x1a, 0x30, 0x2d, 0x9a, + 0x39, 0x55, 0x41, 0x6d, 0x68, 0xc4, 0xfa, 0x28, 0xb5, 0xc4, 0x00, 0x5b, 0xde, 0x68, 0x20, 0x3a, + 0xaa, 0x40, 0x04, 0x66, 0xb5, 0x4d, 0x72, 0xe4, 0xaa, 0x95, 0xe5, 0x07, 0x30, 0x13, 0x06, 0x08, + 0x3b, 0x4d, 0xb0, 0x3b, 0xfb, 0x53, 0xa7, 0xd0, 0x1c, 0xcc, 0x26, 0x26, 0x01, 0xaa, 0x82, 0x10, + 0xb4, 0xec, 0xc4, 0xf8, 0x45, 0x2d, 0xad, 0xfd, 0xad, 0x01, 0x10, 0xf4, 0x0c, 0x84, 0x78, 0x26, + 0x1a, 0x01, 0xda, 0xc2, 0x3e, 0xcb, 0x87, 0xc4, 0x0d, 0x73, 0x19, 0x45, 0xf7, 0x72, 0x4a, 0x6b, + 0x16, 0x55, 0x48, 0xda, 0xbd, 0x93, 0x43, 0x91, 0x42, 0xd7, 0xa6, 0x90, 0xc3, 0x39, 0xb2, 0x1b, + 0xe6, 0xbe, 0x35, 0x78, 0x1e, 0x35, 0x1b, 0xf9, 0x1c, 0x53, 0xa8, 0x21, 0xc7, 0x54, 0x1e, 0x12, + 0x3f, 0x7b, 0xbe, 0x67, 0xb9, 0xc3, 0xf0, 0xa1, 0x51, 0x9b, 0x42, 0x2f, 0xe0, 0xfa, 0x16, 0xe6, + 0xdc, 0x2d, 0xea, 0x5b, 0x03, 0x1a, 0x32, 0x5c, 0xcb, 0x67, 0x98, 0x41, 0x3e, 0x27, 0x4b, 0x1b, + 0xda, 0xa9, 0x71, 0x27, 0x5a, 0x96, 0x3a, 0xb2, 0x74, 0x34, 0xdb, 0xfd, 0xb0, 0x10, 0x6e, 0xc4, + 0xcd, 0x82, 0x56, 0x72, 0x14, 0x88, 0xbe, 0x95, 0xb7, 0x41, 0x66, 0x76, 0xd2, 0x5d, 0x2e, 0x82, + 0x1a, 0xb1, 0x7a, 0x0a, 0xad, 0xe4, 0xb0, 0x49, 0xce, 0x4a, 0x3a, 0x90, 0xea, 0x9e, 0xf6, 0xc6, + 0xab, 0x4d, 0xa1, 0x5f, 0xc0, 0x5c, 0x66, 0xc2, 0x83, 0xbe, 0x2d, 0xdb, 0x3e, 0x6f, 0x10, 0x74, + 0x16, 0x07, 0x21, 0xfd, 0x44, 0x8b, 0xf9, 0xd2, 0x67, 0x46, 0x7d, 0xc5, 0xa5, 0x8f, 0x6d, 0x7f, + 0x9a, 0xf4, 0xe7, 0xe6, 0x30, 0x06, 0x94, 0x9d, 0xf1, 0xa0, 0x8f, 0x64, 0x2c, 0x72, 0xe7, 0x4c, + 0xdd, 0x95, 0xa2, 0xe8, 0x91, 0xc9, 0xc7, 0x3c, 0x5a, 0xd3, 0x4d, 0xb3, 0x94, 0x6d, 0xee, 0x5c, + 0x47, 0xce, 0x36, 0x7f, 0xb4, 0x12, 0x38, 0x75, 0x72, 0x74, 0x20, 0xb7, 0x95, 0x74, 0xdc, 0x21, + 0x77, 0x6a, 0xf9, 0x24, 0x42, 0x9b, 0x42, 0xfb, 0x89, 0x1c, 0x8c, 0xee, 0xe4, 0xf9, 0x44, 0xf2, + 0xb2, 0x7b, 0x96, 0xb9, 0x74, 0x80, 0x2d, 0xec, 0x3f, 0xc2, 0xbe, 0x67, 0x0d, 0x68, 0x7a, 0x53, + 0xf1, 0x33, 0x41, 0x08, 0x37, 0xfd, 0xe0, 0x4c, 0xbc, 0x50, 0xec, 0xb5, 0x57, 0x00, 0x75, 0x6e, + 0x33, 0x56, 0x1e, 0xfe, 0x9f, 0xc6, 0xaf, 0x20, 0x8d, 0x3f, 0x83, 0x76, 0x6a, 0x6e, 0x24, 0x4f, + 0xe3, 0xf2, 0xe1, 0xd2, 0x59, 0x0e, 0xd2, 0x07, 0x94, 0x9d, 0x8a, 0xc8, 0x03, 0x2b, 0x77, 0x7a, + 0x72, 0x16, 0x8f, 0x67, 0xd0, 0x4e, 0x8d, 0x25, 0xe4, 0x27, 0x90, 0xcf, 0x2e, 0x0a, 0x9c, 0x20, + 0xfb, 0x18, 0x2f, 0x3f, 0x41, 0xee, 0xa3, 0xfd, 0x59, 0x3c, 0x3e, 0x87, 0x66, 0xfc, 0x1d, 0x14, + 0x7d, 0x90, 0x17, 0x9d, 0xa9, 0x97, 0xb1, 0x37, 0x9f, 0xaf, 0xaf, 0xbe, 0x9e, 0x3d, 0x83, 0x76, + 0xea, 0xdd, 0x53, 0x6e, 0x5d, 0xf9, 0xe3, 0xe8, 0x59, 0xbb, 0x7f, 0x8d, 0x19, 0xf8, 0xaa, 0x73, + 0xe5, 0xfa, 0x77, 0x9f, 0xae, 0x0d, 0x2d, 0xff, 0x70, 0xdc, 0x67, 0xa7, 0x5c, 0x0d, 0x30, 0x3f, + 0xb2, 0x88, 0xf8, 0x5a, 0x0d, 0x93, 0xc6, 0x2a, 0xdf, 0x69, 0x95, 0x4b, 0x3b, 0xea, 0xf7, 0x6b, + 0xfc, 0xf7, 0xfe, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc8, 0x7f, 0xb6, 0xed, 0xcc, 0x27, 0x00, 0x00, } diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index 20c036bc97..02879407e3 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -340,9 +340,22 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in c.RUnlock() if targetNode != nil { - err := targetNode.addQueryChannel(ctx, in) + emptyChangeInfo := &querypb.SealedSegmentsChangeInfo{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SealedSegmentsChangeInfo, + }, + } + msgPosition, err := c.clusterMeta.sendSealedSegmentChangeInfos(in.CollectionID, in.QueryChannel, emptyChangeInfo) if err != nil { - log.Debug("addQueryChannel: queryNode add query channel error", zap.String("error", err.Error())) + log.Error("addQueryChannel: get latest messageID of query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err)) + return err + } + + // update watch position to latest + in.SeekPosition = msgPosition + err = targetNode.addQueryChannel(ctx, in) + if err != nil { + log.Error("addQueryChannel: queryNode add query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err)) return err } return nil diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 902abfd4b8..f9fc9a05e8 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -33,6 +33,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" minioKV "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -463,8 +464,19 @@ func TestGrpcRequest(t *testing.T) { clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints) clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true) clusterSession.Register() - meta, err := newMeta(baseCtx, kv, nil, nil) + factory := msgstream.NewPmsFactory() + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = factory.SetParams(m) assert.Nil(t, err) + idAllocator := func() (UniqueID, error) { + return 0, nil + } + meta, err := newMeta(baseCtx, kv, factory, idAllocator) + assert.Nil(t, err) + cluster := &queryNodeCluster{ ctx: baseCtx, cancel: cancel, @@ -532,8 +544,7 @@ func TestGrpcRequest(t *testing.T) { }) t.Run("Test AddQueryChannel", func(t *testing.T) { - info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) - assert.Nil(t, err) + info := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) addQueryChannelReq := &querypb.AddQueryChannelRequest{ NodeID: nodeID, CollectionID: defaultCollectionID, @@ -545,8 +556,7 @@ func TestGrpcRequest(t *testing.T) { }) t.Run("Test RemoveQueryChannel", func(t *testing.T) { - info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) - assert.Nil(t, err) + info := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID) removeQueryChannelReq := &querypb.RemoveQueryChannelRequest{ NodeID: nodeID, CollectionID: defaultCollectionID, diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index ff9eb655b7..71f0642f64 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -701,19 +701,7 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat } collectionID := req.CollectionID - info, err := qc.meta.getQueryChannelInfoByID(collectionID) - if err != nil { - status.ErrorCode = commonpb.ErrorCode_UnexpectedError - status.Reason = err.Error() - log.Error("createQueryChannel end with error", - zap.String("role", typeutil.QueryCoordRole), - zap.Int64("collectionID", collectionID), - zap.Error(err)) - return &querypb.CreateQueryChannelResponse{ - Status: status, - }, nil - } - + info := qc.meta.getQueryChannelInfoByID(collectionID) log.Debug("createQueryChannelRequest completed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("collectionID", collectionID), diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go index b10691bd5e..cc306e691a 100644 --- a/internal/querycoord/impl_test.go +++ b/internal/querycoord/impl_test.go @@ -760,25 +760,6 @@ func TestGrpcTaskBeforeHealthy(t *testing.T) { assert.Nil(t, err) } -func Test_GrpcGetQueryChannelFail(t *testing.T) { - kv := &testKv{ - returnFn: failedResult, - } - meta, err := newMeta(context.Background(), kv, nil, nil) - assert.Nil(t, err) - - queryCoord := &QueryCoord{ - meta: meta, - } - queryCoord.stateCode.Store(internalpb.StateCode_Healthy) - - res, err := queryCoord.CreateQueryChannel(context.Background(), &querypb.CreateQueryChannelRequest{ - CollectionID: defaultCollectionID, - }) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, res.Status.ErrorCode) -} - func TestQueryCoord_GetComponentStates(t *testing.T) { n := &QueryCoord{} n.stateCode.Store(internalpb.StateCode_Healthy) diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index db1ba51ed2..98c617e6cf 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -37,15 +37,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util" - "github.com/milvus-io/milvus/internal/util/mqclient" ) const ( - collectionMetaPrefix = "queryCoord-collectionMeta" - dmChannelMetaPrefix = "queryCoord-dmChannelWatchInfo" - queryChannelMetaPrefix = "queryCoord-queryChannel" - deltaChannelMetaPrefix = "queryCoord-deltaChannel" - globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition" + collectionMetaPrefix = "queryCoord-collectionMeta" + dmChannelMetaPrefix = "queryCoord-dmChannelWatchInfo" + deltaChannelMetaPrefix = "queryCoord-deltaChannel" ) type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo @@ -80,15 +77,15 @@ type Meta interface { getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error - getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) - getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) + getQueryChannelInfoByID(collectionID UniqueID) *querypb.QueryChannelInfo + getQueryStreamByID(collectionID UniqueID, queryChannel string) (msgstream.MsgStream, error) setLoadType(collectionID UniqueID, loadType querypb.LoadType) error setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error //printMeta() saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) - sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) + sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error) } // MetaReplica records the current load information on all querynodes @@ -113,7 +110,6 @@ type MetaReplica struct { queryStreams map[UniqueID]msgstream.MsgStream streamMu sync.RWMutex - globalSeekPosition *internalpb.MsgPosition //partitionStates map[UniqueID]*querypb.PartitionStates } @@ -125,7 +121,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo) dmChannelInfos := make(map[string]*querypb.DmChannelWatchInfo) queryMsgStream := make(map[UniqueID]msgstream.MsgStream) - position := &internalpb.MsgPosition{} m := &MetaReplica{ ctx: childCtx, @@ -134,13 +129,12 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll msFactory: factory, idAllocator: idAllocator, - collectionInfos: collectionInfos, - segmentInfos: segmentInfos, - queryChannelInfos: queryChannelInfos, - deltaChannelInfos: deltaChannelInfos, - dmChannelInfos: dmChannelInfos, - queryStreams: queryMsgStream, - globalSeekPosition: position, + collectionInfos: collectionInfos, + segmentInfos: segmentInfos, + queryChannelInfos: queryChannelInfos, + deltaChannelInfos: deltaChannelInfos, + dmChannelInfos: dmChannelInfos, + queryStreams: queryMsgStream, } err := m.reloadFromKV() @@ -187,23 +181,6 @@ func (m *MetaReplica) reloadFromKV() error { m.segmentInfos[segmentID] = segmentInfo } - queryChannelKeys, queryChannelValues, err := m.client.LoadWithPrefix(queryChannelMetaPrefix) - if err != nil { - return nil - } - for index := range queryChannelKeys { - collectionID, err := strconv.ParseInt(filepath.Base(queryChannelKeys[index]), 10, 64) - if err != nil { - return err - } - queryChannelInfo := &querypb.QueryChannelInfo{} - err = proto.Unmarshal([]byte(queryChannelValues[index]), queryChannelInfo) - if err != nil { - return err - } - m.queryChannelInfos[collectionID] = queryChannelInfo - } - deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix) if err != nil { return nil @@ -236,15 +213,6 @@ func (m *MetaReplica) reloadFromKV() error { m.dmChannelInfos[dmChannel] = dmChannelWatchInfo } - globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix) - if err == nil { - position := &internalpb.MsgPosition{} - err = proto.Unmarshal([]byte(globalSeekPosValue), position) - if err != nil { - return err - } - m.globalSeekPosition = position - } //TODO::update partition states log.Debug("reload from kv finished") @@ -520,26 +488,14 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo) - var globalSeekPositionTmp *internalpb.MsgPosition for collectionID, segmentChangeInfos := range col2SegmentChangeInfos { // get msgStream to produce sealedSegmentChangeInfos to query channel - queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos) + queryChannelInfo := m.getQueryChannelInfoByID(collectionID) + msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos) if err != nil { return nil, err } - // len(messageIDs) == 1 - messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel] - if !ok || len(messageIDs) == 0 { - return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed") - } - - if queryChannelInfo.SeekPosition == nil { - queryChannelInfo.SeekPosition = &internalpb.MsgPosition{ - ChannelName: queryChannelInfo.QueryChannel, - } - } - - queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize() + queryChannelInfo.SeekPosition = msgPosition // update segmentInfo, queryChannelInfo meta to cache and etcd seg2Info := make(map[UniqueID]*querypb.SegmentInfo) @@ -560,7 +516,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos queryChannelInfosMap[collectionID] = queryChannelInfo - globalSeekPositionTmp = queryChannelInfo.SeekPosition } // save segmentInfo to etcd @@ -591,22 +546,8 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal } } - // save queryChannelInfo and sealedSegmentsChangeInfo to etcd + // save sealedSegmentsChangeInfo to etcd saveKvs := make(map[string]string) - for collectionID, queryChannelInfo := range queryChannelInfosMap { - channelInfoBytes, err := proto.Marshal(queryChannelInfo) - if err != nil { - return col2SegmentChangeInfos, err - } - channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID) - saveKvs[channelKey] = string(channelInfoBytes) - } - seekPos, err := proto.Marshal(globalSeekPositionTmp) - if err != nil { - return col2SegmentChangeInfos, err - } - saveKvs[globalQuerySeekPositionPrefix] = string(seekPos) - // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // avoid the produce process success but save meta to etcd failed // then the msgID key will not exist, and changeIndo will be ignored by query node @@ -620,7 +561,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal saveKvs[changeInfoKey] = string(changeInfoBytes) } - err = m.client.MultiSave(saveKvs) + err := m.client.MultiSave(saveKvs) if err != nil { panic(err) } @@ -641,7 +582,6 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal for collectionID, channelInfo := range queryChannelInfosMap { m.queryChannelInfos[collectionID] = channelInfo } - m.globalSeekPosition = globalSeekPositionTmp m.channelMu.Unlock() return col2SegmentChangeInfos, nil @@ -669,23 +609,13 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo) } - // get msgStream to produce sealedSegmentChangeInfos to query channel - queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos) + // produce sealedSegmentChangeInfos to query channel + queryChannelInfo := m.getQueryChannelInfoByID(collectionID) + msgPosition, err := m.sendSealedSegmentChangeInfos(collectionID, queryChannelInfo.QueryChannel, segmentChangeInfos) if err != nil { return nil, err } - // len(messageIDs) = 1 - messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel] - if !ok || len(messageIDs) == 0 { - return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed") - } - - if queryChannelInfo.SeekPosition == nil { - queryChannelInfo.SeekPosition = &internalpb.MsgPosition{ - ChannelName: queryChannelInfo.QueryChannel, - } - } - queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize() + queryChannelInfo.SeekPosition = msgPosition // update segmentInfo, queryChannelInfo meta to cache and etcd seg2Info := make(map[UniqueID]*querypb.SegmentInfo) @@ -714,20 +644,7 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio } } - // save meta to etcd saveKvs := make(map[string]string) - channelInfoBytes, err := proto.Marshal(queryChannelInfo) - if err != nil { - return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err - } - channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID) - saveKvs[channelKey] = string(channelInfoBytes) - seekPos, err := proto.Marshal(queryChannelInfo.SeekPosition) - if err != nil { - return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err - } - saveKvs[globalQuerySeekPositionPrefix] = string(seekPos) - // save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd // avoid the produce process success but save meta to etcd failed // then the msgID key will not exist, and changeIndo will be ignored by query node @@ -752,25 +669,18 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio m.channelMu.Lock() m.queryChannelInfos[collectionID] = queryChannelInfo - m.globalSeekPosition = queryChannelInfo.SeekPosition m.channelMu.Unlock() return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil } // send sealed segment change infos into query channels -func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) { +func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryChannel string, changeInfos *querypb.SealedSegmentsChangeInfo) (*internalpb.MsgPosition, error) { // get msgStream to produce sealedSegmentChangeInfos to query channel - queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID) + queryStream, err := m.getQueryStreamByID(collectionID, queryChannel) if err != nil { - log.Error("updateGlobalSealedSegmentInfos: get query channel info failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - return nil, nil, err - } - - queryStream, err := m.getQueryStreamByID(collectionID) - if err != nil { - log.Error("updateGlobalSealedSegmentInfos: get query stream failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - return nil, nil, err + log.Error("sendSealedSegmentChangeInfos: get query stream failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + return nil, err } var msgPack = &msgstream.MsgPack{ @@ -778,8 +688,8 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change } id, err := m.idAllocator() if err != nil { - log.Error("allocator trigger taskID failed", zap.Error(err)) - return nil, nil, err + log.Error("sendSealedSegmentChangeInfos: allocator trigger taskID failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + return nil, err } changeInfos.Base.MsgID = id segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{ @@ -792,12 +702,25 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, change messageIDInfos, err := queryStream.ProduceMark(msgPack) if err != nil { - log.Error("updateGlobalSealedSegmentInfos: send sealed segment change info failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - return nil, nil, err + log.Error("sendSealedSegmentChangeInfos: send sealed segment change info failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + return nil, err } - log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack)) - return queryChannelInfo, messageIDInfos, nil + messageIDs, ok := messageIDInfos[queryChannel] + if !ok { + return nil, fmt.Errorf("sendSealedSegmentChangeInfos: send sealed segment change info to wrong query channel, collectionID = %d, query channel = %s", collectionID, queryChannel) + } + + // len(messageIDs) = 1 + if len(messageIDs) != 1 { + return nil, fmt.Errorf("sendSealedSegmentChangeInfos: length of the positions in stream is not correct, collectionID = %d, query channel = %s, len = %d", collectionID, queryChannel, len(messageIDs)) + } + + log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack)) + return &internalpb.MsgPosition{ + ChannelName: queryChannel, + MsgID: messageIDs[0].Serialize(), + }, nil } func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo { @@ -814,11 +737,12 @@ func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []Uni if len(partitionIDs) == 0 { return segmentInfos } + + partitionIDMap := getCompareMapFromSlice(partitionIDs) for _, info := range segmentInfos { - for _, partitionID := range partitionIDs { - if info.PartitionID == partitionID { - results = append(results, info) - } + partitionID := info.PartitionID + if _, ok := partitionIDMap[partitionID]; ok { + results = append(results, info) } } return results @@ -905,21 +829,26 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel return nil } -func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo { +func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo { + // TODO::to remove + // all collection use the same query channel + colIDForAssignChannel := UniqueID(0) + searchPrefix := Params.SearchChannelPrefix searchResultPrefix := Params.SearchResultChannelPrefix - allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(collectionID, 10) - allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(collectionID, 10) + allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) + allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(colIDForAssignChannel, 10) log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel)) seekPosition := &internalpb.MsgPosition{ ChannelName: allocatedQueryChannel, } + segmentInfos := m.showSegmentInfos(collectionID, nil) info := &querypb.QueryChannelInfo{ CollectionID: collectionID, QueryChannel: allocatedQueryChannel, QueryResultChannel: allocatedQueryResultChannel, - GlobalSealedSegments: []*querypb.SegmentInfo{}, + GlobalSealedSegments: segmentInfos, SeekPosition: seekPosition, } @@ -957,57 +886,42 @@ func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.Vch } // Get Query channel info for collection, so far all the collection share the same query channel 0 -func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) { +func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) *querypb.QueryChannelInfo { m.channelMu.Lock() defer m.channelMu.Unlock() + var channelInfo *querypb.QueryChannelInfo if info, ok := m.queryChannelInfos[collectionID]; ok { - return proto.Clone(info).(*querypb.QueryChannelInfo), nil + channelInfo = proto.Clone(info).(*querypb.QueryChannelInfo) + } else { + channelInfo = m.createQueryChannel(collectionID) + m.queryChannelInfos[collectionID] = channelInfo } - // TODO::to remove - // all collection use the same query channel - colIDForAssignChannel := UniqueID(0) - info := createQueryChannel(colIDForAssignChannel) - err := saveQueryChannelInfo(info, m.client) - if err != nil { - log.Error("getQueryChannel: save channel to etcd error", zap.Error(err)) - return nil, err - } - // set info.collectionID from 0 to realID - info.CollectionID = collectionID - m.queryChannelInfos[collectionID] = info - info.SeekPosition = m.globalSeekPosition - if info.SeekPosition != nil { - info.SeekPosition.ChannelName = info.QueryChannel - } - return proto.Clone(info).(*querypb.QueryChannelInfo), nil + return proto.Clone(channelInfo).(*querypb.QueryChannelInfo) } -func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) { +func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID, queryChannel string) (msgstream.MsgStream, error) { m.streamMu.Lock() defer m.streamMu.Unlock() - info, err := m.getQueryChannelInfoByID(collectionID) - if err != nil { - return nil, err - } - - stream, ok := m.queryStreams[collectionID] - if !ok { - stream, err = m.msFactory.NewMsgStream(m.ctx) + var queryStream msgstream.MsgStream + var err error + if stream, ok := m.queryStreams[collectionID]; ok { + queryStream = stream + } else { + queryStream, err = m.msFactory.NewMsgStream(m.ctx) if err != nil { log.Error("updateGlobalSealedSegmentInfos: create msgStream failed", zap.Error(err)) return nil, err } - queryChannel := info.QueryChannel - stream.AsProducer([]string{queryChannel}) - m.queryStreams[collectionID] = stream + queryStream.AsProducer([]string{queryChannel}) + m.queryStreams[collectionID] = queryStream log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID)) } - return stream, nil + return queryStream, nil } func (m *MetaReplica) setLoadType(collectionID UniqueID, loadType querypb.LoadType) error { @@ -1107,16 +1021,6 @@ func saveGlobalCollectionInfo(collectionID UniqueID, info *querypb.CollectionInf return kv.Save(key, string(infoBytes)) } -func saveQueryChannelInfo(info *querypb.QueryChannelInfo, kv kv.MetaKv) error { - infoBytes, err := proto.Marshal(info) - if err != nil { - return err - } - - key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, info.CollectionID) - return kv.Save(key, string(infoBytes)) -} - func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error { kvs := make(map[string]string) for _, info := range infos { diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index fd6480bf64..debb6a54c9 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -29,7 +29,6 @@ import ( "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util" ) @@ -106,12 +105,11 @@ func TestMetaFunc(t *testing.T) { NodeID: nodeID, } meta := &MetaReplica{ - client: kv, - collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, - segmentInfos: segmentInfos, - queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, - dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{}, - globalSeekPosition: &internalpb.MsgPosition{}, + client: kv, + collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, + segmentInfos: segmentInfos, + queryChannelInfos: map[UniqueID]*querypb.QueryChannelInfo{}, + dmChannelInfos: map[string]*querypb.DmChannelWatchInfo{}, } dmChannels := []string{"testDm1", "testDm2"} @@ -149,9 +147,8 @@ func TestMetaFunc(t *testing.T) { assert.NotNil(t, err) }) - t.Run("Test GetQueryChannelInfoByIDFail", func(t *testing.T) { - res, err := meta.getQueryChannelInfoByID(defaultCollectionID) - assert.Nil(t, err) + t.Run("Test GetQueryChannelInfoByIDFirst", func(t *testing.T) { + res := meta.getQueryChannelInfoByID(defaultCollectionID) assert.NotNil(t, res) }) @@ -245,11 +242,10 @@ func TestMetaFunc(t *testing.T) { assert.Equal(t, defaultSegmentID, infos[0].SegmentID) }) - t.Run("Test getQueryChannel", func(t *testing.T) { - info, err := meta.getQueryChannelInfoByID(defaultCollectionID) + t.Run("Test getQueryChannelSecond", func(t *testing.T) { + info := meta.getQueryChannelInfoByID(defaultCollectionID) assert.NotNil(t, info.QueryChannel) assert.NotNil(t, info.QueryResultChannel) - assert.Nil(t, err) }) t.Run("Test GetSegmentInfoByID", func(t *testing.T) { @@ -317,14 +313,6 @@ func TestReloadMetaFromKV(t *testing.T) { segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, defaultSegmentID) kvs[segmentKey] = string(segmentBlobs) - queryChannelInfo := &querypb.QueryChannelInfo{ - CollectionID: defaultCollectionID, - } - queryChannelBlobs, err := proto.Marshal(queryChannelInfo) - assert.Nil(t, err) - queryChannelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, defaultCollectionID) - kvs[queryChannelKey] = string(queryChannelBlobs) - deltaChannel1 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel1"} deltaChannel2 := &datapb.VchannelInfo{CollectionID: defaultCollectionID, ChannelName: "delta-channel2"} @@ -354,27 +342,8 @@ func TestReloadMetaFromKV(t *testing.T) { assert.Equal(t, 1, len(meta.collectionInfos)) assert.Equal(t, 1, len(meta.segmentInfos)) - assert.Equal(t, 1, len(meta.queryChannelInfos)) _, ok := meta.collectionInfos[defaultCollectionID] assert.Equal(t, true, ok) _, ok = meta.segmentInfos[defaultSegmentID] assert.Equal(t, true, ok) - _, ok = meta.queryChannelInfos[defaultCollectionID] - assert.Equal(t, true, ok) - - t.Run("test no global query seek position", func(t *testing.T) { - err = kv.Remove(globalQuerySeekPositionPrefix) - assert.NoError(t, err) - - err = meta.reloadFromKV() - assert.NoError(t, err) - }) - - t.Run("test wrong global query seek position", func(t *testing.T) { - err = kv.Save(globalQuerySeekPositionPrefix, "&%*&^*^(&%*&%&^%") - assert.NoError(t, err) - - err = meta.reloadFromKV() - assert.Error(t, err) - }) } diff --git a/internal/querycoord/segment_allocator_test.go b/internal/querycoord/segment_allocator_test.go index 3586684841..8ce0436d71 100644 --- a/internal/querycoord/segment_allocator_test.go +++ b/internal/querycoord/segment_allocator_test.go @@ -123,7 +123,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) { assert.Equal(t, node2ID, firstReq.DstNodeID) assert.Equal(t, node2ID, secondReq.DstNodeID) - err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, true, nil, nil) + err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, meta, true, nil, nil) assert.Nil(t, err) assert.Equal(t, node2ID, firstReq.DstNodeID) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 1e41f2cdee..216efbae92 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -2061,11 +2061,7 @@ func assignInternalTask(ctx context.Context, for nodeID, collectionIDs := range watchQueryChannelInfo { for _, collectionID := range collectionIDs { - queryChannelInfo, err := meta.getQueryChannelInfoByID(collectionID) - if err != nil { - return nil, err - } - + queryChannelInfo := meta.getQueryChannelInfoByID(collectionID) msgBase := proto.Clone(parentTask.msgBase()).(*commonpb.MsgBase) msgBase.MsgType = commonpb.MsgType_WatchQueryChannels addQueryChannelRequest := &querypb.AddQueryChannelRequest{ @@ -2075,7 +2071,6 @@ func assignInternalTask(ctx context.Context, QueryChannel: queryChannelInfo.QueryChannel, QueryResultChannel: queryChannelInfo.QueryResultChannel, GlobalSealedSegments: queryChannelInfo.GlobalSealedSegments, - SeekPosition: queryChannelInfo.SeekPosition, } baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index aa0efeb658..c9e5305b64 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -925,7 +925,8 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) rollBackSegmentChangeInfoErr := retry.Do(ctx, func() error { rollBackChangeInfos := reverseSealedSegmentChangeInfo(sealedSegmentChangeInfos) for collectionID, infos := range rollBackChangeInfos { - _, _, sendErr := meta.sendSealedSegmentChangeInfos(collectionID, infos) + channelInfo := meta.getQueryChannelInfoByID(collectionID) + _, sendErr := meta.sendSealedSegmentChangeInfos(collectionID, channelInfo.QueryChannel, infos) if sendErr != nil { return sendErr } @@ -934,8 +935,10 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) }, retry.Attempts(20)) if rollBackSegmentChangeInfoErr != nil { log.Error("scheduleLoop: Restore the information of global sealed segments in query node failed", zap.Error(rollBackSegmentChangeInfoErr)) + panic(rollBackSegmentChangeInfoErr) + } else { + log.Info("Successfully roll back segment info change") } - log.Info("Successfully roll back segment info change") return err } diff --git a/internal/querycoord/util.go b/internal/querycoord/util.go new file mode 100644 index 0000000000..0738501c47 --- /dev/null +++ b/internal/querycoord/util.go @@ -0,0 +1,26 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package querycoord + +func getCompareMapFromSlice(sliceData []int64) map[int64]struct{} { + compareMap := make(map[int64]struct{}) + for _, data := range sliceData { + compareMap[data] = struct{}{} + } + + return compareMap +}