diff --git a/pulsar/go_client.go b/pulsar/go_client.go index 14575ed9d8..76feac6150 100644 --- a/pulsar/go_client.go +++ b/pulsar/go_client.go @@ -1,43 +1,191 @@ package pulsar -import "suvlim/pulsar/schema" - -func Send(msg schema.Message) { - -} - -func BatchSend(msgs []schema.Message) { - -} +import ( + "context" + "github.com/apache/pulsar/pulsar-client-go/pulsar" + "log" + "suvlim/pulsar/schema" + "sync" +) var ( - InsertSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + + wg sync.WaitGroup + //wgJob sync.WaitGroup + //wgQuery sync.WaitGroup + //wgWrite sync.WaitGroup + + OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + "{\"name\":\"CollectionName\",\"type\":\"string\"}," + "{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" + "{\"name\":\"EntityId\",\"type\":\"int64\"}" + "{\"name\":\"PartitionTag\",\"type\":\"string\"}" + - "{\"name\":\"Timestamp\",\"type\":\"int64\"}" + - "{\"name\":\"ClientId\",\"type\":\"int64\"}" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}" + - "]}" - DeleteSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"CollectionName\",\"type\":\"string\"}," + - "{\"name\":\"EntityId\",\"type\":\"int64\"}" + - "{\"name\":\"Timestamp\",\"type\":\"int64\"}" + - "{\"name\":\"ClientId\",\"type\":\"int64\"}" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}" + - "]}" - SearchSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"CollectionName\",\"type\":\"string\"}," + - "{\"name\":\"PartitionTag\",\"type\":\"string\"}" + "{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" + + "{\"name\":\"Segments\",\"type\":\"[]string\"}" + "{\"name\":\"Timestamp\",\"type\":\"int64\"}" + "{\"name\":\"ClientId\",\"type\":\"int64\"}" + "{\"name\":\"MsgType\",\"type\":\"OpType\"}" + "]}" - TimeSyncSchemaDef = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"Timestamp\",\"type\":\"int64\"}" + - "{\"name\":\"ClientId\",\"type\":\"int64\"}" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}" + + SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + + "{\"name\":\"MsgType\",\"type\":\"OpType\"}," + "]}" -) \ No newline at end of file +) + +type MessageClient struct { + + //message channel + insertChan chan *schema.InsertMsg + deleteChan chan *schema.DeleteMsg + searchChan chan *schema.SearchMsg + timeSyncChan chan *schema.TimeSyncMsg + key2SegChan chan *schema.Key2SegMsg + + // pulsar + client pulsar.Client + syncInsertProducer pulsar.Producer + syncDeleteProducer pulsar.Producer + key2segProducer pulsar.Producer + consumer pulsar.Consumer + + // batch messages + insertMsg []*schema.InsertMsg + deleteMsg []*schema.DeleteMsg + searchMsg []*schema.SearchMsg + timeMsg []*schema.TimeSyncMsg + key2segMsg []*schema.Key2SegMsg + +} + +func (mc *MessageClient) ReceiveMessage() { + for { + pulsarMessage := schema.PulsarMessage{} + msg, err := mc.consumer.Receive(context.Background()) + err = msg.GetValue(&pulsarMessage) + if err != nil { + log.Fatal(err) + } + + msgType := pulsarMessage.MsgType + switch msgType { + case schema.Insert: + IMsgObj := schema.InsertMsg{} + mc.insertChan <- &IMsgObj + case schema.Delete: + DMsgObj := schema.DeleteMsg{} + mc.deleteChan <- &DMsgObj + case schema.Search: + SMsgObj := schema.SearchMsg{} + mc.searchChan <- &SMsgObj + case schema.TimeSync: + TMsgObj := schema.TimeSyncMsg{} + mc.timeSyncChan <- &TMsgObj + case schema.Key2Seg: + KMsgObj := schema.Key2SegMsg{} + mc.key2SegChan <- &KMsgObj + } + } +} + +func (mc *MessageClient) CreatProducer(schemaDef string, topicName string) pulsar.Producer{ + schema := pulsar.NewProtoSchema(schemaDef, nil) + producer, err := mc.client.CreateProducerWithSchema(pulsar.ProducerOptions{ + Topic: topicName, + }, schema) + defer producer.Close() + if err != nil { + log.Fatal(err) + } + return producer +} + +func (mc *MessageClient) CreateConsumer(schemaDef string, topics []string) pulsar.Consumer { + originMsgSchema := pulsar.NewProtoSchema(schemaDef, nil) + consumer, err := mc.client.SubscribeWithSchema(pulsar.ConsumerOptions{ + Topics: topics, + SubscriptionName: "multi-topic-sub", + }, originMsgSchema) + defer consumer.Close() + if err != nil { + log.Fatal(err) + } + return consumer +} + +func (mc *MessageClient) CreateClient(url string) pulsar.Client { + // create client + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: url, + }) + defer client.Close() + if err != nil { + log.Fatal(err) + } + return client +} + +func (mc *MessageClient) InitClient(url string,topics []string) { + //create client + mc.client = mc.CreateClient(url) + + //create producer + mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert") + mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete") + mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "key2seg") + + //create consumer + mc.consumer = mc.CreateConsumer(OriginMsgSchema, topics) + + // init channel + mc.insertChan = make(chan *schema.InsertMsg, 1000) + mc.deleteChan = make(chan *schema.DeleteMsg, 1000) + mc.searchChan = make(chan *schema.SearchMsg, 1000) + mc.timeSyncChan = make(chan *schema.TimeSyncMsg, 1000) + mc.key2SegChan = make(chan *schema.Key2SegMsg, 1000) +} + +type JobType int +const ( + OpInQueryNode JobType = 0 + OpInWriteNode JobType = 1 +) + +func (mc *MessageClient) PrepareBatchMsg(jobType JobType) { + // assume the channel not full + mc.insertMsg = make([]*schema.InsertMsg, 1000) + mc.deleteMsg = make([]*schema.DeleteMsg, 1000) + mc.searchMsg = make([]*schema.SearchMsg, 1000) + mc.timeMsg = make([]*schema.TimeSyncMsg, 1000) + mc.key2segMsg = make([]*schema.Key2SegMsg, 1000) + + // get the length of every channel + insertLen := len(mc.insertChan) + deleteLen := len(mc.deleteChan) + searchLen := len(mc.searchChan) + timeLen := len(mc.timeSyncChan) + key2segLen := len(mc.key2SegChan) + + + // get message from channel to slice + for i := 0; i < insertLen; i++ { + msg := <- mc.insertChan + mc.insertMsg[i] = msg + } + for i := 0; i < deleteLen; i++ { + msg := <- mc.deleteChan + mc.deleteMsg[i] = msg + } + for i := 0; i < timeLen; i++ { + msg := <- mc.timeSyncChan + mc.timeMsg[i] = msg + } + if jobType == OpInQueryNode { + for i := 0; i < key2segLen; i++ { + msg := <-mc.key2SegChan + mc.key2segMsg[i] = msg + } + + for i := 0; i < searchLen; i++ { + msg := <-mc.searchChan + mc.searchMsg[i] = msg + } + } +} diff --git a/pulsar/pb/pulsar.pb.go b/pulsar/pb/pulsar.pb.go index a4aceb19b3..9d941508ba 100644 --- a/pulsar/pb/pulsar.pb.go +++ b/pulsar/pb/pulsar.pb.go @@ -9,18 +9,14 @@ It is generated from these files: It has these top-level messages: Status - KeyValuePair + SegmentRecord VectorRowRecord AttrRecord VectorRecord VectorParam FieldValue - Entities - InsertParam - EntityIds - DeleteByIDParam - SearchParam - QueryResult + PulsarMessage + PulsarMessages */ package pb @@ -174,6 +170,39 @@ func (x DataType) String() string { } func (DataType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +type OpType int32 + +const ( + OpType_Insert OpType = 0 + OpType_Delete OpType = 1 + OpType_Search OpType = 2 + OpType_TimeSync OpType = 3 + OpType_Key2Seg OpType = 4 + OpType_Statistics OpType = 5 +) + +var OpType_name = map[int32]string{ + 0: "Insert", + 1: "Delete", + 2: "Search", + 3: "TimeSync", + 4: "Key2Seg", + 5: "Statistics", +} +var OpType_value = map[string]int32{ + "Insert": 0, + "Delete": 1, + "Search": 2, + "TimeSync": 3, + "Key2Seg": 4, + "Statistics": 5, +} + +func (x OpType) String() string { + return proto.EnumName(OpType_name, int32(x)) +} +func (OpType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + type Status struct { ErrorCode ErrorCode `protobuf:"varint,1,opt,name=error_code,json=errorCode,enum=pb.ErrorCode" json:"error_code,omitempty"` Reason string `protobuf:"bytes,2,opt,name=reason" json:"reason,omitempty"` @@ -198,28 +227,20 @@ func (m *Status) GetReason() string { return "" } -type KeyValuePair struct { - Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` +type SegmentRecord struct { + SegInfo []string `protobuf:"bytes,1,rep,name=seg_info,json=segInfo" json:"seg_info,omitempty"` } -func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } -func (m *KeyValuePair) String() string { return proto.CompactTextString(m) } -func (*KeyValuePair) ProtoMessage() {} -func (*KeyValuePair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *SegmentRecord) Reset() { *m = SegmentRecord{} } +func (m *SegmentRecord) String() string { return proto.CompactTextString(m) } +func (*SegmentRecord) ProtoMessage() {} +func (*SegmentRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } -func (m *KeyValuePair) GetKey() string { +func (m *SegmentRecord) GetSegInfo() []string { if m != nil { - return m.Key + return m.SegInfo } - return "" -} - -func (m *KeyValuePair) GetValue() string { - if m != nil { - return m.Value - } - return "" + return nil } type VectorRowRecord struct { @@ -366,392 +387,252 @@ func (m *FieldValue) GetVectorRecord() *VectorRecord { return nil } -type Entities struct { - Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` - Ids []int64 `protobuf:"varint,2,rep,packed,name=ids" json:"ids,omitempty"` - ValidRow []bool `protobuf:"varint,3,rep,packed,name=valid_row,json=validRow" json:"valid_row,omitempty"` - Fields []*FieldValue `protobuf:"bytes,4,rep,name=fields" json:"fields,omitempty"` +type PulsarMessage struct { + CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` + Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` + EntityId int64 `protobuf:"varint,3,opt,name=entity_id,json=entityId" json:"entity_id,omitempty"` + PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"` + VectorParam *VectorParam `protobuf:"bytes,5,opt,name=vector_param,json=vectorParam" json:"vector_param,omitempty"` + Segments *SegmentRecord `protobuf:"bytes,6,opt,name=segments" json:"segments,omitempty"` + Timestamp int64 `protobuf:"varint,7,opt,name=timestamp" json:"timestamp,omitempty"` + ClientId int64 `protobuf:"varint,8,opt,name=client_id,json=clientId" json:"client_id,omitempty"` + MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"` } -func (m *Entities) Reset() { *m = Entities{} } -func (m *Entities) String() string { return proto.CompactTextString(m) } -func (*Entities) ProtoMessage() {} -func (*Entities) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (m *PulsarMessage) Reset() { *m = PulsarMessage{} } +func (m *PulsarMessage) String() string { return proto.CompactTextString(m) } +func (*PulsarMessage) ProtoMessage() {} +func (*PulsarMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } -func (m *Entities) GetStatus() *Status { - if m != nil { - return m.Status - } - return nil -} - -func (m *Entities) GetIds() []int64 { - if m != nil { - return m.Ids - } - return nil -} - -func (m *Entities) GetValidRow() []bool { - if m != nil { - return m.ValidRow - } - return nil -} - -func (m *Entities) GetFields() []*FieldValue { - if m != nil { - return m.Fields - } - return nil -} - -type InsertParam struct { - CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` - Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` - EntityIdArray []int64 `protobuf:"varint,3,rep,packed,name=entity_id_array,json=entityIdArray" json:"entity_id_array,omitempty"` - PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"` - Timestamp []int64 `protobuf:"varint,5,rep,packed,name=timestamp" json:"timestamp,omitempty"` - GrpcServerClientId int64 `protobuf:"varint,6,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"` - ExtraParams []*KeyValuePair `protobuf:"bytes,7,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"` -} - -func (m *InsertParam) Reset() { *m = InsertParam{} } -func (m *InsertParam) String() string { return proto.CompactTextString(m) } -func (*InsertParam) ProtoMessage() {} -func (*InsertParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } - -func (m *InsertParam) GetCollectionName() string { +func (m *PulsarMessage) GetCollectionName() string { if m != nil { return m.CollectionName } return "" } -func (m *InsertParam) GetFields() []*FieldValue { +func (m *PulsarMessage) GetFields() []*FieldValue { if m != nil { return m.Fields } return nil } -func (m *InsertParam) GetEntityIdArray() []int64 { +func (m *PulsarMessage) GetEntityId() int64 { if m != nil { - return m.EntityIdArray + return m.EntityId } - return nil + return 0 } -func (m *InsertParam) GetPartitionTag() string { +func (m *PulsarMessage) GetPartitionTag() string { if m != nil { return m.PartitionTag } return "" } -func (m *InsertParam) GetTimestamp() []int64 { - if m != nil { - return m.Timestamp - } - return nil -} - -func (m *InsertParam) GetGrpcServerClientId() int64 { - if m != nil { - return m.GrpcServerClientId - } - return 0 -} - -func (m *InsertParam) GetExtraParams() []*KeyValuePair { - if m != nil { - return m.ExtraParams - } - return nil -} - -type EntityIds struct { - Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` - EntityIdArray []int64 `protobuf:"varint,2,rep,packed,name=entity_id_array,json=entityIdArray" json:"entity_id_array,omitempty"` -} - -func (m *EntityIds) Reset() { *m = EntityIds{} } -func (m *EntityIds) String() string { return proto.CompactTextString(m) } -func (*EntityIds) ProtoMessage() {} -func (*EntityIds) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } - -func (m *EntityIds) GetStatus() *Status { - if m != nil { - return m.Status - } - return nil -} - -func (m *EntityIds) GetEntityIdArray() []int64 { - if m != nil { - return m.EntityIdArray - } - return nil -} - -type DeleteByIDParam struct { - CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` - IdArray []int64 `protobuf:"varint,2,rep,packed,name=id_array,json=idArray" json:"id_array,omitempty"` - Timestamp []int64 `protobuf:"varint,3,rep,packed,name=timestamp" json:"timestamp,omitempty"` - GrpcServerClientId int64 `protobuf:"varint,4,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"` -} - -func (m *DeleteByIDParam) Reset() { *m = DeleteByIDParam{} } -func (m *DeleteByIDParam) String() string { return proto.CompactTextString(m) } -func (*DeleteByIDParam) ProtoMessage() {} -func (*DeleteByIDParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } - -func (m *DeleteByIDParam) GetCollectionName() string { - if m != nil { - return m.CollectionName - } - return "" -} - -func (m *DeleteByIDParam) GetIdArray() []int64 { - if m != nil { - return m.IdArray - } - return nil -} - -func (m *DeleteByIDParam) GetTimestamp() []int64 { - if m != nil { - return m.Timestamp - } - return nil -} - -func (m *DeleteByIDParam) GetGrpcServerClientId() int64 { - if m != nil { - return m.GrpcServerClientId - } - return 0 -} - -type SearchParam struct { - CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` - PartitionTagArray []string `protobuf:"bytes,2,rep,name=partition_tag_array,json=partitionTagArray" json:"partition_tag_array,omitempty"` - VectorParam []*VectorParam `protobuf:"bytes,3,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"` - Dsl string `protobuf:"bytes,4,opt,name=dsl" json:"dsl,omitempty"` - Timestamp []int64 `protobuf:"varint,5,rep,packed,name=timestamp" json:"timestamp,omitempty"` - GrpcServerClientId int64 `protobuf:"varint,6,opt,name=grpc_server_client_id,json=grpcServerClientId" json:"grpc_server_client_id,omitempty"` - ExtraParams []*KeyValuePair `protobuf:"bytes,7,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"` -} - -func (m *SearchParam) Reset() { *m = SearchParam{} } -func (m *SearchParam) String() string { return proto.CompactTextString(m) } -func (*SearchParam) ProtoMessage() {} -func (*SearchParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } - -func (m *SearchParam) GetCollectionName() string { - if m != nil { - return m.CollectionName - } - return "" -} - -func (m *SearchParam) GetPartitionTagArray() []string { - if m != nil { - return m.PartitionTagArray - } - return nil -} - -func (m *SearchParam) GetVectorParam() []*VectorParam { +func (m *PulsarMessage) GetVectorParam() *VectorParam { if m != nil { return m.VectorParam } return nil } -func (m *SearchParam) GetDsl() string { +func (m *PulsarMessage) GetSegments() *SegmentRecord { if m != nil { - return m.Dsl + return m.Segments + } + return nil +} + +func (m *PulsarMessage) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +func (m *PulsarMessage) GetClientId() int64 { + if m != nil { + return m.ClientId + } + return 0 +} + +func (m *PulsarMessage) GetMsgType() OpType { + if m != nil { + return m.MsgType + } + return OpType_Insert +} + +type PulsarMessages struct { + CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` + Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` + EntityId []int64 `protobuf:"varint,3,rep,packed,name=entity_id,json=entityId" json:"entity_id,omitempty"` + PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"` + VectorParam []*VectorParam `protobuf:"bytes,5,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"` + Segments []*SegmentRecord `protobuf:"bytes,6,rep,name=segments" json:"segments,omitempty"` + Timestamp []int64 `protobuf:"varint,7,rep,packed,name=timestamp" json:"timestamp,omitempty"` + ClientId []int64 `protobuf:"varint,8,rep,packed,name=client_id,json=clientId" json:"client_id,omitempty"` + MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"` +} + +func (m *PulsarMessages) Reset() { *m = PulsarMessages{} } +func (m *PulsarMessages) String() string { return proto.CompactTextString(m) } +func (*PulsarMessages) ProtoMessage() {} +func (*PulsarMessages) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *PulsarMessages) GetCollectionName() string { + if m != nil { + return m.CollectionName } return "" } -func (m *SearchParam) GetTimestamp() []int64 { +func (m *PulsarMessages) GetFields() []*FieldValue { + if m != nil { + return m.Fields + } + return nil +} + +func (m *PulsarMessages) GetEntityId() []int64 { + if m != nil { + return m.EntityId + } + return nil +} + +func (m *PulsarMessages) GetPartitionTag() string { + if m != nil { + return m.PartitionTag + } + return "" +} + +func (m *PulsarMessages) GetVectorParam() []*VectorParam { + if m != nil { + return m.VectorParam + } + return nil +} + +func (m *PulsarMessages) GetSegments() []*SegmentRecord { + if m != nil { + return m.Segments + } + return nil +} + +func (m *PulsarMessages) GetTimestamp() []int64 { if m != nil { return m.Timestamp } return nil } -func (m *SearchParam) GetGrpcServerClientId() int64 { +func (m *PulsarMessages) GetClientId() []int64 { if m != nil { - return m.GrpcServerClientId - } - return 0 -} - -func (m *SearchParam) GetExtraParams() []*KeyValuePair { - if m != nil { - return m.ExtraParams + return m.ClientId } return nil } -type QueryResult struct { - Status *Status `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"` - Entities *Entities `protobuf:"bytes,2,opt,name=entities" json:"entities,omitempty"` - RowNum int64 `protobuf:"varint,3,opt,name=row_num,json=rowNum" json:"row_num,omitempty"` - Scores []float32 `protobuf:"fixed32,4,rep,packed,name=scores" json:"scores,omitempty"` - Distances []float32 `protobuf:"fixed32,5,rep,packed,name=distances" json:"distances,omitempty"` - ExtraParams []*KeyValuePair `protobuf:"bytes,6,rep,name=extra_params,json=extraParams" json:"extra_params,omitempty"` -} - -func (m *QueryResult) Reset() { *m = QueryResult{} } -func (m *QueryResult) String() string { return proto.CompactTextString(m) } -func (*QueryResult) ProtoMessage() {} -func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } - -func (m *QueryResult) GetStatus() *Status { +func (m *PulsarMessages) GetMsgType() OpType { if m != nil { - return m.Status + return m.MsgType } - return nil -} - -func (m *QueryResult) GetEntities() *Entities { - if m != nil { - return m.Entities - } - return nil -} - -func (m *QueryResult) GetRowNum() int64 { - if m != nil { - return m.RowNum - } - return 0 -} - -func (m *QueryResult) GetScores() []float32 { - if m != nil { - return m.Scores - } - return nil -} - -func (m *QueryResult) GetDistances() []float32 { - if m != nil { - return m.Distances - } - return nil -} - -func (m *QueryResult) GetExtraParams() []*KeyValuePair { - if m != nil { - return m.ExtraParams - } - return nil + return OpType_Insert } func init() { proto.RegisterType((*Status)(nil), "pb.Status") - proto.RegisterType((*KeyValuePair)(nil), "pb.KeyValuePair") + proto.RegisterType((*SegmentRecord)(nil), "pb.SegmentRecord") proto.RegisterType((*VectorRowRecord)(nil), "pb.VectorRowRecord") proto.RegisterType((*AttrRecord)(nil), "pb.AttrRecord") proto.RegisterType((*VectorRecord)(nil), "pb.VectorRecord") proto.RegisterType((*VectorParam)(nil), "pb.VectorParam") proto.RegisterType((*FieldValue)(nil), "pb.FieldValue") - proto.RegisterType((*Entities)(nil), "pb.Entities") - proto.RegisterType((*InsertParam)(nil), "pb.InsertParam") - proto.RegisterType((*EntityIds)(nil), "pb.EntityIds") - proto.RegisterType((*DeleteByIDParam)(nil), "pb.DeleteByIDParam") - proto.RegisterType((*SearchParam)(nil), "pb.SearchParam") - proto.RegisterType((*QueryResult)(nil), "pb.QueryResult") + proto.RegisterType((*PulsarMessage)(nil), "pb.PulsarMessage") + proto.RegisterType((*PulsarMessages)(nil), "pb.PulsarMessages") proto.RegisterEnum("pb.ErrorCode", ErrorCode_name, ErrorCode_value) proto.RegisterEnum("pb.DataType", DataType_name, DataType_value) + proto.RegisterEnum("pb.OpType", OpType_name, OpType_value) } func init() { proto.RegisterFile("pulsar.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1248 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x56, 0xdd, 0x72, 0xdb, 0x44, - 0x14, 0xae, 0x2c, 0xc7, 0xb1, 0x8e, 0x9c, 0x64, 0xb3, 0x49, 0x1a, 0x77, 0x0a, 0xd3, 0x60, 0x66, - 0x8a, 0xa7, 0x03, 0xe9, 0xd4, 0x29, 0x19, 0x6e, 0xb8, 0x70, 0xa4, 0x75, 0xab, 0xa9, 0x2c, 0xa5, - 0x2b, 0xb9, 0x3f, 0x57, 0x9a, 0x8d, 0xb5, 0x14, 0x81, 0x6d, 0x79, 0x56, 0x72, 0x82, 0x1f, 0x80, - 0x07, 0x80, 0x47, 0xe0, 0x82, 0x7b, 0x98, 0xe1, 0x79, 0x78, 0x15, 0x66, 0x57, 0x52, 0xec, 0x96, - 0x96, 0x49, 0xef, 0xb8, 0x3b, 0xfb, 0x9d, 0x9f, 0xfd, 0xce, 0x77, 0xd6, 0x47, 0x86, 0xd6, 0x7c, - 0x31, 0xc9, 0x98, 0x38, 0x9e, 0x8b, 0x34, 0x4f, 0x71, 0x6d, 0x7e, 0xd1, 0xf1, 0xa0, 0x11, 0xe4, - 0x2c, 0x5f, 0x64, 0xf8, 0x4b, 0x00, 0x2e, 0x44, 0x2a, 0xa2, 0x71, 0x1a, 0xf3, 0xb6, 0x76, 0xa4, - 0x75, 0xb7, 0x7b, 0x5b, 0xc7, 0xf3, 0x8b, 0x63, 0x22, 0x51, 0x2b, 0x8d, 0x39, 0x35, 0x78, 0x65, - 0xe2, 0xdb, 0xd0, 0x10, 0x9c, 0x65, 0xe9, 0xac, 0x5d, 0x3b, 0xd2, 0xba, 0x06, 0x2d, 0x4f, 0x9d, - 0x53, 0x68, 0x3d, 0xe3, 0xcb, 0x17, 0x6c, 0xb2, 0xe0, 0xe7, 0x2c, 0x11, 0x18, 0x81, 0xfe, 0x23, - 0x5f, 0xaa, 0x72, 0x06, 0x95, 0x26, 0xde, 0x87, 0x8d, 0x4b, 0xe9, 0x2e, 0x13, 0x8b, 0x43, 0xe7, - 0x39, 0xec, 0xbc, 0xe0, 0xe3, 0x3c, 0x15, 0x34, 0xbd, 0xa2, 0x7c, 0x9c, 0x8a, 0x18, 0x7f, 0x0a, - 0xf0, 0xdd, 0x24, 0x65, 0x79, 0x14, 0xb3, 0x9c, 0xb5, 0xb5, 0x23, 0xbd, 0x5b, 0xa3, 0x86, 0x42, - 0x6c, 0x96, 0x33, 0x7c, 0x0f, 0xcc, 0x8b, 0x64, 0xc6, 0xc4, 0xb2, 0xf0, 0xcb, 0x6a, 0x2d, 0x0a, - 0x05, 0x24, 0x03, 0x3a, 0xbf, 0x6a, 0x00, 0xfd, 0x3c, 0x17, 0x65, 0xb9, 0x7b, 0x60, 0x26, 0xb3, - 0xfc, 0xa4, 0x17, 0x15, 0xb7, 0xcb, 0x7a, 0x1b, 0x14, 0x14, 0xa4, 0xe8, 0x96, 0x01, 0xa7, 0x8f, - 0xa3, 0x8a, 0x9e, 0xde, 0xd5, 0x55, 0xc0, 0xe9, 0xe3, 0xeb, 0x80, 0x82, 0x50, 0x11, 0xa0, 0x2b, - 0x46, 0x05, 0xc7, 0x22, 0xe0, 0x33, 0x68, 0xc5, 0xe9, 0xe2, 0x62, 0xc2, 0xcb, 0x88, 0xfa, 0x91, - 0xde, 0xd5, 0xa8, 0x59, 0x60, 0x2a, 0xa4, 0xf3, 0x2d, 0xb4, 0xca, 0x3e, 0x0b, 0x56, 0x5f, 0xc1, - 0xa6, 0x50, 0x56, 0xa6, 0x18, 0x99, 0xbd, 0x3d, 0x29, 0xf9, 0x3b, 0x52, 0xd0, 0x2a, 0xa6, 0x43, - 0xc1, 0x2c, 0x7c, 0xe7, 0x4c, 0xb0, 0x29, 0xc6, 0x50, 0xff, 0x41, 0xce, 0xa0, 0x90, 0x57, 0xd9, - 0xf8, 0x21, 0x80, 0x48, 0xaf, 0xa2, 0x22, 0x43, 0xc9, 0x62, 0xf6, 0xd0, 0x5a, 0xd1, 0xa2, 0xa2, - 0x21, 0xaa, 0xe2, 0x9d, 0xbf, 0x34, 0x80, 0x41, 0xc2, 0x27, 0x71, 0xd1, 0x84, 0x94, 0x5d, 0x9e, - 0xa2, 0x19, 0x9b, 0xf2, 0xb2, 0xb2, 0xa1, 0x10, 0x8f, 0x4d, 0x39, 0x3e, 0x82, 0x7a, 0xbe, 0x9c, - 0x17, 0xd3, 0xdb, 0xee, 0xb5, 0x64, 0x61, 0xa9, 0x76, 0xb8, 0x9c, 0x73, 0xaa, 0x3c, 0xf8, 0x21, - 0x98, 0x2c, 0xcf, 0x45, 0xc5, 0x40, 0x57, 0x0c, 0xb6, 0x65, 0xe0, 0x6a, 0x1a, 0x14, 0xd8, 0x6a, - 0x32, 0x5f, 0xc3, 0xd6, 0xa5, 0xe2, 0x56, 0xa5, 0xd4, 0x3f, 0x40, 0xba, 0x75, 0xb9, 0x76, 0xea, - 0xfc, 0xac, 0x41, 0x93, 0xcc, 0xf2, 0x24, 0x4f, 0x78, 0x86, 0x3b, 0xd0, 0xc8, 0xd4, 0x3b, 0x56, - 0x8c, 0xcd, 0x1e, 0xc8, 0xe4, 0xe2, 0x65, 0xd3, 0xd2, 0x23, 0xdf, 0x62, 0x12, 0x67, 0xe5, 0x60, - 0xa5, 0x89, 0xef, 0x82, 0x71, 0xc9, 0x26, 0x49, 0x1c, 0x89, 0xf4, 0x4a, 0xcd, 0xb3, 0x49, 0x9b, - 0x0a, 0xa0, 0xe9, 0x15, 0xbe, 0x0f, 0x0d, 0xd5, 0x76, 0xa6, 0xe6, 0x58, 0xb6, 0xb0, 0x12, 0x8a, - 0x96, 0xde, 0xce, 0x1f, 0x35, 0x30, 0x9d, 0x59, 0xc6, 0x45, 0x5e, 0x0c, 0xe5, 0x0b, 0xd8, 0x19, - 0xa7, 0x93, 0x09, 0x1f, 0xe7, 0x49, 0x3a, 0x5b, 0x57, 0x71, 0x7b, 0x05, 0x2b, 0x29, 0x57, 0x17, - 0xd4, 0xfe, 0xeb, 0x02, 0x7c, 0x1f, 0x76, 0xb8, 0xec, 0x73, 0x19, 0x25, 0x71, 0xc4, 0x84, 0x60, - 0x4b, 0xc5, 0x55, 0xa7, 0x5b, 0x05, 0xec, 0xc4, 0x7d, 0x09, 0xe2, 0xcf, 0x61, 0x6b, 0xce, 0x84, - 0x14, 0x24, 0x9d, 0x45, 0x39, 0x7b, 0xa3, 0x74, 0x34, 0x68, 0xeb, 0x1a, 0x0c, 0xd9, 0x1b, 0xfc, - 0x09, 0x18, 0x79, 0x32, 0xe5, 0x59, 0xce, 0xa6, 0xf3, 0xf6, 0x86, 0x2a, 0xb3, 0x02, 0xf0, 0x23, - 0x38, 0x78, 0x23, 0xe6, 0xe3, 0x28, 0xe3, 0xe2, 0x92, 0x8b, 0x68, 0x3c, 0x49, 0xf8, 0x2c, 0x8f, - 0x92, 0xb8, 0xdd, 0x38, 0xd2, 0xba, 0x3a, 0xc5, 0xd2, 0x19, 0x28, 0x9f, 0xa5, 0x5c, 0x4e, 0x8c, - 0x4f, 0xa0, 0xc5, 0x7f, 0xca, 0x05, 0x8b, 0xe6, 0xb2, 0xfb, 0xac, 0xbd, 0xa9, 0x7a, 0x51, 0xc3, - 0x5b, 0xdf, 0x04, 0xd4, 0x54, 0x51, 0x4a, 0xa2, 0xac, 0xf3, 0x12, 0x0c, 0x52, 0x72, 0xbf, 0xd9, - 0xec, 0xde, 0xa3, 0x41, 0xed, 0x3d, 0x1a, 0x74, 0x7e, 0xd7, 0x60, 0xc7, 0xe6, 0x13, 0x9e, 0xf3, - 0xb3, 0xa5, 0x63, 0x7f, 0xe4, 0x40, 0xee, 0x40, 0xf3, 0x9d, 0xea, 0x9b, 0x49, 0xa9, 0xed, 0x5b, - 0xb2, 0xe9, 0x37, 0x96, 0xad, 0xfe, 0x21, 0xd9, 0x3a, 0x7f, 0xd6, 0xc0, 0x0c, 0x38, 0x13, 0xe3, - 0xef, 0x3f, 0x92, 0xe4, 0x31, 0xec, 0xbd, 0x35, 0xe5, 0x35, 0xbe, 0x06, 0xdd, 0x5d, 0x9f, 0x75, - 0xc1, 0xbc, 0x07, 0xe5, 0xcf, 0xa6, 0x18, 0x90, 0x22, 0x6f, 0xf6, 0x76, 0x56, 0x3f, 0x2e, 0x75, - 0x3f, 0x35, 0x2f, 0xd7, 0xf6, 0x0a, 0x02, 0x3d, 0xce, 0x26, 0xe5, 0xfb, 0x91, 0xe6, 0xff, 0xe4, - 0xd9, 0xfc, 0xad, 0x81, 0xf9, 0x7c, 0xc1, 0xc5, 0x92, 0xf2, 0x6c, 0x31, 0xc9, 0x6f, 0xf4, 0x72, - 0xba, 0xd0, 0xe4, 0xe5, 0x96, 0x28, 0xb7, 0xa1, 0x5a, 0x5a, 0xd5, 0xe6, 0xa0, 0xd7, 0x5e, 0x7c, - 0x08, 0x9b, 0x72, 0x73, 0xce, 0x16, 0x53, 0xb5, 0xb4, 0x74, 0xda, 0x10, 0xe9, 0x95, 0xb7, 0x98, - 0xca, 0x8f, 0x5d, 0x36, 0x4e, 0x05, 0x2f, 0x36, 0x41, 0x8d, 0x96, 0x27, 0x29, 0x4a, 0x9c, 0x64, - 0x39, 0x9b, 0x8d, 0x79, 0xa6, 0x44, 0xa9, 0xd1, 0x15, 0xf0, 0xaf, 0x0e, 0x1b, 0x37, 0xe8, 0xf0, - 0xc1, 0x6f, 0x75, 0x30, 0xae, 0x3f, 0xb8, 0xd8, 0x84, 0xcd, 0x60, 0x64, 0x59, 0x24, 0x08, 0xd0, - 0x2d, 0xbc, 0x0f, 0x68, 0xe4, 0x91, 0x57, 0xe7, 0xc4, 0x0a, 0x89, 0x1d, 0x11, 0x4a, 0x7d, 0x8a, - 0x34, 0x8c, 0x61, 0xdb, 0xf2, 0x3d, 0x8f, 0x58, 0x61, 0x34, 0xe8, 0x3b, 0x2e, 0xb1, 0x51, 0x0d, - 0x1f, 0xc0, 0xee, 0x39, 0xa1, 0x43, 0x27, 0x08, 0x1c, 0xdf, 0x8b, 0x6c, 0xe2, 0x39, 0xc4, 0x46, - 0x3a, 0xbe, 0x03, 0x07, 0x96, 0xef, 0xba, 0xc4, 0x0a, 0x25, 0xec, 0xf9, 0x61, 0x44, 0x5e, 0x39, - 0x41, 0x18, 0xa0, 0xba, 0xac, 0xed, 0xb8, 0x2e, 0x79, 0xd2, 0x77, 0xa3, 0x3e, 0x7d, 0x32, 0x1a, - 0x12, 0x2f, 0x44, 0x1b, 0xb2, 0x4e, 0x85, 0xda, 0xce, 0x90, 0x78, 0xb2, 0x1c, 0xda, 0xc4, 0xb7, - 0x01, 0x57, 0xb0, 0xe3, 0xd9, 0xe4, 0x55, 0x14, 0xbe, 0x3e, 0x27, 0xa8, 0x89, 0xef, 0xc2, 0x61, - 0x85, 0xaf, 0xdf, 0xd3, 0x1f, 0x12, 0x64, 0x60, 0x04, 0xad, 0xca, 0x19, 0xfa, 0xe7, 0xcf, 0x10, - 0xac, 0x57, 0xa7, 0xfe, 0x4b, 0x4a, 0x2c, 0x9f, 0xda, 0xc8, 0x5c, 0x87, 0x5f, 0x10, 0x2b, 0xf4, - 0x69, 0xe4, 0xd8, 0xa8, 0x25, 0xc9, 0x57, 0x70, 0x40, 0xfa, 0xd4, 0x7a, 0x1a, 0x51, 0x12, 0x8c, - 0xdc, 0x10, 0x6d, 0x49, 0x09, 0x06, 0x8e, 0x4b, 0x54, 0x47, 0x03, 0x7f, 0xe4, 0xd9, 0x68, 0x1b, - 0xef, 0x80, 0x39, 0x24, 0x61, 0xbf, 0xd2, 0x64, 0x47, 0xde, 0x6f, 0xf5, 0xad, 0xa7, 0xa4, 0x42, - 0x10, 0x6e, 0xc3, 0xbe, 0xd5, 0xf7, 0x64, 0x92, 0x45, 0x49, 0x3f, 0x24, 0xd1, 0xc0, 0x77, 0x6d, - 0x42, 0xd1, 0xae, 0x6c, 0xf0, 0x1d, 0x8f, 0xe3, 0x12, 0x84, 0xd7, 0x32, 0x6c, 0xe2, 0x92, 0x55, - 0xc6, 0xde, 0x5a, 0x46, 0xe5, 0x91, 0x19, 0xfb, 0xb2, 0x99, 0xb3, 0x91, 0xe3, 0xda, 0xa5, 0x50, - 0xc5, 0xd0, 0x0e, 0xf0, 0x2e, 0x6c, 0x55, 0xcd, 0x78, 0xae, 0x13, 0x84, 0xe8, 0x36, 0x3e, 0x84, - 0xbd, 0x0a, 0x1a, 0x92, 0x90, 0x3a, 0x56, 0xa1, 0xea, 0xa1, 0x8c, 0xf5, 0x47, 0x61, 0xe4, 0x0f, - 0xa2, 0x21, 0x19, 0xfa, 0xf4, 0x35, 0x6a, 0x3f, 0xf8, 0x45, 0x83, 0x66, 0xf5, 0xd1, 0xc5, 0x4d, - 0xa8, 0x7b, 0xbe, 0x47, 0xd0, 0x2d, 0x69, 0x9d, 0xf9, 0xbe, 0x8b, 0x34, 0x69, 0x39, 0x5e, 0xf8, - 0x0d, 0xaa, 0x61, 0x03, 0x36, 0x1c, 0x2f, 0x7c, 0x74, 0x8a, 0xf4, 0xd2, 0x3c, 0xe9, 0xa1, 0x7a, - 0x69, 0x9e, 0x3e, 0x46, 0x1b, 0xd2, 0x1c, 0xb8, 0x7e, 0x3f, 0x44, 0x80, 0x01, 0x1a, 0xb6, 0x3f, - 0x3a, 0x73, 0x09, 0x32, 0xa5, 0x1d, 0x84, 0xd4, 0xf1, 0x9e, 0xa0, 0x7d, 0xc9, 0xa0, 0x9c, 0xc4, - 0x99, 0xe3, 0xf5, 0xe9, 0x6b, 0x14, 0x4b, 0x35, 0x4b, 0xa8, 0x48, 0xe6, 0x17, 0x0d, 0xf5, 0x9f, - 0xf2, 0xe4, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3c, 0x17, 0xca, 0xe4, 0x63, 0x0a, 0x00, 0x00, + // 1101 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46, + 0x17, 0x0d, 0x45, 0xfd, 0xf1, 0x52, 0x92, 0xc7, 0x13, 0x27, 0x51, 0x90, 0xef, 0x43, 0x54, 0x15, + 0x6d, 0x0d, 0xa3, 0x49, 0x50, 0x25, 0x35, 0xfa, 0xd2, 0x07, 0x9a, 0x1c, 0x25, 0x44, 0x28, 0x52, + 0x1d, 0x52, 0x8e, 0xfd, 0x44, 0xd0, 0xd2, 0x58, 0x65, 0x21, 0x91, 0x02, 0x39, 0x76, 0xa0, 0x65, + 0xb4, 0x4b, 0xe8, 0x1e, 0xba, 0xa6, 0x76, 0x19, 0xc5, 0x0c, 0x49, 0x4b, 0x09, 0xd0, 0x02, 0x29, + 0xda, 0xb7, 0xcb, 0x33, 0xe7, 0xde, 0x39, 0xe7, 0x5c, 0x52, 0x10, 0x74, 0x36, 0x37, 0xab, 0x3c, + 0xca, 0x9e, 0x6f, 0xb2, 0x94, 0xa7, 0xb8, 0xb6, 0xb9, 0x1a, 0xba, 0xd0, 0xf4, 0x79, 0xc4, 0x6f, + 0x72, 0xfc, 0x35, 0x00, 0xcb, 0xb2, 0x34, 0x0b, 0xe7, 0xe9, 0x82, 0xf5, 0x95, 0x81, 0x72, 0xdc, + 0x1b, 0x75, 0x9f, 0x6f, 0xae, 0x9e, 0x13, 0x81, 0x9a, 0xe9, 0x82, 0x51, 0x8d, 0x55, 0x25, 0x7e, + 0x08, 0xcd, 0x8c, 0x45, 0x79, 0x9a, 0xf4, 0x6b, 0x03, 0xe5, 0x58, 0xa3, 0xe5, 0xd3, 0xf0, 0x04, + 0xba, 0x3e, 0x5b, 0xae, 0x59, 0xc2, 0x29, 0x9b, 0xa7, 0xd9, 0x02, 0x3f, 0x86, 0x76, 0xce, 0x96, + 0x61, 0x9c, 0x5c, 0xa7, 0x7d, 0x65, 0xa0, 0x1e, 0x6b, 0xb4, 0x95, 0xb3, 0xa5, 0x9d, 0x5c, 0xa7, + 0xc3, 0x1f, 0xe0, 0xe0, 0x9c, 0xcd, 0x79, 0x9a, 0xd1, 0xf4, 0x7d, 0xc9, 0xfe, 0x3f, 0xc0, 0xf5, + 0x2a, 0x8d, 0x78, 0xb8, 0x88, 0x78, 0x24, 0xf9, 0x35, 0xaa, 0x49, 0xc4, 0x8a, 0x78, 0x84, 0x9f, + 0x82, 0x7e, 0x15, 0x27, 0x51, 0xb6, 0x2d, 0xce, 0xc5, 0xd5, 0x1d, 0x0a, 0x05, 0x24, 0x08, 0xc3, + 0x5f, 0x14, 0x00, 0x83, 0xf3, 0xac, 0x1c, 0xf7, 0x14, 0xf4, 0x38, 0xe1, 0x2f, 0x47, 0xe1, 0x6d, + 0xb4, 0xba, 0x61, 0x72, 0x5e, 0x83, 0x82, 0x84, 0xce, 0x05, 0x52, 0x12, 0x4e, 0x5f, 0x95, 0x84, + 0xda, 0x40, 0x3d, 0x56, 0x25, 0xe1, 0xf4, 0xd5, 0x1d, 0xa1, 0x10, 0x54, 0x10, 0x54, 0xa9, 0xa8, + 0xd0, 0x58, 0x10, 0x3e, 0x83, 0xce, 0x22, 0xbd, 0xb9, 0x5a, 0xb1, 0x92, 0x51, 0x1f, 0xa8, 0xc7, + 0x0a, 0xd5, 0x0b, 0x4c, 0x52, 0x86, 0xdf, 0x43, 0xa7, 0xf4, 0x59, 0xa8, 0x7a, 0x06, 0xad, 0x4c, + 0x56, 0xb9, 0x54, 0xa4, 0x8f, 0xee, 0x8b, 0x98, 0x3f, 0x8a, 0x82, 0x56, 0x9c, 0x21, 0x05, 0xbd, + 0x38, 0x9b, 0x46, 0x59, 0xb4, 0xc6, 0x18, 0xea, 0x3f, 0x89, 0xdc, 0x15, 0x99, 0xbb, 0xac, 0xf1, + 0x0b, 0x80, 0x2c, 0x7d, 0x1f, 0x16, 0x1d, 0x32, 0x16, 0x7d, 0x84, 0xf6, 0x86, 0x16, 0x13, 0xb5, + 0xac, 0x1a, 0x3e, 0xfc, 0x4d, 0x01, 0x18, 0xc7, 0x6c, 0xb5, 0x28, 0x4c, 0x88, 0xd8, 0xc5, 0x53, + 0x98, 0x44, 0x6b, 0x56, 0x4e, 0xd6, 0x24, 0xe2, 0x46, 0x6b, 0x86, 0x07, 0x50, 0xe7, 0xdb, 0x0d, + 0x93, 0x83, 0x7b, 0xa3, 0x8e, 0x18, 0x2c, 0xd2, 0x0e, 0xb6, 0x1b, 0x46, 0xe5, 0x09, 0x7e, 0x01, + 0x7a, 0xc4, 0x79, 0x56, 0x29, 0x50, 0xa5, 0x82, 0x9e, 0x20, 0xee, 0xb6, 0x41, 0x21, 0xda, 0x6d, + 0xe6, 0x5b, 0xe8, 0xde, 0x4a, 0x6d, 0x55, 0x4b, 0xfd, 0x2f, 0x44, 0x77, 0x6e, 0xf7, 0x9e, 0x86, + 0xbf, 0xd7, 0xa0, 0x3b, 0x95, 0xef, 0xf0, 0x84, 0xe5, 0x79, 0xb4, 0x64, 0xf8, 0x2b, 0x38, 0x98, + 0xa7, 0xab, 0x15, 0x9b, 0xf3, 0x38, 0x4d, 0xf6, 0xf5, 0xf7, 0x76, 0xb0, 0x34, 0xf1, 0x25, 0x34, + 0xa5, 0xa3, 0x5c, 0x6e, 0xb9, 0x54, 0xb7, 0xcb, 0x80, 0x96, 0xa7, 0xf8, 0x09, 0x68, 0x2c, 0xe1, + 0x31, 0xdf, 0x86, 0x71, 0x61, 0x44, 0xa5, 0xed, 0x02, 0xb0, 0x17, 0xf8, 0x73, 0xe8, 0x6e, 0xa2, + 0x8c, 0xc7, 0xf2, 0x32, 0x1e, 0x2d, 0xa5, 0x6c, 0x8d, 0x76, 0xee, 0xc0, 0x20, 0x5a, 0xe2, 0x11, + 0x94, 0xa2, 0xc3, 0x8d, 0xd8, 0x58, 0xbf, 0x21, 0xad, 0x1d, 0xec, 0xac, 0xc9, 0x45, 0x52, 0xfd, + 0x76, 0x6f, 0xab, 0xcf, 0xe4, 0x67, 0x22, 0xbe, 0x9b, 0xbc, 0xdf, 0x94, 0xfc, 0x43, 0xc1, 0xff, + 0xe0, 0x5b, 0xa2, 0x77, 0x14, 0xfc, 0x3f, 0xd0, 0x78, 0xbc, 0x66, 0x39, 0x8f, 0xd6, 0x9b, 0x7e, + 0x4b, 0x8a, 0xdc, 0x01, 0xc2, 0xc2, 0x7c, 0x15, 0xb3, 0x84, 0x0b, 0x0b, 0xed, 0xc2, 0x42, 0x01, + 0xd8, 0x0b, 0xfc, 0x05, 0xb4, 0xd7, 0xf9, 0x32, 0x94, 0x0b, 0xd5, 0xe4, 0x42, 0x41, 0xdc, 0xe4, + 0x6d, 0xe4, 0x3a, 0x5b, 0xeb, 0x7c, 0x29, 0x8a, 0xe1, 0x1f, 0x35, 0xe8, 0x7d, 0x90, 0x74, 0xfe, + 0x9f, 0x47, 0xad, 0xfe, 0x1b, 0x51, 0xab, 0x9f, 0x18, 0xb5, 0xfa, 0x89, 0x51, 0xab, 0x7f, 0x1b, + 0xb5, 0xfa, 0x0f, 0xa2, 0x3e, 0xf9, 0xb5, 0x0e, 0xda, 0xdd, 0x8f, 0x2c, 0xd6, 0xa1, 0xe5, 0xcf, + 0x4c, 0x93, 0xf8, 0x3e, 0xba, 0x87, 0x8f, 0x00, 0xcd, 0x5c, 0x72, 0x31, 0x25, 0x66, 0x40, 0xac, + 0x90, 0x50, 0xea, 0x51, 0xa4, 0x60, 0x0c, 0x3d, 0xd3, 0x73, 0x5d, 0x62, 0x06, 0xe1, 0xd8, 0xb0, + 0x1d, 0x62, 0xa1, 0x1a, 0x7e, 0x00, 0x87, 0x53, 0x42, 0x27, 0xb6, 0xef, 0xdb, 0x9e, 0x1b, 0x5a, + 0xc4, 0xb5, 0x89, 0x85, 0x54, 0xfc, 0x18, 0x1e, 0x98, 0x9e, 0xe3, 0x10, 0x33, 0x10, 0xb0, 0xeb, + 0x05, 0x21, 0xb9, 0xb0, 0xfd, 0xc0, 0x47, 0x75, 0x31, 0xdb, 0x76, 0x1c, 0xf2, 0xda, 0x70, 0x42, + 0x83, 0xbe, 0x9e, 0x4d, 0x88, 0x1b, 0xa0, 0x86, 0x98, 0x53, 0xa1, 0x96, 0x3d, 0x21, 0xae, 0x18, + 0x87, 0x5a, 0xf8, 0x21, 0xe0, 0x0a, 0xb6, 0x5d, 0x8b, 0x5c, 0x84, 0xc1, 0xe5, 0x94, 0xa0, 0x36, + 0x7e, 0x02, 0x8f, 0x2a, 0x7c, 0xff, 0x1e, 0x63, 0x42, 0x90, 0x86, 0x11, 0x74, 0xaa, 0xc3, 0xc0, + 0x9b, 0xbe, 0x45, 0xb0, 0x3f, 0x9d, 0x7a, 0xef, 0x28, 0x31, 0x3d, 0x6a, 0x21, 0x7d, 0x1f, 0x3e, + 0x27, 0x66, 0xe0, 0xd1, 0xd0, 0xb6, 0x50, 0x47, 0x88, 0xaf, 0x60, 0x9f, 0x18, 0xd4, 0x7c, 0x13, + 0x52, 0xe2, 0xcf, 0x9c, 0x00, 0x75, 0x45, 0x04, 0x63, 0xdb, 0x21, 0xd2, 0xd1, 0xd8, 0x9b, 0xb9, + 0x16, 0xea, 0xe1, 0x03, 0xd0, 0x27, 0x24, 0x30, 0xaa, 0x4c, 0x0e, 0xc4, 0xfd, 0xa6, 0x61, 0xbe, + 0x21, 0x15, 0x82, 0x70, 0x1f, 0x8e, 0x4c, 0xc3, 0x15, 0x4d, 0x26, 0x25, 0x46, 0x40, 0xc2, 0xb1, + 0xe7, 0x58, 0x84, 0xa2, 0x43, 0x61, 0xf0, 0xa3, 0x13, 0xdb, 0x21, 0x08, 0xef, 0x75, 0x58, 0xc4, + 0x21, 0xbb, 0x8e, 0xfb, 0x7b, 0x1d, 0xd5, 0x89, 0xe8, 0x38, 0x12, 0x66, 0xce, 0x66, 0xb6, 0x63, + 0x95, 0x41, 0x15, 0x4b, 0x7b, 0x80, 0x0f, 0xa1, 0x5b, 0x99, 0x71, 0x1d, 0xdb, 0x0f, 0xd0, 0x43, + 0xfc, 0x08, 0xee, 0x57, 0xd0, 0x84, 0x04, 0xd4, 0x36, 0x8b, 0x54, 0x1f, 0x09, 0xae, 0x37, 0x0b, + 0x42, 0x6f, 0x1c, 0x4e, 0xc8, 0xc4, 0xa3, 0x97, 0xa8, 0x7f, 0xf2, 0xb3, 0x02, 0xed, 0xea, 0x47, + 0x17, 0xb7, 0xa1, 0xee, 0x7a, 0x2e, 0x41, 0xf7, 0x44, 0x75, 0xe6, 0x79, 0x0e, 0x52, 0x44, 0x65, + 0xbb, 0xc1, 0x77, 0xa8, 0x86, 0x35, 0x68, 0xd8, 0x6e, 0xf0, 0xcd, 0x29, 0x52, 0xcb, 0xf2, 0xe5, + 0x08, 0xd5, 0xcb, 0xf2, 0xf4, 0x15, 0x6a, 0x88, 0x72, 0xec, 0x78, 0x46, 0x80, 0x00, 0x03, 0x34, + 0x2d, 0x6f, 0x76, 0xe6, 0x10, 0xa4, 0x8b, 0xda, 0x0f, 0xa8, 0xed, 0xbe, 0x46, 0x47, 0x42, 0x41, + 0xb9, 0x89, 0x33, 0xdb, 0x35, 0xe8, 0x25, 0x5a, 0x88, 0x34, 0x4b, 0xa8, 0x68, 0x66, 0x27, 0xef, + 0xa0, 0x59, 0xbc, 0xcb, 0xa2, 0xd5, 0x4e, 0x72, 0x96, 0x71, 0x74, 0x4f, 0x8e, 0x64, 0x2b, 0xc6, + 0x19, 0x52, 0xe4, 0x48, 0x16, 0x65, 0xf3, 0x1f, 0x51, 0x0d, 0x77, 0xa0, 0x1d, 0xc4, 0x6b, 0xe6, + 0x6f, 0x93, 0x39, 0x52, 0xc5, 0x6b, 0xfe, 0x96, 0x6d, 0x47, 0x3e, 0x5b, 0xa2, 0x3a, 0xee, 0x01, + 0x88, 0x7f, 0x21, 0x71, 0xce, 0xe3, 0x79, 0x8e, 0x1a, 0x57, 0x4d, 0xf9, 0x07, 0xe5, 0xe5, 0x9f, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x27, 0x2a, 0xd3, 0x11, 0xb0, 0x08, 0x00, 0x00, } diff --git a/pulsar/pb/pulsar.proto b/pulsar/pb/pulsar.proto index 4773cfa1aa..adea199110 100644 --- a/pulsar/pb/pulsar.proto +++ b/pulsar/pb/pulsar.proto @@ -52,9 +52,17 @@ enum DataType { VECTOR_FLOAT = 101; } -message KeyValuePair { - string key = 1; - string value = 2; +enum OpType { + Insert = 0; + Delete = 1; + Search = 2; + TimeSync = 3; + Key2Seg = 4; + Statistics = 5; +} + +message SegmentRecord { + repeated string seg_info = 1; } message VectorRowRecord { @@ -85,50 +93,28 @@ message FieldValue { VectorRecord vector_record = 4; } -message Entities { - Status status = 1; - repeated int64 ids = 2; - repeated bool valid_row = 3; - repeated FieldValue fields = 4; -} - -message InsertParam { +message PulsarMessage { string collection_name = 1; repeated FieldValue fields = 2; - repeated int64 entity_id_array = 3; //optional + int64 entity_id = 3; string partition_tag = 4; - repeated int64 timestamp = 5; - int64 grpc_server_client_id = 6; - repeated KeyValuePair extra_params = 7; + VectorParam vector_param =5; + SegmentRecord segments = 6; + int64 timestamp = 7; + int64 client_id = 8; + OpType msg_type = 9; } -message EntityIds { - Status status = 1; - repeated int64 entity_id_array = 2; -} - -message DeleteByIDParam { +message PulsarMessages { string collection_name = 1; - repeated int64 id_array = 2; - repeated int64 timestamp = 3; - int64 grpc_server_client_id = 4; + repeated FieldValue fields = 2; + repeated int64 entity_id = 3; + string partition_tag = 4; + repeated VectorParam vector_param =5; + repeated SegmentRecord segments = 6; + repeated int64 timestamp = 7; + repeated int64 client_id = 8; + OpType msg_type = 9; } -message SearchParam { - string collection_name = 1; - repeated string partition_tag_array = 2; - repeated VectorParam vector_param = 3; - string dsl = 4; - repeated int64 timestamp = 5; - int64 grpc_server_client_id = 6; - repeated KeyValuePair extra_params = 7; -} -message QueryResult { - Status status = 1; - Entities entities = 2; - int64 row_num = 3; - repeated float scores = 4; - repeated float distances = 5; - repeated KeyValuePair extra_params = 6; -} diff --git a/pulsar/query_node.go b/pulsar/query_node.go new file mode 100644 index 0000000000..c52157191a --- /dev/null +++ b/pulsar/query_node.go @@ -0,0 +1,60 @@ +package pulsar + +import ( + "fmt" + "suvlim/pulsar/schema" + "sync" + "time" +) + +type QueryNode struct { + mc MessageClient +} + +func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) { + wg.Add(3) + go qn.insert_query(qn.mc.insertMsg, wg) + go qn.delete_query(qn.mc.deleteMsg, wg) + go qn.search_query(qn.mc.searchMsg, wg) + wg.Wait() +} + + +func (qn *QueryNode) PrepareBatchMsg() { + qn.mc.PrepareBatchMsg(JobType(0)) +} +func main() { + + mc := MessageClient{} + topics := []string{"insert", "delete"} + mc.InitClient("pulsar://localhost:6650", topics) + + go mc.ReceiveMessage() + + qn := QueryNode{mc} + + for { + time.Sleep(200 * time.Millisecond) + qn.PrepareBatchMsg() + qn.doQueryNode(wg) + fmt.Println("do a batch in 200ms") + } +} + +func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{ + wg.Done() + return schema.Status{schema.ErrorCode_SUCCESS, ""} +} + +func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{ + wg.Done() + return schema.Status{schema.ErrorCode_SUCCESS, ""} +} + +func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status{ + wg.Done() + return schema.Status{schema.ErrorCode_SUCCESS, ""} +} + + + diff --git a/pulsar/schema/message.go b/pulsar/schema/message.go index 5c4ad56b29..5358c470f6 100644 --- a/pulsar/schema/message.go +++ b/pulsar/schema/message.go @@ -83,6 +83,10 @@ type VectorParam struct { RowRecord *VectorRecord } +type SegmentRecord struct { + segInfo []string +} + type OpType int const ( @@ -92,8 +96,21 @@ const ( TimeSync OpType = 3 Key2Seg OpType = 4 Statistics OpType = 5 + EOF OpType = 6 ) +type PulsarMessage struct { + CollectionName string + Fields []*FieldValue + EntityId int64 + PartitionTag string + VectorParam *VectorParam + Segments []*SegmentRecord + Timestamp int64 + ClientId int64 + MsgType OpType +} + type Message interface { GetType() OpType Serialization() []byte @@ -135,7 +152,7 @@ type TimeSyncMsg struct { type Key2SegMsg struct { EntityId int64 - Segments []string + Segments []*SegmentRecord MsgType OpType } @@ -170,3 +187,7 @@ func (tms *TimeSyncMsg) GetType() OpType { func (kms *Key2SegMsg) GetType() OpType { return kms.MsgType } + +type SyncEofMsg struct { + MsgType OpType +} \ No newline at end of file diff --git a/pulsar/storage_node.go b/pulsar/storage_node.go index 60040dd5d4..357c6abdcb 100644 --- a/pulsar/storage_node.go +++ b/pulsar/storage_node.go @@ -1,27 +1,52 @@ package pulsar import ( + "fmt" "suvlim/pulsar/schema" "sync" + "time" ) -func BeforeSend() schema.Message { - segs := make([]string, 2, 2) - segs[0] = "seg1" - segs[1] = "seg2" - var msg schema.Message = &schema.Key2SegMsg{EntityId: 1, Segments: segs, MsgType: schema.OpType(4)} - return msg +type WriteNode struct { + mc MessageClient } -func insert([]*schema.InsertMsg) schema.Status{ - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} - -var wg sync.WaitGroup -func delete([]*schema.DeleteMsg) schema.Status{ - msg := BeforeSend() - go Send(msg) +func (wn *WriteNode)doWriteNode(wg sync.WaitGroup) { + wg.Add(2) + go wn.insert_write(wn.mc.insertMsg, wg) + go wn.delete_write(wn.mc.deleteMsg, wg) wg.Wait() +} + + +func (wn *WriteNode) PrepareBatchMsg() { + wn.mc.PrepareBatchMsg(JobType(1)) +} +func main() { + + mc := MessageClient{} + topics := []string{"insert", "delete"} + mc.InitClient("pulsar://localhost:6650", topics) + + go mc.ReceiveMessage() + + wn := WriteNode{mc} + + for { + time.Sleep(200 * time.Millisecond) + wn.PrepareBatchMsg() + wn.doWriteNode(wg) + fmt.Println("do a batch in 200ms") + } +} + +func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status{ + wg.Done() + return schema.Status{schema.ErrorCode_SUCCESS, ""} +} + +func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status{ + wg.Done() return schema.Status{schema.ErrorCode_SUCCESS, ""} }