From e13fc08d09d1b9af127bd690f1aa5b0ad9654672 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Fri, 4 Sep 2020 17:52:49 +0800 Subject: [PATCH] Refactor write node using message client Signed-off-by: xige-16 --- go.sum | 3 - util/util.go | 0 writer/client-go/go_client.go | 214 ----- writer/client-go/pb/milvus.proto | 744 ------------------ writer/client-go/pb/pulsar.pb.go | 638 --------------- writer/client-go/pb/pulsar.proto | 120 --- writer/client-go/query_node.go | 71 -- writer/client-go/schema/message.go | 198 ----- writer/client-go/storage_node.go | 56 -- writer/client-go/test/client_test.go | 19 - writer/main.go | 68 ++ writer/message_client/message_client.go | 197 +++++ writer/{client-go => }/pb/build.sh | 0 writer/{client-go => }/pb/suvlim.pb.go | 0 writer/{client-go => }/pb/suvlim.proto | 2 +- .../test/{test_writer.go => insert_test.go} | 48 +- .../{writer.go => write_node/writer_node.go} | 49 +- 17 files changed, 317 insertions(+), 2110 deletions(-) delete mode 100644 util/util.go delete mode 100644 writer/client-go/go_client.go delete mode 100644 writer/client-go/pb/milvus.proto delete mode 100644 writer/client-go/pb/pulsar.pb.go delete mode 100644 writer/client-go/pb/pulsar.proto delete mode 100644 writer/client-go/query_node.go delete mode 100644 writer/client-go/schema/message.go delete mode 100644 writer/client-go/storage_node.go delete mode 100644 writer/client-go/test/client_test.go create mode 100644 writer/main.go create mode 100644 writer/message_client/message_client.go rename writer/{client-go => }/pb/build.sh (100%) rename writer/{client-go => }/pb/suvlim.pb.go (100%) rename writer/{client-go => }/pb/suvlim.proto (99%) rename writer/test/{test_writer.go => insert_test.go} (51%) rename writer/{writer.go => write_node/writer_node.go} (65%) diff --git a/go.sum b/go.sum index c26465eeae..a4b74932cb 100644 --- a/go.sum +++ b/go.sum @@ -44,10 +44,7 @@ github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af h1: github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af/go.mod h1:QdYxU2iG99VVU6cvoBRkCgkazfJSL9WwPZ20PZR6aUk= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -<<<<<<< HEAD github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= -======= ->>>>>>> 2a377a76d1925a76a011a2364b47f4a199f003bf github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/util/util.go b/util/util.go deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/writer/client-go/go_client.go b/writer/client-go/go_client.go deleted file mode 100644 index d6cfed861f..0000000000 --- a/writer/client-go/go_client.go +++ /dev/null @@ -1,214 +0,0 @@ -package client_go - -import ( - "context" - "github.com/apache/pulsar/pulsar-client-go/pulsar" - "github.com/golang/protobuf/proto" - "log" - "suvlim/pulsar/client-go/pb" - "suvlim/pulsar/client-go/schema" - "sync" -) - -var ( - SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}," + - "]}" -) - -type MessageClient struct { - - //message channel - insertOrDeleteChan chan *pb.InsertOrDeleteMsg - searchChan chan *pb.SearchMsg - timeSyncChan chan *pb.TimeSyncMsg - key2SegChan chan *pb.Key2SegMsg - - // pulsar - client pulsar.Client - key2segProducer pulsar.Producer - writeSyncProducer pulsar.Producer - insertOrDeleteConsumer pulsar.Consumer - searchConsumer pulsar.Consumer - timeSyncConsumer pulsar.Consumer - - // batch messages - InsertOrDeleteMsg []*pb.InsertOrDeleteMsg - SearchMsg []*pb.SearchMsg - timeSyncMsg []*pb.TimeSyncMsg - key2segMsg []*pb.Key2SegMsg -} - -func (mc *MessageClient)ReceiveInsertOrDeleteMsg() { - for { - insetOrDeleteMsg := pb.InsertOrDeleteMsg{} - msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) - err = msg.GetValue(&insetOrDeleteMsg) - if err != nil { - log.Fatal(err) - } - mc.insertOrDeleteChan <- &insetOrDeleteMsg - } -} - -func (mc *MessageClient)ReceiveSearchMsg() { - for { - searchMsg := pb.SearchMsg{} - msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) - err = msg.GetValue(&searchMsg) - if err != nil { - log.Fatal(err) - } - mc.searchChan <- &searchMsg - } -} - -func (mc *MessageClient)ReceiveTimeSyncMsg() { - for { - timeSyncMsg := pb.TimeSyncMsg{} - msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) - err = msg.GetValue(&timeSyncMsg) - if err != nil { - log.Fatal(err) - } - mc.timeSyncChan <- &timeSyncMsg - } -} - -func (mc *MessageClient) ReceiveMessage() { - go mc.ReceiveInsertOrDeleteMsg() - go mc.ReceiveSearchMsg() - go mc.ReceiveTimeSyncMsg() -} - -func (mc *MessageClient) CreatProducer(opType pb.OpType, topicName string) pulsar.Producer{ - producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{ - Topic: topicName, - }) - defer producer.Close() - if err != nil { - log.Fatal(err) - } - proto.Marshal() - return producer -} - -func (mc *MessageClient) CreateConsumer(schemaDef string, topics []string) pulsar.Consumer { - originMsgSchema := pulsar.NewProtoSchema(schemaDef, nil) - consumer, err := mc.client.SubscribeWithSchema(pulsar.ConsumerOptions{ - Topics: topics, - SubscriptionName: "multi-topic-sub", - }, originMsgSchema) - defer consumer.Close() - if err != nil { - log.Fatal(err) - } - return consumer -} - -func (mc *MessageClient) CreateClient(url string) pulsar.Client { - // create client - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: url, - }) - defer client.Close() - if err != nil { - log.Fatal(err) - } - return client -} - -func (mc *MessageClient) InitClient(url string, topics []string, consumerMsgSchema string) { - //create client - mc.client = mc.CreateClient(url) - - //create producer - for topicIndex := range topics { - if topics[topicIndex] == "insert" { - mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "insert") - } - if topics[topicIndex] == "delete" { - mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete") - } - if topics[topicIndex] == "key2seg" { - mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "key2seg") - } - - } - mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert") - mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete") - mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "key2seg") - - //create consumer - mc.consumer = mc.CreateConsumer(consumerMsgSchema, topics) - - // init channel - mc.insertChan = make(chan *schema.InsertMsg, 1000) - mc.deleteChan = make(chan *schema.DeleteMsg, 1000) - mc.searchChan = make(chan *schema.SearchMsg, 1000) - mc.timeSyncChan = make(chan *schema.TimeSyncMsg, 1000) - mc.key2SegChan = make(chan *schema.Key2SegMsg, 1000) -} - -type JobType int -const ( - OpInQueryNode JobType = 0 - OpInWriteNode JobType = 1 -) - -func (mc *MessageClient) PrepareMsg(opType schema.OpType, msgLen int) { - switch opType { - case schema.Insert: - for i := 0; i < msgLen; i++ { - msg := <- mc.insertChan - mc.InsertMsg[i] = msg - } - case schema.Delete: - for i := 0; i < msgLen; i++ { - msg := <- mc.deleteChan - mc.DeleteMsg[i] = msg - } - case schema.Search: - for i := 0; i < msgLen; i++ { - msg := <-mc.searchChan - mc.SearchMsg[i] = msg - } - case schema.TimeSync: - for i := 0; i < msgLen; i++ { - msg := <- mc.timeSyncChan - mc.timeMsg[i] = msg - } - case schema.Key2Seg: - for i := 0; i < msgLen; i++ { - msg := <-mc.key2SegChan - mc.key2segMsg[i] = msg - } - } -} - -func (mc *MessageClient) PrepareBatchMsg(jobType JobType) { - // assume the channel not full - mc.InsertMsg = make([]*schema.InsertMsg, 1000) - mc.DeleteMsg = make([]*schema.DeleteMsg, 1000) - mc.SearchMsg = make([]*schema.SearchMsg, 1000) - mc.timeMsg = make([]*schema.TimeSyncMsg, 1000) - mc.key2segMsg = make([]*schema.Key2SegMsg, 1000) - - // ensure all messages before time in timeSyncTopic have been push into channel - - // get the length of every channel - insertLen := len(mc.insertChan) - deleteLen := len(mc.deleteChan) - searchLen := len(mc.searchChan) - timeLen := len(mc.timeSyncChan) - key2segLen := len(mc.key2SegChan) - - // get message from channel to slice - mc.PrepareMsg(schema.Insert, insertLen) - mc.PrepareMsg(schema.Delete, deleteLen) - mc.PrepareMsg(schema.TimeSync, timeLen) - if jobType == OpInQueryNode { - mc.PrepareMsg(schema.Key2Seg, key2segLen) - mc.PrepareMsg(schema.Search, searchLen) - } -} diff --git a/writer/client-go/pb/milvus.proto b/writer/client-go/pb/milvus.proto deleted file mode 100644 index 6f58ae1fae..0000000000 --- a/writer/client-go/pb/milvus.proto +++ /dev/null @@ -1,744 +0,0 @@ -syntax = "proto3"; - -package pb; - -enum ErrorCode { - SUCCESS = 0; - UNEXPECTED_ERROR = 1; - CONNECT_FAILED = 2; - PERMISSION_DENIED = 3; - COLLECTION_NOT_EXISTS = 4; - ILLEGAL_ARGUMENT = 5; - ILLEGAL_DIMENSION = 7; - ILLEGAL_INDEX_TYPE = 8; - ILLEGAL_COLLECTION_NAME = 9; - ILLEGAL_TOPK = 10; - ILLEGAL_ROWRECORD = 11; - ILLEGAL_VECTOR_ID = 12; - ILLEGAL_SEARCH_RESULT = 13; - FILE_NOT_FOUND = 14; - META_FAILED = 15; - CACHE_FAILED = 16; - CANNOT_CREATE_FOLDER = 17; - CANNOT_CREATE_FILE = 18; - CANNOT_DELETE_FOLDER = 19; - CANNOT_DELETE_FILE = 20; - BUILD_INDEX_ERROR = 21; - ILLEGAL_NLIST = 22; - ILLEGAL_METRIC_TYPE = 23; - OUT_OF_MEMORY = 24; -} - -message Status { - ErrorCode error_code = 1; - string reason = 2; -} - -/** - * @brief Field data type - */ -enum DataType { - NONE = 0; - BOOL = 1; - INT8 = 2; - INT16 = 3; - INT32 = 4; - INT64 = 5; - - FLOAT = 10; - DOUBLE = 11; - - STRING = 20; - - VECTOR_BINARY = 100; - VECTOR_FLOAT = 101; -} - -/** - * @brief General usage - */ -message KeyValuePair { - string key = 1; - string value = 2; -} - -/** - * @brief Collection name - */ -message CollectionName { - string collection_name = 1; -} - -/** - * @brief Collection name list - */ -message CollectionNameList { - Status status = 1; - repeated string collection_names = 2; -} - -/** - * @brief Field name - */ -message FieldName { - string collection_name = 1; - string field_name = 2; -} - -/** - * @brief Collection mapping - * @extra_params: key-value pair for extra parameters of the collection - * typically usage: - * extra_params["params"] = {segment_row_count: 1000000, auto_id: true} - * Note: - * the segment_row_count specify segment row count limit for merging - * the auto_id = true means entity id is auto-generated by milvus - */ -message Mapping { - Status status = 1; - string collection_name = 2; - repeated FieldParam fields = 3; - repeated KeyValuePair extra_params = 4; -} - -/** - * @brief Collection mapping list - */ -message MappingList { - Status status = 1; - repeated Mapping mapping_list = 2; -} - -/** - * @brief Parameters of partition - */ -message PartitionParam { - string collection_name = 1; - string tag = 2; -} - -/** - * @brief Partition list - */ -message PartitionList { - Status status = 1; - repeated string partition_tag_array = 2; -} - -/** - * @brief Vector row record - */ -message VectorRowRecord { - repeated float float_data = 1; //float vector data - bytes binary_data = 2; //binary vector data -} - -/** - * @brief Attribute record - */ -message AttrRecord { - repeated int32 int32_value = 1; - repeated int64 int64_value = 2; - repeated float float_value = 3; - repeated double double_value = 4; -} - -/** - * @brief Vector records - */ -message VectorRecord { - repeated VectorRowRecord records = 1; -} - -/** - * @brief Field values - */ -message FieldValue { - string field_name = 1; - DataType type = 2; - AttrRecord attr_record = 3; - VectorRecord vector_record = 4; -} - -/** - * @brief Parameters for insert action - */ -message InsertParam { - string collection_name = 1; - repeated FieldValue fields = 2; - repeated int64 entity_id_array = 3; //optional - string partition_tag = 4; - repeated KeyValuePair extra_params = 5; -} - -/** - * @brief Entity ids - */ -message EntityIds { - Status status = 1; - repeated int64 entity_id_array = 2; -} - -/** - * @brief Search vector parameters - */ -message VectorParam { - string json = 1; - VectorRecord row_record = 2; -} - -/** - * @brief Parameters for search action - * @dsl example: - * { - * "query": { - * "bool": { - * "must": [ - * { - * "must":[ - * { - * "should": [ - * { - * "term": { - * "gender": ["male"] - * } - * }, - * { - * "range": { - * "height": {"gte": "170.0", "lte": "180.0"} - * } - * } - * ] - * }, - * { - * "must_not": [ - * { - * "term": { - * "age": [20, 21, 22, 23, 24, 25] - * } - * }, - * { - * "Range": { - * "weight": {"lte": "100"} - * } - * } - * ] - * } - * ] - * }, - * { - * "must": [ - * { - * "vector": { - * "face_img": { - * "topk": 10, - * "metric_type": "L2", - * "query": [], - * "params": { - * "nprobe": 10 - * } - * } - * } - * } - * ] - * } - * ] - * } - * }, - * "fields": ["age", "face_img"] - * } - */ -message SearchParam { - string collection_name = 1; - repeated string partition_tag_array = 2; - repeated VectorParam vector_param = 3; - string dsl = 4; - repeated KeyValuePair extra_params = 5; -} - -/** - * @brief Parameters for searching in segments - */ -message SearchInSegmentParam { - repeated string file_id_array = 1; - SearchParam search_param = 2; -} - -/** - * @brief Entities - */ -message Entities { - Status status = 1; - repeated int64 ids = 2; - repeated bool valid_row = 3; - repeated FieldValue fields = 4; -} - -/** - * @brief Query result - */ -message QueryResult { - Status status = 1; - Entities entities = 2; - int64 row_num = 3; - repeated float scores = 4; - repeated float distances = 5; - repeated KeyValuePair extra_params = 6; -} - -/** - * @brief Server string Reply - */ -message StringReply { - Status status = 1; - string string_reply = 2; -} - -/** - * @brief Server bool Reply - */ -message BoolReply { - Status status = 1; - bool bool_reply = 2; -} - -/** - * @brief Return collection row count - */ -message CollectionRowCount { - Status status = 1; - int64 collection_row_count = 2; -} - -/** - * @brief Server command parameters - */ -message Command { - string cmd = 1; -} - -/** - * @brief Index params - * @collection_name: target collection - * @field_name: target field - * @index_name: a name for index provided by user, unique within this field - * @extra_params: index parameters in json format - * for vector field: - * extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H, - * PQ, HNSW, HNSW_SQ8NM, ANNOY - * extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO - * SUBSTRUCTURE, SUPERSTRUCTURE - * extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048} - * for structured field: - * extra_params["index_type"] = one of the values: SORTED - */ -message IndexParam { - Status status = 1; - string collection_name = 2; - string field_name = 3; - string index_name = 4; - repeated KeyValuePair extra_params = 5; -} - -/** - * @brief Parameters for flush action - */ -message FlushParam { - repeated string collection_name_array = 1; -} - -/** - * @brief Parameters for flush action - */ -message CompactParam { - string collection_name = 1; - double threshold = 2; -} - -/** - * @brief Parameters for deleting entities by id - */ -message DeleteByIDParam { - string collection_name = 1; - repeated int64 id_array = 2; -} - -/** - * @brief Return collection stats - * @json_info: collection stats in json format, typically, the format is like: - * { - * row_count: xxx, - * data_size: xxx, - * partitions: [ - * { - * tag: xxx, - * id: xxx, - * row_count: xxx, - * data_size: xxx, - * segments: [ - * { - * id: xxx, - * row_count: xxx, - * data_size: xxx, - * files: [ - * { - * field: xxx, - * name: xxx, - * index_type: xxx, - * path: xxx, - * data_size: xxx, - * } - * ] - * } - * ] - * } - * ] - * } - */ -message CollectionInfo { - Status status = 1; - string json_info = 2; -} - -/** - * @brief Parameters for returning entities id of a segment - */ -message GetEntityIDsParam { - string collection_name = 1; - int64 segment_id = 2; -} - -/** - * @brief Entities identity - */ -message EntityIdentity { - string collection_name = 1; - repeated int64 id_array = 2; - repeated string field_names = 3; -} - -/********************************************SearchPB interface***************************************************/ -/** - * @brief Vector field parameters - */ -message VectorFieldParam { - int64 dimension = 1; -} - -/** - * @brief Field type - */ -message FieldType { - oneof value { - DataType data_type = 1; - VectorFieldParam vector_param = 2; - } -} - -/** - * @brief Field parameters - */ -message FieldParam { - uint64 id = 1; - string name = 2; - DataType type = 3; - repeated KeyValuePair index_params = 4; - repeated KeyValuePair extra_params = 5; -} - -/** - * @brief Vector field record - */ -message VectorFieldRecord { - repeated VectorRowRecord value = 1; -} - -/////////////////////////////////////////////////////////////////// - -message TermQuery { - string field_name = 1; - repeated int64 int_value = 2; - repeated double double_value = 3; - int64 value_num = 4; - float boost = 5; - repeated KeyValuePair extra_params = 6; -} - -enum CompareOperator { - LT = 0; - LTE = 1; - EQ = 2; - GT = 3; - GTE = 4; - NE = 5; -} - -message CompareExpr { - CompareOperator operator = 1; - string operand = 2; -} - -message RangeQuery { - string field_name = 1; - repeated CompareExpr operand = 2; - float boost = 3; - repeated KeyValuePair extra_params = 4; -} - -message VectorQuery { - string field_name = 1; - float query_boost = 2; - repeated VectorRowRecord records = 3; - int64 topk = 4; - repeated KeyValuePair extra_params = 5; -} - -enum Occur { - INVALID = 0; - MUST = 1; - SHOULD = 2; - MUST_NOT = 3; -} - -message BooleanQuery { - Occur occur = 1; - repeated GeneralQuery general_query = 2; -} - -message GeneralQuery { - oneof query { - BooleanQuery boolean_query = 1; - TermQuery term_query = 2; - RangeQuery range_query = 3; - VectorQuery vector_query = 4; - } -} - -message SearchParamPB { - string collection_name = 1; - repeated string partition_tag_array = 2; - GeneralQuery general_query = 3; - repeated KeyValuePair extra_params = 4; -} - -service MilvusService { - /** - * @brief This method is used to create collection - * - * @param CollectionSchema, use to provide collection information to be created. - * - * @return Status - */ - rpc CreateCollection(Mapping) returns (Status){} - - /** - * @brief This method is used to test collection existence. - * - * @param CollectionName, collection name is going to be tested. - * - * @return BoolReply - */ - rpc HasCollection(CollectionName) returns (BoolReply) {} - - /** - * @brief This method is used to get collection schema. - * - * @param CollectionName, target collection name. - * - * @return CollectionSchema - */ - rpc DescribeCollection(CollectionName) returns (Mapping) {} - - /** - * @brief This method is used to get collection schema. - * - * @param CollectionName, target collection name. - * - * @return CollectionRowCount - */ - rpc CountCollection(CollectionName) returns (CollectionRowCount) {} - - /** - * @brief This method is used to list all collections. - * - * @param Command, dummy parameter. - * - * @return CollectionNameList - */ - rpc ShowCollections(Command) returns (CollectionNameList) {} - - /** - * @brief This method is used to get collection detail information. - * - * @param CollectionName, target collection name. - * - * @return CollectionInfo - */ - rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {} - - /** - * @brief This method is used to delete collection. - * - * @param CollectionName, collection name is going to be deleted. - * - * @return Status - */ - rpc DropCollection(CollectionName) returns (Status) {} - - /** - * @brief This method is used to build index by collection in sync mode. - * - * @param IndexParam, index paramters. - * - * @return Status - */ - rpc CreateIndex(IndexParam) returns (Status) {} - - /** - * @brief This method is used to describe index - * - * @param IndexParam, target index. - * - * @return IndexParam - */ - rpc DescribeIndex(IndexParam) returns (IndexParam) {} - - /** - * @brief This method is used to drop index - * - * @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection - * - * @return Status - */ - rpc DropIndex(IndexParam) returns (Status) {} - - /** - * @brief This method is used to create partition - * - * @param PartitionParam, partition parameters. - * - * @return Status - */ - rpc CreatePartition(PartitionParam) returns (Status) {} - - /** - * @brief This method is used to test partition existence. - * - * @param PartitionParam, target partition. - * - * @return BoolReply - */ - rpc HasPartition(PartitionParam) returns (BoolReply) {} - - /** - * @brief This method is used to show partition information - * - * @param CollectionName, target collection name. - * - * @return PartitionList - */ - rpc ShowPartitions(CollectionName) returns (PartitionList) {} - - /** - * @brief This method is used to drop partition - * - * @param PartitionParam, target partition. - * - * @return Status - */ - rpc DropPartition(PartitionParam) returns (Status) {} - - /** - * @brief This method is used to add vector array to collection. - * - * @param InsertParam, insert parameters. - * - * @return VectorIds - */ - rpc Insert(InsertParam) returns (EntityIds) {} - - /** - * @brief This method is used to get entities data by id array. - * - * @param EntitiesIdentity, target entity id array. - * - * @return EntitiesData - */ - rpc GetEntityByID(EntityIdentity) returns (Entities) {} - - /** - * @brief This method is used to get vector ids from a segment - * - * @param GetVectorIDsParam, target collection and segment - * - * @return VectorIds - */ - rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {} - - /** - * @brief This method is used to query vector in collection. - * - * @param SearchParam, search parameters. - * - * @return KQueryResult - */ - rpc Search(SearchParam) returns (QueryResult) {} - - /** - * @brief This method is used to query vector in specified files. - * - * @param SearchInSegmentParam, target segments to search. - * - * @return TopKQueryResult - */ - rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {} - - /** - * @brief This method is used to give the server status. - * - * @param Command, command string - * - * @return StringReply - */ - rpc Cmd(Command) returns (StringReply) {} - - /** - * @brief This method is used to delete vector by id - * - * @param DeleteByIDParam, delete parameters. - * - * @return status - */ - rpc DeleteByID(DeleteByIDParam) returns (Status) {} - - /** - * @brief This method is used to preload collection - * - * @param CollectionName, target collection name. - * - * @return Status - */ - rpc PreloadCollection(CollectionName) returns (Status) {} - - /** - * @brief This method is used to flush buffer into storage. - * - * @param FlushParam, flush parameters - * - * @return Status - */ - rpc Flush(FlushParam) returns (Status) {} - - /** - * @brief This method is used to compact collection - * - * @param CompactParam, compact parameters - * - * @return Status - */ - rpc Compact(CompactParam) returns (Status) {} - - /********************************New Interface********************************************/ - - rpc SearchPB(SearchParamPB) returns (QueryResult) {} -} diff --git a/writer/client-go/pb/pulsar.pb.go b/writer/client-go/pb/pulsar.pb.go deleted file mode 100644 index 9d941508ba..0000000000 --- a/writer/client-go/pb/pulsar.pb.go +++ /dev/null @@ -1,638 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: pulsar.proto - -/* -Package pb is a generated protocol buffer package. - -It is generated from these files: - pulsar.proto - -It has these top-level messages: - Status - SegmentRecord - VectorRowRecord - AttrRecord - VectorRecord - VectorParam - FieldValue - PulsarMessage - PulsarMessages -*/ -package pb - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type ErrorCode int32 - -const ( - ErrorCode_SUCCESS ErrorCode = 0 - ErrorCode_UNEXPECTED_ERROR ErrorCode = 1 - ErrorCode_CONNECT_FAILED ErrorCode = 2 - ErrorCode_PERMISSION_DENIED ErrorCode = 3 - ErrorCode_COLLECTION_NOT_EXISTS ErrorCode = 4 - ErrorCode_ILLEGAL_ARGUMENT ErrorCode = 5 - ErrorCode_ILLEGAL_DIMENSION ErrorCode = 7 - ErrorCode_ILLEGAL_INDEX_TYPE ErrorCode = 8 - ErrorCode_ILLEGAL_COLLECTION_NAME ErrorCode = 9 - ErrorCode_ILLEGAL_TOPK ErrorCode = 10 - ErrorCode_ILLEGAL_ROWRECORD ErrorCode = 11 - ErrorCode_ILLEGAL_VECTOR_ID ErrorCode = 12 - ErrorCode_ILLEGAL_SEARCH_RESULT ErrorCode = 13 - ErrorCode_FILE_NOT_FOUND ErrorCode = 14 - ErrorCode_META_FAILED ErrorCode = 15 - ErrorCode_CACHE_FAILED ErrorCode = 16 - ErrorCode_CANNOT_CREATE_FOLDER ErrorCode = 17 - ErrorCode_CANNOT_CREATE_FILE ErrorCode = 18 - ErrorCode_CANNOT_DELETE_FOLDER ErrorCode = 19 - ErrorCode_CANNOT_DELETE_FILE ErrorCode = 20 - ErrorCode_BUILD_INDEX_ERROR ErrorCode = 21 - ErrorCode_ILLEGAL_NLIST ErrorCode = 22 - ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23 - ErrorCode_OUT_OF_MEMORY ErrorCode = 24 -) - -var ErrorCode_name = map[int32]string{ - 0: "SUCCESS", - 1: "UNEXPECTED_ERROR", - 2: "CONNECT_FAILED", - 3: "PERMISSION_DENIED", - 4: "COLLECTION_NOT_EXISTS", - 5: "ILLEGAL_ARGUMENT", - 7: "ILLEGAL_DIMENSION", - 8: "ILLEGAL_INDEX_TYPE", - 9: "ILLEGAL_COLLECTION_NAME", - 10: "ILLEGAL_TOPK", - 11: "ILLEGAL_ROWRECORD", - 12: "ILLEGAL_VECTOR_ID", - 13: "ILLEGAL_SEARCH_RESULT", - 14: "FILE_NOT_FOUND", - 15: "META_FAILED", - 16: "CACHE_FAILED", - 17: "CANNOT_CREATE_FOLDER", - 18: "CANNOT_CREATE_FILE", - 19: "CANNOT_DELETE_FOLDER", - 20: "CANNOT_DELETE_FILE", - 21: "BUILD_INDEX_ERROR", - 22: "ILLEGAL_NLIST", - 23: "ILLEGAL_METRIC_TYPE", - 24: "OUT_OF_MEMORY", -} -var ErrorCode_value = map[string]int32{ - "SUCCESS": 0, - "UNEXPECTED_ERROR": 1, - "CONNECT_FAILED": 2, - "PERMISSION_DENIED": 3, - "COLLECTION_NOT_EXISTS": 4, - "ILLEGAL_ARGUMENT": 5, - "ILLEGAL_DIMENSION": 7, - "ILLEGAL_INDEX_TYPE": 8, - "ILLEGAL_COLLECTION_NAME": 9, - "ILLEGAL_TOPK": 10, - "ILLEGAL_ROWRECORD": 11, - "ILLEGAL_VECTOR_ID": 12, - "ILLEGAL_SEARCH_RESULT": 13, - "FILE_NOT_FOUND": 14, - "META_FAILED": 15, - "CACHE_FAILED": 16, - "CANNOT_CREATE_FOLDER": 17, - "CANNOT_CREATE_FILE": 18, - "CANNOT_DELETE_FOLDER": 19, - "CANNOT_DELETE_FILE": 20, - "BUILD_INDEX_ERROR": 21, - "ILLEGAL_NLIST": 22, - "ILLEGAL_METRIC_TYPE": 23, - "OUT_OF_MEMORY": 24, -} - -func (x ErrorCode) String() string { - return proto.EnumName(ErrorCode_name, int32(x)) -} -func (ErrorCode) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type DataType int32 - -const ( - DataType_NONE DataType = 0 - DataType_BOOL DataType = 1 - DataType_INT8 DataType = 2 - DataType_INT16 DataType = 3 - DataType_INT32 DataType = 4 - DataType_INT64 DataType = 5 - DataType_FLOAT DataType = 10 - DataType_DOUBLE DataType = 11 - DataType_STRING DataType = 20 - DataType_VECTOR_BINARY DataType = 100 - DataType_VECTOR_FLOAT DataType = 101 -) - -var DataType_name = map[int32]string{ - 0: "NONE", - 1: "BOOL", - 2: "INT8", - 3: "INT16", - 4: "INT32", - 5: "INT64", - 10: "FLOAT", - 11: "DOUBLE", - 20: "STRING", - 100: "VECTOR_BINARY", - 101: "VECTOR_FLOAT", -} -var DataType_value = map[string]int32{ - "NONE": 0, - "BOOL": 1, - "INT8": 2, - "INT16": 3, - "INT32": 4, - "INT64": 5, - "FLOAT": 10, - "DOUBLE": 11, - "STRING": 20, - "VECTOR_BINARY": 100, - "VECTOR_FLOAT": 101, -} - -func (x DataType) String() string { - return proto.EnumName(DataType_name, int32(x)) -} -func (DataType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -type OpType int32 - -const ( - OpType_Insert OpType = 0 - OpType_Delete OpType = 1 - OpType_Search OpType = 2 - OpType_TimeSync OpType = 3 - OpType_Key2Seg OpType = 4 - OpType_Statistics OpType = 5 -) - -var OpType_name = map[int32]string{ - 0: "Insert", - 1: "Delete", - 2: "Search", - 3: "TimeSync", - 4: "Key2Seg", - 5: "Statistics", -} -var OpType_value = map[string]int32{ - "Insert": 0, - "Delete": 1, - "Search": 2, - "TimeSync": 3, - "Key2Seg": 4, - "Statistics": 5, -} - -func (x OpType) String() string { - return proto.EnumName(OpType_name, int32(x)) -} -func (OpType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -type Status struct { - ErrorCode ErrorCode `protobuf:"varint,1,opt,name=error_code,json=errorCode,enum=pb.ErrorCode" json:"error_code,omitempty"` - Reason string `protobuf:"bytes,2,opt,name=reason" json:"reason,omitempty"` -} - -func (m *Status) Reset() { *m = Status{} } -func (m *Status) String() string { return proto.CompactTextString(m) } -func (*Status) ProtoMessage() {} -func (*Status) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -func (m *Status) GetErrorCode() ErrorCode { - if m != nil { - return m.ErrorCode - } - return ErrorCode_SUCCESS -} - -func (m *Status) GetReason() string { - if m != nil { - return m.Reason - } - return "" -} - -type SegmentRecord struct { - SegInfo []string `protobuf:"bytes,1,rep,name=seg_info,json=segInfo" json:"seg_info,omitempty"` -} - -func (m *SegmentRecord) Reset() { *m = SegmentRecord{} } -func (m *SegmentRecord) String() string { return proto.CompactTextString(m) } -func (*SegmentRecord) ProtoMessage() {} -func (*SegmentRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -func (m *SegmentRecord) GetSegInfo() []string { - if m != nil { - return m.SegInfo - } - return nil -} - -type VectorRowRecord struct { - FloatData []float32 `protobuf:"fixed32,1,rep,packed,name=float_data,json=floatData" json:"float_data,omitempty"` - BinaryData []byte `protobuf:"bytes,2,opt,name=binary_data,json=binaryData,proto3" json:"binary_data,omitempty"` -} - -func (m *VectorRowRecord) Reset() { *m = VectorRowRecord{} } -func (m *VectorRowRecord) String() string { return proto.CompactTextString(m) } -func (*VectorRowRecord) ProtoMessage() {} -func (*VectorRowRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -func (m *VectorRowRecord) GetFloatData() []float32 { - if m != nil { - return m.FloatData - } - return nil -} - -func (m *VectorRowRecord) GetBinaryData() []byte { - if m != nil { - return m.BinaryData - } - return nil -} - -type AttrRecord struct { - Int32Value []int32 `protobuf:"varint,1,rep,packed,name=int32_value,json=int32Value" json:"int32_value,omitempty"` - Int64Value []int64 `protobuf:"varint,2,rep,packed,name=int64_value,json=int64Value" json:"int64_value,omitempty"` - FloatValue []float32 `protobuf:"fixed32,3,rep,packed,name=float_value,json=floatValue" json:"float_value,omitempty"` - DoubleValue []float64 `protobuf:"fixed64,4,rep,packed,name=double_value,json=doubleValue" json:"double_value,omitempty"` -} - -func (m *AttrRecord) Reset() { *m = AttrRecord{} } -func (m *AttrRecord) String() string { return proto.CompactTextString(m) } -func (*AttrRecord) ProtoMessage() {} -func (*AttrRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } - -func (m *AttrRecord) GetInt32Value() []int32 { - if m != nil { - return m.Int32Value - } - return nil -} - -func (m *AttrRecord) GetInt64Value() []int64 { - if m != nil { - return m.Int64Value - } - return nil -} - -func (m *AttrRecord) GetFloatValue() []float32 { - if m != nil { - return m.FloatValue - } - return nil -} - -func (m *AttrRecord) GetDoubleValue() []float64 { - if m != nil { - return m.DoubleValue - } - return nil -} - -type VectorRecord struct { - Records []*VectorRowRecord `protobuf:"bytes,1,rep,name=records" json:"records,omitempty"` -} - -func (m *VectorRecord) Reset() { *m = VectorRecord{} } -func (m *VectorRecord) String() string { return proto.CompactTextString(m) } -func (*VectorRecord) ProtoMessage() {} -func (*VectorRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } - -func (m *VectorRecord) GetRecords() []*VectorRowRecord { - if m != nil { - return m.Records - } - return nil -} - -type VectorParam struct { - Json string `protobuf:"bytes,1,opt,name=json" json:"json,omitempty"` - RowRecord *VectorRecord `protobuf:"bytes,2,opt,name=row_record,json=rowRecord" json:"row_record,omitempty"` -} - -func (m *VectorParam) Reset() { *m = VectorParam{} } -func (m *VectorParam) String() string { return proto.CompactTextString(m) } -func (*VectorParam) ProtoMessage() {} -func (*VectorParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } - -func (m *VectorParam) GetJson() string { - if m != nil { - return m.Json - } - return "" -} - -func (m *VectorParam) GetRowRecord() *VectorRecord { - if m != nil { - return m.RowRecord - } - return nil -} - -type FieldValue struct { - FieldName string `protobuf:"bytes,1,opt,name=field_name,json=fieldName" json:"field_name,omitempty"` - Type DataType `protobuf:"varint,2,opt,name=type,enum=pb.DataType" json:"type,omitempty"` - AttrRecord *AttrRecord `protobuf:"bytes,3,opt,name=attr_record,json=attrRecord" json:"attr_record,omitempty"` - VectorRecord *VectorRecord `protobuf:"bytes,4,opt,name=vector_record,json=vectorRecord" json:"vector_record,omitempty"` -} - -func (m *FieldValue) Reset() { *m = FieldValue{} } -func (m *FieldValue) String() string { return proto.CompactTextString(m) } -func (*FieldValue) ProtoMessage() {} -func (*FieldValue) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } - -func (m *FieldValue) GetFieldName() string { - if m != nil { - return m.FieldName - } - return "" -} - -func (m *FieldValue) GetType() DataType { - if m != nil { - return m.Type - } - return DataType_NONE -} - -func (m *FieldValue) GetAttrRecord() *AttrRecord { - if m != nil { - return m.AttrRecord - } - return nil -} - -func (m *FieldValue) GetVectorRecord() *VectorRecord { - if m != nil { - return m.VectorRecord - } - return nil -} - -type PulsarMessage struct { - CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` - Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` - EntityId int64 `protobuf:"varint,3,opt,name=entity_id,json=entityId" json:"entity_id,omitempty"` - PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"` - VectorParam *VectorParam `protobuf:"bytes,5,opt,name=vector_param,json=vectorParam" json:"vector_param,omitempty"` - Segments *SegmentRecord `protobuf:"bytes,6,opt,name=segments" json:"segments,omitempty"` - Timestamp int64 `protobuf:"varint,7,opt,name=timestamp" json:"timestamp,omitempty"` - ClientId int64 `protobuf:"varint,8,opt,name=client_id,json=clientId" json:"client_id,omitempty"` - MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"` -} - -func (m *PulsarMessage) Reset() { *m = PulsarMessage{} } -func (m *PulsarMessage) String() string { return proto.CompactTextString(m) } -func (*PulsarMessage) ProtoMessage() {} -func (*PulsarMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } - -func (m *PulsarMessage) GetCollectionName() string { - if m != nil { - return m.CollectionName - } - return "" -} - -func (m *PulsarMessage) GetFields() []*FieldValue { - if m != nil { - return m.Fields - } - return nil -} - -func (m *PulsarMessage) GetEntityId() int64 { - if m != nil { - return m.EntityId - } - return 0 -} - -func (m *PulsarMessage) GetPartitionTag() string { - if m != nil { - return m.PartitionTag - } - return "" -} - -func (m *PulsarMessage) GetVectorParam() *VectorParam { - if m != nil { - return m.VectorParam - } - return nil -} - -func (m *PulsarMessage) GetSegments() *SegmentRecord { - if m != nil { - return m.Segments - } - return nil -} - -func (m *PulsarMessage) GetTimestamp() int64 { - if m != nil { - return m.Timestamp - } - return 0 -} - -func (m *PulsarMessage) GetClientId() int64 { - if m != nil { - return m.ClientId - } - return 0 -} - -func (m *PulsarMessage) GetMsgType() OpType { - if m != nil { - return m.MsgType - } - return OpType_Insert -} - -type PulsarMessages struct { - CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"` - Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"` - EntityId []int64 `protobuf:"varint,3,rep,packed,name=entity_id,json=entityId" json:"entity_id,omitempty"` - PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"` - VectorParam []*VectorParam `protobuf:"bytes,5,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"` - Segments []*SegmentRecord `protobuf:"bytes,6,rep,name=segments" json:"segments,omitempty"` - Timestamp []int64 `protobuf:"varint,7,rep,packed,name=timestamp" json:"timestamp,omitempty"` - ClientId []int64 `protobuf:"varint,8,rep,packed,name=client_id,json=clientId" json:"client_id,omitempty"` - MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"` -} - -func (m *PulsarMessages) Reset() { *m = PulsarMessages{} } -func (m *PulsarMessages) String() string { return proto.CompactTextString(m) } -func (*PulsarMessages) ProtoMessage() {} -func (*PulsarMessages) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } - -func (m *PulsarMessages) GetCollectionName() string { - if m != nil { - return m.CollectionName - } - return "" -} - -func (m *PulsarMessages) GetFields() []*FieldValue { - if m != nil { - return m.Fields - } - return nil -} - -func (m *PulsarMessages) GetEntityId() []int64 { - if m != nil { - return m.EntityId - } - return nil -} - -func (m *PulsarMessages) GetPartitionTag() string { - if m != nil { - return m.PartitionTag - } - return "" -} - -func (m *PulsarMessages) GetVectorParam() []*VectorParam { - if m != nil { - return m.VectorParam - } - return nil -} - -func (m *PulsarMessages) GetSegments() []*SegmentRecord { - if m != nil { - return m.Segments - } - return nil -} - -func (m *PulsarMessages) GetTimestamp() []int64 { - if m != nil { - return m.Timestamp - } - return nil -} - -func (m *PulsarMessages) GetClientId() []int64 { - if m != nil { - return m.ClientId - } - return nil -} - -func (m *PulsarMessages) GetMsgType() OpType { - if m != nil { - return m.MsgType - } - return OpType_Insert -} - -func init() { - proto.RegisterType((*Status)(nil), "pb.Status") - proto.RegisterType((*SegmentRecord)(nil), "pb.SegmentRecord") - proto.RegisterType((*VectorRowRecord)(nil), "pb.VectorRowRecord") - proto.RegisterType((*AttrRecord)(nil), "pb.AttrRecord") - proto.RegisterType((*VectorRecord)(nil), "pb.VectorRecord") - proto.RegisterType((*VectorParam)(nil), "pb.VectorParam") - proto.RegisterType((*FieldValue)(nil), "pb.FieldValue") - proto.RegisterType((*PulsarMessage)(nil), "pb.PulsarMessage") - proto.RegisterType((*PulsarMessages)(nil), "pb.PulsarMessages") - proto.RegisterEnum("pb.ErrorCode", ErrorCode_name, ErrorCode_value) - proto.RegisterEnum("pb.DataType", DataType_name, DataType_value) - proto.RegisterEnum("pb.OpType", OpType_name, OpType_value) -} - -func init() { proto.RegisterFile("pulsar.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 1101 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46, - 0x17, 0x0d, 0x45, 0xfd, 0xf1, 0x52, 0x92, 0xc7, 0x13, 0x27, 0x51, 0x90, 0xef, 0x43, 0x54, 0x15, - 0x6d, 0x0d, 0xa3, 0x49, 0x50, 0x25, 0x35, 0xfa, 0xd2, 0x07, 0x9a, 0x1c, 0x25, 0x44, 0x28, 0x52, - 0x1d, 0x52, 0x8e, 0xfd, 0x44, 0xd0, 0xd2, 0x58, 0x65, 0x21, 0x91, 0x02, 0x39, 0x76, 0xa0, 0x65, - 0xb4, 0x4b, 0xe8, 0x1e, 0xba, 0xa6, 0x76, 0x19, 0xc5, 0x0c, 0x49, 0x4b, 0x09, 0xd0, 0x02, 0x29, - 0xda, 0xb7, 0xcb, 0x33, 0xe7, 0xde, 0x39, 0xe7, 0x5c, 0x52, 0x10, 0x74, 0x36, 0x37, 0xab, 0x3c, - 0xca, 0x9e, 0x6f, 0xb2, 0x94, 0xa7, 0xb8, 0xb6, 0xb9, 0x1a, 0xba, 0xd0, 0xf4, 0x79, 0xc4, 0x6f, - 0x72, 0xfc, 0x35, 0x00, 0xcb, 0xb2, 0x34, 0x0b, 0xe7, 0xe9, 0x82, 0xf5, 0x95, 0x81, 0x72, 0xdc, - 0x1b, 0x75, 0x9f, 0x6f, 0xae, 0x9e, 0x13, 0x81, 0x9a, 0xe9, 0x82, 0x51, 0x8d, 0x55, 0x25, 0x7e, - 0x08, 0xcd, 0x8c, 0x45, 0x79, 0x9a, 0xf4, 0x6b, 0x03, 0xe5, 0x58, 0xa3, 0xe5, 0xd3, 0xf0, 0x04, - 0xba, 0x3e, 0x5b, 0xae, 0x59, 0xc2, 0x29, 0x9b, 0xa7, 0xd9, 0x02, 0x3f, 0x86, 0x76, 0xce, 0x96, - 0x61, 0x9c, 0x5c, 0xa7, 0x7d, 0x65, 0xa0, 0x1e, 0x6b, 0xb4, 0x95, 0xb3, 0xa5, 0x9d, 0x5c, 0xa7, - 0xc3, 0x1f, 0xe0, 0xe0, 0x9c, 0xcd, 0x79, 0x9a, 0xd1, 0xf4, 0x7d, 0xc9, 0xfe, 0x3f, 0xc0, 0xf5, - 0x2a, 0x8d, 0x78, 0xb8, 0x88, 0x78, 0x24, 0xf9, 0x35, 0xaa, 0x49, 0xc4, 0x8a, 0x78, 0x84, 0x9f, - 0x82, 0x7e, 0x15, 0x27, 0x51, 0xb6, 0x2d, 0xce, 0xc5, 0xd5, 0x1d, 0x0a, 0x05, 0x24, 0x08, 0xc3, - 0x5f, 0x14, 0x00, 0x83, 0xf3, 0xac, 0x1c, 0xf7, 0x14, 0xf4, 0x38, 0xe1, 0x2f, 0x47, 0xe1, 0x6d, - 0xb4, 0xba, 0x61, 0x72, 0x5e, 0x83, 0x82, 0x84, 0xce, 0x05, 0x52, 0x12, 0x4e, 0x5f, 0x95, 0x84, - 0xda, 0x40, 0x3d, 0x56, 0x25, 0xe1, 0xf4, 0xd5, 0x1d, 0xa1, 0x10, 0x54, 0x10, 0x54, 0xa9, 0xa8, - 0xd0, 0x58, 0x10, 0x3e, 0x83, 0xce, 0x22, 0xbd, 0xb9, 0x5a, 0xb1, 0x92, 0x51, 0x1f, 0xa8, 0xc7, - 0x0a, 0xd5, 0x0b, 0x4c, 0x52, 0x86, 0xdf, 0x43, 0xa7, 0xf4, 0x59, 0xa8, 0x7a, 0x06, 0xad, 0x4c, - 0x56, 0xb9, 0x54, 0xa4, 0x8f, 0xee, 0x8b, 0x98, 0x3f, 0x8a, 0x82, 0x56, 0x9c, 0x21, 0x05, 0xbd, - 0x38, 0x9b, 0x46, 0x59, 0xb4, 0xc6, 0x18, 0xea, 0x3f, 0x89, 0xdc, 0x15, 0x99, 0xbb, 0xac, 0xf1, - 0x0b, 0x80, 0x2c, 0x7d, 0x1f, 0x16, 0x1d, 0x32, 0x16, 0x7d, 0x84, 0xf6, 0x86, 0x16, 0x13, 0xb5, - 0xac, 0x1a, 0x3e, 0xfc, 0x4d, 0x01, 0x18, 0xc7, 0x6c, 0xb5, 0x28, 0x4c, 0x88, 0xd8, 0xc5, 0x53, - 0x98, 0x44, 0x6b, 0x56, 0x4e, 0xd6, 0x24, 0xe2, 0x46, 0x6b, 0x86, 0x07, 0x50, 0xe7, 0xdb, 0x0d, - 0x93, 0x83, 0x7b, 0xa3, 0x8e, 0x18, 0x2c, 0xd2, 0x0e, 0xb6, 0x1b, 0x46, 0xe5, 0x09, 0x7e, 0x01, - 0x7a, 0xc4, 0x79, 0x56, 0x29, 0x50, 0xa5, 0x82, 0x9e, 0x20, 0xee, 0xb6, 0x41, 0x21, 0xda, 0x6d, - 0xe6, 0x5b, 0xe8, 0xde, 0x4a, 0x6d, 0x55, 0x4b, 0xfd, 0x2f, 0x44, 0x77, 0x6e, 0xf7, 0x9e, 0x86, - 0xbf, 0xd7, 0xa0, 0x3b, 0x95, 0xef, 0xf0, 0x84, 0xe5, 0x79, 0xb4, 0x64, 0xf8, 0x2b, 0x38, 0x98, - 0xa7, 0xab, 0x15, 0x9b, 0xf3, 0x38, 0x4d, 0xf6, 0xf5, 0xf7, 0x76, 0xb0, 0x34, 0xf1, 0x25, 0x34, - 0xa5, 0xa3, 0x5c, 0x6e, 0xb9, 0x54, 0xb7, 0xcb, 0x80, 0x96, 0xa7, 0xf8, 0x09, 0x68, 0x2c, 0xe1, - 0x31, 0xdf, 0x86, 0x71, 0x61, 0x44, 0xa5, 0xed, 0x02, 0xb0, 0x17, 0xf8, 0x73, 0xe8, 0x6e, 0xa2, - 0x8c, 0xc7, 0xf2, 0x32, 0x1e, 0x2d, 0xa5, 0x6c, 0x8d, 0x76, 0xee, 0xc0, 0x20, 0x5a, 0xe2, 0x11, - 0x94, 0xa2, 0xc3, 0x8d, 0xd8, 0x58, 0xbf, 0x21, 0xad, 0x1d, 0xec, 0xac, 0xc9, 0x45, 0x52, 0xfd, - 0x76, 0x6f, 0xab, 0xcf, 0xe4, 0x67, 0x22, 0xbe, 0x9b, 0xbc, 0xdf, 0x94, 0xfc, 0x43, 0xc1, 0xff, - 0xe0, 0x5b, 0xa2, 0x77, 0x14, 0xfc, 0x3f, 0xd0, 0x78, 0xbc, 0x66, 0x39, 0x8f, 0xd6, 0x9b, 0x7e, - 0x4b, 0x8a, 0xdc, 0x01, 0xc2, 0xc2, 0x7c, 0x15, 0xb3, 0x84, 0x0b, 0x0b, 0xed, 0xc2, 0x42, 0x01, - 0xd8, 0x0b, 0xfc, 0x05, 0xb4, 0xd7, 0xf9, 0x32, 0x94, 0x0b, 0xd5, 0xe4, 0x42, 0x41, 0xdc, 0xe4, - 0x6d, 0xe4, 0x3a, 0x5b, 0xeb, 0x7c, 0x29, 0x8a, 0xe1, 0x1f, 0x35, 0xe8, 0x7d, 0x90, 0x74, 0xfe, - 0x9f, 0x47, 0xad, 0xfe, 0x1b, 0x51, 0xab, 0x9f, 0x18, 0xb5, 0xfa, 0x89, 0x51, 0xab, 0x7f, 0x1b, - 0xb5, 0xfa, 0x0f, 0xa2, 0x3e, 0xf9, 0xb5, 0x0e, 0xda, 0xdd, 0x8f, 0x2c, 0xd6, 0xa1, 0xe5, 0xcf, - 0x4c, 0x93, 0xf8, 0x3e, 0xba, 0x87, 0x8f, 0x00, 0xcd, 0x5c, 0x72, 0x31, 0x25, 0x66, 0x40, 0xac, - 0x90, 0x50, 0xea, 0x51, 0xa4, 0x60, 0x0c, 0x3d, 0xd3, 0x73, 0x5d, 0x62, 0x06, 0xe1, 0xd8, 0xb0, - 0x1d, 0x62, 0xa1, 0x1a, 0x7e, 0x00, 0x87, 0x53, 0x42, 0x27, 0xb6, 0xef, 0xdb, 0x9e, 0x1b, 0x5a, - 0xc4, 0xb5, 0x89, 0x85, 0x54, 0xfc, 0x18, 0x1e, 0x98, 0x9e, 0xe3, 0x10, 0x33, 0x10, 0xb0, 0xeb, - 0x05, 0x21, 0xb9, 0xb0, 0xfd, 0xc0, 0x47, 0x75, 0x31, 0xdb, 0x76, 0x1c, 0xf2, 0xda, 0x70, 0x42, - 0x83, 0xbe, 0x9e, 0x4d, 0x88, 0x1b, 0xa0, 0x86, 0x98, 0x53, 0xa1, 0x96, 0x3d, 0x21, 0xae, 0x18, - 0x87, 0x5a, 0xf8, 0x21, 0xe0, 0x0a, 0xb6, 0x5d, 0x8b, 0x5c, 0x84, 0xc1, 0xe5, 0x94, 0xa0, 0x36, - 0x7e, 0x02, 0x8f, 0x2a, 0x7c, 0xff, 0x1e, 0x63, 0x42, 0x90, 0x86, 0x11, 0x74, 0xaa, 0xc3, 0xc0, - 0x9b, 0xbe, 0x45, 0xb0, 0x3f, 0x9d, 0x7a, 0xef, 0x28, 0x31, 0x3d, 0x6a, 0x21, 0x7d, 0x1f, 0x3e, - 0x27, 0x66, 0xe0, 0xd1, 0xd0, 0xb6, 0x50, 0x47, 0x88, 0xaf, 0x60, 0x9f, 0x18, 0xd4, 0x7c, 0x13, - 0x52, 0xe2, 0xcf, 0x9c, 0x00, 0x75, 0x45, 0x04, 0x63, 0xdb, 0x21, 0xd2, 0xd1, 0xd8, 0x9b, 0xb9, - 0x16, 0xea, 0xe1, 0x03, 0xd0, 0x27, 0x24, 0x30, 0xaa, 0x4c, 0x0e, 0xc4, 0xfd, 0xa6, 0x61, 0xbe, - 0x21, 0x15, 0x82, 0x70, 0x1f, 0x8e, 0x4c, 0xc3, 0x15, 0x4d, 0x26, 0x25, 0x46, 0x40, 0xc2, 0xb1, - 0xe7, 0x58, 0x84, 0xa2, 0x43, 0x61, 0xf0, 0xa3, 0x13, 0xdb, 0x21, 0x08, 0xef, 0x75, 0x58, 0xc4, - 0x21, 0xbb, 0x8e, 0xfb, 0x7b, 0x1d, 0xd5, 0x89, 0xe8, 0x38, 0x12, 0x66, 0xce, 0x66, 0xb6, 0x63, - 0x95, 0x41, 0x15, 0x4b, 0x7b, 0x80, 0x0f, 0xa1, 0x5b, 0x99, 0x71, 0x1d, 0xdb, 0x0f, 0xd0, 0x43, - 0xfc, 0x08, 0xee, 0x57, 0xd0, 0x84, 0x04, 0xd4, 0x36, 0x8b, 0x54, 0x1f, 0x09, 0xae, 0x37, 0x0b, - 0x42, 0x6f, 0x1c, 0x4e, 0xc8, 0xc4, 0xa3, 0x97, 0xa8, 0x7f, 0xf2, 0xb3, 0x02, 0xed, 0xea, 0x47, - 0x17, 0xb7, 0xa1, 0xee, 0x7a, 0x2e, 0x41, 0xf7, 0x44, 0x75, 0xe6, 0x79, 0x0e, 0x52, 0x44, 0x65, - 0xbb, 0xc1, 0x77, 0xa8, 0x86, 0x35, 0x68, 0xd8, 0x6e, 0xf0, 0xcd, 0x29, 0x52, 0xcb, 0xf2, 0xe5, - 0x08, 0xd5, 0xcb, 0xf2, 0xf4, 0x15, 0x6a, 0x88, 0x72, 0xec, 0x78, 0x46, 0x80, 0x00, 0x03, 0x34, - 0x2d, 0x6f, 0x76, 0xe6, 0x10, 0xa4, 0x8b, 0xda, 0x0f, 0xa8, 0xed, 0xbe, 0x46, 0x47, 0x42, 0x41, - 0xb9, 0x89, 0x33, 0xdb, 0x35, 0xe8, 0x25, 0x5a, 0x88, 0x34, 0x4b, 0xa8, 0x68, 0x66, 0x27, 0xef, - 0xa0, 0x59, 0xbc, 0xcb, 0xa2, 0xd5, 0x4e, 0x72, 0x96, 0x71, 0x74, 0x4f, 0x8e, 0x64, 0x2b, 0xc6, - 0x19, 0x52, 0xe4, 0x48, 0x16, 0x65, 0xf3, 0x1f, 0x51, 0x0d, 0x77, 0xa0, 0x1d, 0xc4, 0x6b, 0xe6, - 0x6f, 0x93, 0x39, 0x52, 0xc5, 0x6b, 0xfe, 0x96, 0x6d, 0x47, 0x3e, 0x5b, 0xa2, 0x3a, 0xee, 0x01, - 0x88, 0x7f, 0x21, 0x71, 0xce, 0xe3, 0x79, 0x8e, 0x1a, 0x57, 0x4d, 0xf9, 0x07, 0xe5, 0xe5, 0x9f, - 0x01, 0x00, 0x00, 0xff, 0xff, 0x27, 0x2a, 0xd3, 0x11, 0xb0, 0x08, 0x00, 0x00, -} diff --git a/writer/client-go/pb/pulsar.proto b/writer/client-go/pb/pulsar.proto deleted file mode 100644 index adea199110..0000000000 --- a/writer/client-go/pb/pulsar.proto +++ /dev/null @@ -1,120 +0,0 @@ - -syntax = "proto3"; - -package pb; - -enum ErrorCode { - SUCCESS = 0; - UNEXPECTED_ERROR = 1; - CONNECT_FAILED = 2; - PERMISSION_DENIED = 3; - COLLECTION_NOT_EXISTS = 4; - ILLEGAL_ARGUMENT = 5; - ILLEGAL_DIMENSION = 7; - ILLEGAL_INDEX_TYPE = 8; - ILLEGAL_COLLECTION_NAME = 9; - ILLEGAL_TOPK = 10; - ILLEGAL_ROWRECORD = 11; - ILLEGAL_VECTOR_ID = 12; - ILLEGAL_SEARCH_RESULT = 13; - FILE_NOT_FOUND = 14; - META_FAILED = 15; - CACHE_FAILED = 16; - CANNOT_CREATE_FOLDER = 17; - CANNOT_CREATE_FILE = 18; - CANNOT_DELETE_FOLDER = 19; - CANNOT_DELETE_FILE = 20; - BUILD_INDEX_ERROR = 21; - ILLEGAL_NLIST = 22; - ILLEGAL_METRIC_TYPE = 23; - OUT_OF_MEMORY = 24; -} - -message Status { - ErrorCode error_code = 1; - string reason = 2; -} - -enum DataType { - NONE = 0; - BOOL = 1; - INT8 = 2; - INT16 = 3; - INT32 = 4; - INT64 = 5; - - FLOAT = 10; - DOUBLE = 11; - - STRING = 20; - - VECTOR_BINARY = 100; - VECTOR_FLOAT = 101; -} - -enum OpType { - Insert = 0; - Delete = 1; - Search = 2; - TimeSync = 3; - Key2Seg = 4; - Statistics = 5; -} - -message SegmentRecord { - repeated string seg_info = 1; -} - -message VectorRowRecord { - repeated float float_data = 1; //float vector data - bytes binary_data = 2; //binary vector data -} - -message AttrRecord { - repeated int32 int32_value = 1; - repeated int64 int64_value = 2; - repeated float float_value = 3; - repeated double double_value = 4; -} - -message VectorRecord { - repeated VectorRowRecord records = 1; -} - -message VectorParam { - string json = 1; - VectorRecord row_record = 2; -} - -message FieldValue { - string field_name = 1; - DataType type = 2; - AttrRecord attr_record = 3; - VectorRecord vector_record = 4; -} - -message PulsarMessage { - string collection_name = 1; - repeated FieldValue fields = 2; - int64 entity_id = 3; - string partition_tag = 4; - VectorParam vector_param =5; - SegmentRecord segments = 6; - int64 timestamp = 7; - int64 client_id = 8; - OpType msg_type = 9; -} - -message PulsarMessages { - string collection_name = 1; - repeated FieldValue fields = 2; - repeated int64 entity_id = 3; - string partition_tag = 4; - repeated VectorParam vector_param =5; - repeated SegmentRecord segments = 6; - repeated int64 timestamp = 7; - repeated int64 client_id = 8; - OpType msg_type = 9; -} - - diff --git a/writer/client-go/query_node.go b/writer/client-go/query_node.go deleted file mode 100644 index 60b4f737fe..0000000000 --- a/writer/client-go/query_node.go +++ /dev/null @@ -1,71 +0,0 @@ -package client_go - -import ( - "fmt" - "suvlim/pulsar/client-go/schema" - "sync" - "time" -) - -var ( - consumerQSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}," + - "]}" -) - -type QueryNode struct { - mc MessageClient -} - -func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) { - wg.Add(3) - go qn.insert_query(qn.mc.InsertMsg, wg) - go qn.delete_query(qn.mc.DeleteMsg, wg) - go qn.search_query(qn.mc.SearchMsg, wg) - wg.Wait() -} - - -func (qn *QueryNode) PrepareBatchMsg() { - qn.mc.PrepareBatchMsg(JobType(0)) -} - -func (qn *QueryNode)ReceiveMessage() { - qn.mc.ReceiveMessage() -} - -func main() { - - mc := MessageClient{} - topics := []string{"insert", "delete"} - mc.InitClient("pulsar://localhost:6650", topics, consumerQSchema) - - qn := QueryNode{mc} - wg := sync.WaitGroup{} - go qn.ReceiveMessage() - - for { - time.Sleep(200 * time.Millisecond) - qn.PrepareBatchMsg() - qn.doQueryNode(wg) - fmt.Println("do a batch in 200ms") - } -} - -func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status { - wg.Done() - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} - -func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status { - wg.Done() - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} - -func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status { - wg.Done() - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} - - - diff --git a/writer/client-go/schema/message.go b/writer/client-go/schema/message.go deleted file mode 100644 index 0dbe7e97f3..0000000000 --- a/writer/client-go/schema/message.go +++ /dev/null @@ -1,198 +0,0 @@ -package schema - -import ( - "encoding/json" - "fmt" -) - -type ErrorCode int32 - -const ( - ErrorCode_SUCCESS ErrorCode = 0 - ErrorCode_UNEXPECTED_ERROR ErrorCode = 1 - ErrorCode_CONNECT_FAILED ErrorCode = 2 - ErrorCode_PERMISSION_DENIED ErrorCode = 3 - ErrorCode_COLLECTION_NOT_EXISTS ErrorCode = 4 - ErrorCode_ILLEGAL_ARGUMENT ErrorCode = 5 - ErrorCode_ILLEGAL_DIMENSION ErrorCode = 7 - ErrorCode_ILLEGAL_INDEX_TYPE ErrorCode = 8 - ErrorCode_ILLEGAL_COLLECTION_NAME ErrorCode = 9 - ErrorCode_ILLEGAL_TOPK ErrorCode = 10 - ErrorCode_ILLEGAL_ROWRECORD ErrorCode = 11 - ErrorCode_ILLEGAL_VECTOR_ID ErrorCode = 12 - ErrorCode_ILLEGAL_SEARCH_RESULT ErrorCode = 13 - ErrorCode_FILE_NOT_FOUND ErrorCode = 14 - ErrorCode_META_FAILED ErrorCode = 15 - ErrorCode_CACHE_FAILED ErrorCode = 16 - ErrorCode_CANNOT_CREATE_FOLDER ErrorCode = 17 - ErrorCode_CANNOT_CREATE_FILE ErrorCode = 18 - ErrorCode_CANNOT_DELETE_FOLDER ErrorCode = 19 - ErrorCode_CANNOT_DELETE_FILE ErrorCode = 20 - ErrorCode_BUILD_INDEX_ERROR ErrorCode = 21 - ErrorCode_ILLEGAL_NLIST ErrorCode = 22 - ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23 - ErrorCode_OUT_OF_MEMORY ErrorCode = 24 -) - -type Status struct { - Error_code ErrorCode - Reason string -} - -type DataType int32 - -const ( - NONE DataType = 0 - BOOL DataType = 1 - INT8 DataType = 2 - INT16 DataType = 3 - INT32 DataType = 4 - INT64 DataType = 5 - FLOAT DataType = 10 - DOUBLE DataType = 11 - STRING DataType = 20 - VectorBinary DataType = 100 - VectorFloat DataType = 101 -) - -type AttrRecord struct { - Int32Value int32 - Int64Value int64 - FloatValue float32 - DoubleValue float64 -} - -type VectorRowRecord struct { - FloatData []float32 - BinaryData []byte -} - -type VectorRecord struct { - Records *VectorRowRecord -} - -type FieldValue struct { - FieldName string - Type DataType - AttrRecord *AttrRecord //what's the diff with VectorRecord - VectorRecord *VectorRecord -} - -type VectorParam struct { - Json string - RowRecord *VectorRecord -} - -type SegmentRecord struct { - segInfo []string -} - -type OpType int - -const ( - Insert OpType = 0 - Delete OpType = 1 - Search OpType = 2 - TimeSync OpType = 3 - Key2Seg OpType = 4 - Statistics OpType = 5 - EOF OpType = 6 -) - -type PulsarMessage struct { - CollectionName string - Fields []*FieldValue - EntityId int64 - PartitionTag string - VectorParam *VectorParam - Segments []*SegmentRecord - Timestamp int64 - ClientId int64 - MsgType OpType - TopicName string - PartitionId int64 - -} - -type Message interface { - GetType() OpType - Serialization() []byte - Deserialization(serializationData []byte) -} - -type InsertMsg struct { - CollectionName string - Fields []*FieldValue - EntityId uint64 - PartitionTag string - SegmentId uint64 - Timestamp uint64 - ClientId int64 - MsgType OpType -} - -type DeleteMsg struct { - CollectionName string - EntityId uint64 - Timestamp uint64 - ClientId int64 - MsgType OpType -} - -type SearchMsg struct { - CollectionName string - PartitionTag string - VectorParam *VectorParam - Timestamp uint64 - ClientId int64 - MsgType OpType -} - -type TimeSyncMsg struct { - ClientId int64 - Timestamp int64 - MsgType OpType -} - -type Key2SegMsg struct { - EntityId int64 - Segments []*SegmentRecord - MsgType OpType -} - -func (ims *InsertMsg) GetType() OpType { - return ims.MsgType -} - -func (ims *InsertMsg) Serialization() []byte { - data, err := json.Marshal(ims) - if err != nil { - fmt.Println("Can't serialization") - } - return data -} - -func (ims *InsertMsg) Deserialization(serializationData []byte) { - -} - -func (dms *DeleteMsg) GetType() OpType { - return dms.MsgType -} - -func (sms *SearchMsg) GetType() OpType { - return sms.MsgType -} - -func (tms *TimeSyncMsg) GetType() OpType { - return tms.MsgType -} - -func (kms *Key2SegMsg) GetType() OpType { - return kms.MsgType -} - -type SyncEofMsg struct { - MsgType OpType -} - diff --git a/writer/client-go/storage_node.go b/writer/client-go/storage_node.go deleted file mode 100644 index b8a37a8098..0000000000 --- a/writer/client-go/storage_node.go +++ /dev/null @@ -1,56 +0,0 @@ -package client_go - -import ( - "fmt" - "suvlim/pulsar/client-go/schema" - "sync" - "time" -) - -var ( - consumerWSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}," + - "]}" -) - -type WriteNode struct { - mc MessageClient -} - -func (wn *WriteNode) doWriteNode(wg sync.WaitGroup) { - wg.Add(2) - go wn.insert_write(wn.mc.InsertMsg, wg) - go wn.delete_write(wn.mc.DeleteMsg, wg) - wg.Wait() -} - -func (wn *WriteNode) PrepareBatchMsg() { - wn.mc.PrepareBatchMsg(JobType(1)) -} -func main() { - - mc := MessageClient{} - topics := []string{"insert", "delete"} - mc.InitClient("pulsar://localhost:6650", topics, consumerWSchema) - - go mc.ReceiveMessage() - wg := sync.WaitGroup{} - wn := WriteNode{mc} - - for { - time.Sleep(200 * time.Millisecond) - wn.PrepareBatchMsg() - wn.doWriteNode(wg) - fmt.Println("do a batch in 200ms") - } -} - -func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status { - wg.Done() - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} - -func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status { - wg.Done() - return schema.Status{schema.ErrorCode_SUCCESS, ""} -} diff --git a/writer/client-go/test/client_test.go b/writer/client-go/test/client_test.go deleted file mode 100644 index f13a91cb2c..0000000000 --- a/writer/client-go/test/client_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package test - -import "sync" - -var ( - wg sync.WaitGroup - - OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" + - "{\"name\":\"CollectionName\",\"type\":\"string\"}," + - "{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" + - "{\"name\":\"EntityId\",\"type\":\"int64\"}" + - "{\"name\":\"PartitionTag\",\"type\":\"string\"}" + - "{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" + - "{\"name\":\"Segments\",\"type\":\"[]string\"}" + - "{\"name\":\"Timestamp\",\"type\":\"int64\"}" + - "{\"name\":\"ClientId\",\"type\":\"int64\"}" + - "{\"name\":\"MsgType\",\"type\":\"OpType\"}" + - "]}" -) \ No newline at end of file diff --git a/writer/main.go b/writer/main.go new file mode 100644 index 0000000000..f4e8d34f06 --- /dev/null +++ b/writer/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + "sync" + "time" + "writer/message_client" + "writer/mock" + "writer/pb" + "writer/write_node" +) + +func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *pb.InsertOrDeleteMsg { + return &pb.InsertOrDeleteMsg{ + CollectionName: collectionName, + PartitionTag: partitionTag, + SegmentId: int64(entityId / 100), + Uid: int64(entityId), + Timestamp: int64(entityId), + ClientId: 0, + } +} + +func GetDeleteMsg(collectionName string, entityId int64) *pb.InsertOrDeleteMsg { + return &pb.InsertOrDeleteMsg{ + CollectionName: collectionName, + Uid: entityId, + Timestamp: int64(entityId + 100), + } +} + +func main() { + + mc := message_client.MessageClient{} + mc.InitClient("pulsar://localhost:6650") + //TODO::close client / consumer/ producer + //mc.Close() + + go mc.ReceiveMessage() + wg := sync.WaitGroup{} + + kv, err := mock.NewTikvStore() + if err != nil { + log.Fatal(err) + } + + wn := write_node.WriteNode{ + KvStore: kv, + MessageClient: &mc, + TimeSync: 100, + } + + ctx := context.Background() + for { + time.Sleep(200 * time.Millisecond) + msgLength := wn.MessageClient.PrepareBatchMsg() + readyDo := true + for _, len := range msgLength { + if len <= 0 { readyDo = false } + } + if readyDo { + wn.DoWriteNode(ctx, 100, wg) + } + fmt.Println("do a batch in 200ms") + } +} diff --git a/writer/message_client/message_client.go b/writer/message_client/message_client.go new file mode 100644 index 0000000000..5e4f15fc87 --- /dev/null +++ b/writer/message_client/message_client.go @@ -0,0 +1,197 @@ +package message_client + +import ( + "context" + "github.com/apache/pulsar/pulsar-client-go/pulsar" + "github.com/golang/protobuf/proto" + "log" + "writer/pb" +) + + +type MessageClient struct { + + //message channel + insertOrDeleteChan chan *pb.InsertOrDeleteMsg + searchChan chan *pb.SearchMsg + timeSyncChan chan *pb.TimeSyncMsg + + // pulsar + client pulsar.Client + key2segProducer pulsar.Producer + writeSyncProducer pulsar.Producer + insertOrDeleteConsumer pulsar.Consumer + searchConsumer pulsar.Consumer + timeSyncConsumer pulsar.Consumer + + // batch messages + InsertMsg []*pb.InsertOrDeleteMsg + DeleteMsg []*pb.InsertOrDeleteMsg + SearchMsg []*pb.SearchMsg + timeSyncMsg []*pb.TimeSyncMsg +} + +func (mc *MessageClient)ReceiveInsertOrDeleteMsg() { + for { + insetOrDeleteMsg := pb.InsertOrDeleteMsg{} + msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) + err = proto.Unmarshal(msg.Payload(), &insetOrDeleteMsg) + if err != nil { + log.Fatal(err) + } + mc.insertOrDeleteChan <- &insetOrDeleteMsg + } +} + +func (mc *MessageClient)ReceiveSearchMsg() { + for { + searchMsg := pb.SearchMsg{} + msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) + err = proto.Unmarshal(msg.Payload(), &searchMsg) + if err != nil { + log.Fatal(err) + } + mc.searchChan <- &searchMsg + } +} + +func (mc *MessageClient)ReceiveTimeSyncMsg() { + for { + timeSyncMsg := pb.TimeSyncMsg{} + msg, err := mc.insertOrDeleteConsumer.Receive(context.Background()) + err = proto.Unmarshal(msg.Payload(), &timeSyncMsg) + if err != nil { + log.Fatal(err) + } + mc.timeSyncChan <- &timeSyncMsg + } +} + +func (mc *MessageClient) ReceiveMessage() { + go mc.ReceiveInsertOrDeleteMsg() + go mc.ReceiveSearchMsg() + go mc.ReceiveTimeSyncMsg() +} + +func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer{ + producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{ + Topic: topicName, + }) + + if err != nil { + log.Fatal(err) + } + return producer +} + +func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer { + consumer, err := mc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: topicName, + SubscriptionName: "multi-topic-sub", + }) + + if err != nil { + log.Fatal(err) + } + return consumer +} + +func (mc *MessageClient) CreateClient(url string) pulsar.Client { + // create client + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: url, + }) + + if err != nil { + log.Fatal(err) + } + return client +} + +func (mc *MessageClient) InitClient(url string) { + //create client + mc.client = mc.CreateClient(url) + + //create producer + mc.key2segProducer = mc.CreatProducer("Key2Seg") + mc.writeSyncProducer = mc.CreatProducer("TimeSync") + + //create consumer + mc.insertOrDeleteConsumer = mc.CreateConsumer("InsertOrDelete") + mc.searchConsumer = mc.CreateConsumer("Search") + mc.timeSyncConsumer = mc.CreateConsumer("TimeSync") + + // init channel + mc.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, 1000) + mc.searchChan = make(chan *pb.SearchMsg, 1000) + mc.timeSyncChan = make(chan *pb.TimeSyncMsg, 1000) +} + +func (mc *MessageClient) Close() { + defer mc.client.Close() + defer mc.key2segProducer.Close() + defer mc.writeSyncProducer.Close() + defer mc.insertOrDeleteConsumer.Close() + defer mc.searchConsumer.Close() + defer mc.timeSyncConsumer.Close() +} + +type JobType int +const ( + OpInQueryNode JobType = 0 + OpInWriteNode JobType = 1 +) + +type MessageType int +const ( + InsertOrDelete MessageType = 0 + Delete MessageType = 1 + Search MessageType = 2 + TimeSync MessageType = 3 + Key2Seg MessageType = 4 + Statistics MessageType = 5 +) + +func (mc *MessageClient) PrepareMsg(messageType MessageType, msgLen int) { + switch messageType { + case InsertOrDelete: + for i := 0; i < msgLen; i++ { + msg := <-mc.insertOrDeleteChan + if msg.Op == pb.OpType_INSERT { + mc.InsertMsg[i] = msg + } else { + mc.DeleteMsg[i] = msg + } + } + case Search: + for i := 0; i < msgLen; i++ { + msg := <-mc.searchChan + mc.SearchMsg[i] = msg + } + case TimeSync: + for i := 0; i < msgLen; i++ { + msg := <-mc.timeSyncChan + mc.timeSyncMsg[i] = msg + } + } +} + +func (mc *MessageClient)PrepareBatchMsg() []int{ + // assume the channel not full + mc.InsertMsg = make([]*pb.InsertOrDeleteMsg, 1000) + mc.DeleteMsg = make([]*pb.InsertOrDeleteMsg, 1000) + mc.SearchMsg = make([]*pb.SearchMsg, 1000) + mc.timeSyncMsg = make([]*pb.TimeSyncMsg, 1000) + + // get the length of every channel + insertOrDeleteLen := len(mc.insertOrDeleteChan) + searchLen := len(mc.searchChan) + timeLen := len(mc.timeSyncChan) + + // get message from channel to slice + mc.PrepareMsg(InsertOrDelete, insertOrDeleteLen) + mc.PrepareMsg(Search, searchLen) + mc.PrepareMsg(TimeSync, timeLen) + + return []int{insertOrDeleteLen, searchLen, timeLen} +} \ No newline at end of file diff --git a/writer/client-go/pb/build.sh b/writer/pb/build.sh similarity index 100% rename from writer/client-go/pb/build.sh rename to writer/pb/build.sh diff --git a/writer/client-go/pb/suvlim.pb.go b/writer/pb/suvlim.pb.go similarity index 100% rename from writer/client-go/pb/suvlim.pb.go rename to writer/pb/suvlim.pb.go diff --git a/writer/client-go/pb/suvlim.proto b/writer/pb/suvlim.proto similarity index 99% rename from writer/client-go/pb/suvlim.proto rename to writer/pb/suvlim.proto index f629d79db5..272efbff59 100644 --- a/writer/client-go/pb/suvlim.proto +++ b/writer/pb/suvlim.proto @@ -97,7 +97,7 @@ message FieldName { message Mapping { Status status = 1; string collection_name = 2; - repeated FieldParam fields = 3; + Schema schema = 3; repeated KeyValuePair extra_params = 4; } diff --git a/writer/test/test_writer.go b/writer/test/insert_test.go similarity index 51% rename from writer/test/test_writer.go rename to writer/test/insert_test.go index 23301a7dbc..06d4cc4d17 100644 --- a/writer/test/test_writer.go +++ b/writer/test/insert_test.go @@ -1,57 +1,61 @@ -package main +package test import ( "context" - "github.com/czs007/suvlim/pulsar/schema" - "github.com/czs007/suvlim/writer" + "sync" + "testing" + "writer/pb" + "writer/write_node" ) -func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *schema.InsertMsg { - return &schema.InsertMsg{ +func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *pb.InsertOrDeleteMsg { + return &pb.InsertOrDeleteMsg{ CollectionName: collectionName, PartitionTag: partitionTag, - SegmentId: uint64(entityId / 100), - EntityId: int64(entityId), - Timestamp: uint64(entityId), + SegmentId: int64(entityId / 100), + Uid: int64(entityId), + Timestamp: int64(entityId), ClientId: 0, } } -func GetDeleteMsg(collectionName string, entityId int64) *schema.DeleteMsg { - return &schema.DeleteMsg{ +func GetDeleteMsg(collectionName string, entityId int64) *pb.InsertOrDeleteMsg { + return &pb.InsertOrDeleteMsg{ CollectionName: collectionName, - EntityId: entityId, - Timestamp: uint64(entityId + 100), + Uid: entityId, + Timestamp: int64(entityId + 100), } } -func main() { +func TestInsert(t *testing.T) { ctx := context.Background() var topics []string topics = append(topics, "test") topics = append(topics, "test1") - writerNode, _ := writer.NewWriteNode(ctx, "null", topics, 0) - var insertMsgs []*schema.InsertMsg + writerNode, _ := write_node.NewWriteNode(ctx, "null", topics, 0) + var insertMsgs []*pb.InsertOrDeleteMsg for i := 0; i < 120; i++ { insertMsgs = append(insertMsgs, GetInsertMsg("collection0", "tag01", int64(i))) } + wg := sync.WaitGroup{} + wg.Add(3) //var wg sync.WaitGroup - writerNode.InsertBatchData(ctx, insertMsgs, 100) + writerNode.InsertBatchData(ctx, insertMsgs, wg) data1 := writerNode.KvStore.GetData(ctx) - gtInsertBuffer := writerNode.GetInsertBuffer() + //gtInsertBuffer := writerNode.GetInsertBuffer() println(len(data1)) - println(gtInsertBuffer.Len()) - var insertMsgs2 []*schema.InsertMsg + + var insertMsgs2 []*pb.InsertOrDeleteMsg for i := 120; i < 200; i++ { insertMsgs2 = append(insertMsgs2, GetInsertMsg("collection0", "tag02", int64(i))) } - writerNode.InsertBatchData(ctx, insertMsgs2, 200) + writerNode.InsertBatchData(ctx, insertMsgs2, wg) data2 := writerNode.KvStore.GetData(ctx) println(len(data2)) - var deleteMsgs []*schema.DeleteMsg + var deleteMsgs []*pb.InsertOrDeleteMsg deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 2)) deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 120)) - writerNode.DeleteBatchData(ctx, deleteMsgs, 200) + writerNode.DeleteBatchData(ctx, deleteMsgs, wg) data3 := writerNode.KvStore.GetData(ctx) println(len(data3)) } diff --git a/writer/writer.go b/writer/write_node/writer_node.go similarity index 65% rename from writer/writer.go rename to writer/write_node/writer_node.go index e33d7fa34a..78a3222257 100644 --- a/writer/writer.go +++ b/writer/write_node/writer_node.go @@ -1,13 +1,13 @@ -package writer +package write_node import ( "context" "fmt" - "github.com/czs007/suvlim/pulsar" - "github.com/czs007/suvlim/pulsar/schema" - "github.com/czs007/suvlim/writer/mock" "strconv" "sync" + "writer/message_client" + "writer/mock" + "writer/pb" ) type SegmentIdInfo struct { @@ -17,9 +17,9 @@ type SegmentIdInfo struct { } type WriteNode struct { - KvStore *mock.TikvStore - mc *pulsar.MessageClient - timeSync uint64 + KvStore *mock.TikvStore + MessageClient *message_client.MessageClient + TimeSync uint64 } func NewWriteNode(ctx context.Context, @@ -27,15 +27,15 @@ func NewWriteNode(ctx context.Context, topics []string, timeSync uint64) (*WriteNode, error) { kv, err := mock.NewTikvStore() - mc := &pulsar.MessageClient{} + mc := &message_client.MessageClient{} return &WriteNode{ - KvStore: kv, - mc: mc, - timeSync: timeSync, + KvStore: kv, + MessageClient: mc, + TimeSync: timeSync, }, err } -func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64, wg sync.WaitGroup) error { +func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg sync.WaitGroup) error { var prefixKey string var suffixKey string var prefixKeys [][]byte @@ -44,12 +44,12 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM var timeStamp []uint64 for i := 0; i < len(data); i++ { - prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(data[i].EntityId, 10) - suffixKey = strconv.FormatUint(data[i].SegmentId, 10) + prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10) + suffixKey = strconv.FormatUint(uint64(data[i].SegmentId), 10) prefixKeys = append(prefixKeys, []byte(prefixKey)) suffixKeys = append(suffixKeys, []byte(suffixKey)) - binaryData = append(binaryData, data[i].Serialization()) - timeStamp = append(timeStamp, data[i].Timestamp) + binaryData = append(binaryData, []byte(data[i].String())) + timeStamp = append(timeStamp, uint64(data[i].Timestamp)) } error := (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData) @@ -61,22 +61,22 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM return nil } -func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64, wg sync.WaitGroup) error { +func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg sync.WaitGroup) error { var segmentInfos []*SegmentIdInfo var prefixKey string var prefixKeys [][]byte var timeStamps []uint64 for i := 0; i < len(data); i++ { - prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(data[i].EntityId, 10) + prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10) prefixKeys = append(prefixKeys, []byte(prefixKey)) - timeStamps = append(timeStamps, data[i].Timestamp) + timeStamps = append(timeStamps, uint64(data[i].Timestamp)) } segmentIds := (*wn.KvStore).GetSegment(ctx, prefixKeys) for i := 0; i < len(prefixKeys); i++ { segmentInfos = append(segmentInfos, &SegmentIdInfo{ CollectionName: data[i].CollectionName, - EntityId: data[i].EntityId, + EntityId: data[i].Uid, SegmentIds: segmentIds, }) } @@ -89,12 +89,13 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM } func (wn *WriteNode) UpdateTimeSync(timeSync uint64) { - wn.timeSync = timeSync + wn.TimeSync = timeSync } -func (wn *WriteNode) doWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) { +func (wn *WriteNode) DoWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) { wg.Add(2) - go wn.InsertBatchData(ctx, wn.mc.InsertMsg, timeSync, wg) - go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, timeSync, wg) + go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg, wg) + go wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg, wg) wg.Wait() + wn.UpdateTimeSync(timeSync) }