Add proto for system internal message

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2020-09-04 14:31:23 +08:00 committed by yefu.chen
parent ce95fd4f20
commit a2de464c71
30 changed files with 78225 additions and 5553 deletions

View File

@ -670,8 +670,6 @@ service MilvusService {
enum OpType {
INSERT = 0;
DELETE = 1;
SEARCH = 2;
SEARCH_RESULT = 3;
}
message InsertOrDeleteMsg {
@ -697,10 +695,14 @@ message SearchMsg {
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 ClientId = 1;
int64 Timestamp = 2;
OpType MsgType = 3;
int64 peer_Id = 1;
int64 Timestamp = 2;
SyncType sync_type = 3;
}
message SegmentRecord {
@ -709,7 +711,6 @@ message SegmentRecord {
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
OpType msg_type = 3;
}
int64 client_id = 1;
SegmentRecord records = 2;
}

View File

@ -1,18 +1,18 @@
#!/bin/bash
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" status.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-status status.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" milvus.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" hello.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus hello.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" master.proto
../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus master.proto
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-status --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" status.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-status status.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" milvus.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus milvus.proto
#
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" hello.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus hello.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --grpc_out=./gen-milvus --plugin=protoc-gen-grpc="../../cmake-build-debug/thirdparty/grpc/grpc-build/grpc_cpp_plugin" master.proto
#
#../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I . --cpp_out=./gen-milvus master.proto

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

716
proxy/src/grpc/suvlim.proto Normal file
View File

@ -0,0 +1,716 @@
syntax = "proto3";
package milvus.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
int64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
int64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
int64 Timestamp = 2;
SyncType sync_type = 3;
}
message SegmentRecord {
int64 uid = 1;
repeated int64 segment_id = 2;
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
}

View File

@ -2,7 +2,7 @@ set(src-cpp
Client.cpp
Consumer.cpp
Producer.cpp
pb/pulsar.pb.cc)
pb/suvlim.pb.cc)
add_library(message_client_cpp SHARED
${src-cpp}

View File

@ -2,6 +2,7 @@
#include "pulsar/Producer.h"
#include "Client.h"
#include "pb/suvlim.pb.h"
namespace message_client {
@ -15,6 +16,9 @@ public:
Result createProducer(const std::string& topic);
Result send(const Message& msg);
Result send(const std::string& msg);
Result send(const suvlim::grpc::InsertOrDeleteMsg& msg);
Result send(const suvlim::grpc::SearchMsg& msg);
Result Send(const suvlim::grpc::GetEntityIDsParam)
Result close();
const Producer&

View File

@ -1,3 +1,4 @@
#!/usr/bin/env bash
../../../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I=./ --cpp_out=./ pulsar.proto
#../../../../cmake-build-debug/thirdparty/grpc/grpc-build/third_party/protobuf/protoc -I=./ --cpp_out=./ pulsar.proto
protoc -I=./ --cpp_out=./ suvlim.proto

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,715 @@
syntax = "proto3";
package suvlim.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
SEARCH = 2;
SEARCH_RESULT = 3;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
int64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
int64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
message TimeSyncMsg{
int64 ClientId = 1;
int64 Timestamp = 2;
OpType MsgType = 3;
}
message SegmentRecord {
int64 uid = 1;
repeated int64 segment_id = 2;
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
OpType msg_type = 3;
}

View File

@ -11,5 +11,6 @@
#define MILVUS_VERSION "0.10.0"
#define BUILD_TYPE "Debug"
#define BUILD_TIME "2020-09-03 16:41.13"
#define LAST_COMMIT_ID "a97d3e925e993f4a2a7409a9734dcc788f109767"

View File

@ -6,8 +6,8 @@ TEST(CLIENT_CPP, Producer) {
auto client= std::make_shared<message_client::MsgClient>("pulsar://localhost:6650");
message_client::MsgProducer producer(client,"test");
pb::TestData data;
data.set_id("test");
data.set_name("hahah");
data.set_id("100");
data.set_name("pulsar");
std::string to_string = data.SerializeAsString();
producer.send(to_string);
producer.close();

View File

@ -198,4 +198,4 @@ func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
mc.PrepareMsg(schema.Key2Seg, key2segLen)
mc.PrepareMsg(schema.Search, searchLen)
}
}
}

View File

@ -1,4 +1,4 @@
#!/usr/bin/env bash
pkg=pb
protoc --go_out=import_path=${pkg}:. pulsar.proto
protoc --go_out=import_path=${pkg}:. suvlim.proto

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,716 @@
syntax = "proto3";
package suvlim.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
int64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
int64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
int64 Timestamp = 2;
SyncType sync_type = 3;
}
message SegmentRecord {
int64 uid = 1;
repeated int64 segment_id = 2;
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
}

View File

@ -0,0 +1,214 @@
package client_go
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"log"
"suvlim/pulsar/client-go/pb"
"suvlim/pulsar/client-go/schema"
"sync"
)
var (
SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
type MessageClient struct {
//message channel
insertOrDeleteChan chan *pb.InsertOrDeleteMsg
searchChan chan *pb.SearchMsg
timeSyncChan chan *pb.TimeSyncMsg
key2SegChan chan *pb.Key2SegMsg
// pulsar
client pulsar.Client
key2segProducer pulsar.Producer
writeSyncProducer pulsar.Producer
insertOrDeleteConsumer pulsar.Consumer
searchConsumer pulsar.Consumer
timeSyncConsumer pulsar.Consumer
// batch messages
InsertOrDeleteMsg []*pb.InsertOrDeleteMsg
SearchMsg []*pb.SearchMsg
timeSyncMsg []*pb.TimeSyncMsg
key2segMsg []*pb.Key2SegMsg
}
func (mc *MessageClient)ReceiveInsertOrDeleteMsg() {
for {
insetOrDeleteMsg := pb.InsertOrDeleteMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = msg.GetValue(&insetOrDeleteMsg)
if err != nil {
log.Fatal(err)
}
mc.insertOrDeleteChan <- &insetOrDeleteMsg
}
}
func (mc *MessageClient)ReceiveSearchMsg() {
for {
searchMsg := pb.SearchMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = msg.GetValue(&searchMsg)
if err != nil {
log.Fatal(err)
}
mc.searchChan <- &searchMsg
}
}
func (mc *MessageClient)ReceiveTimeSyncMsg() {
for {
timeSyncMsg := pb.TimeSyncMsg{}
msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
err = msg.GetValue(&timeSyncMsg)
if err != nil {
log.Fatal(err)
}
mc.timeSyncChan <- &timeSyncMsg
}
}
func (mc *MessageClient) ReceiveMessage() {
go mc.ReceiveInsertOrDeleteMsg()
go mc.ReceiveSearchMsg()
go mc.ReceiveTimeSyncMsg()
}
func (mc *MessageClient) CreatProducer(opType pb.OpType, topicName string) pulsar.Producer{
producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{
Topic: topicName,
})
defer producer.Close()
if err != nil {
log.Fatal(err)
}
proto.Marshal()
return producer
}
func (mc *MessageClient) CreateConsumer(schemaDef string, topics []string) pulsar.Consumer {
originMsgSchema := pulsar.NewProtoSchema(schemaDef, nil)
consumer, err := mc.client.SubscribeWithSchema(pulsar.ConsumerOptions{
Topics: topics,
SubscriptionName: "multi-topic-sub",
}, originMsgSchema)
defer consumer.Close()
if err != nil {
log.Fatal(err)
}
return consumer
}
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
// create client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: url,
})
defer client.Close()
if err != nil {
log.Fatal(err)
}
return client
}
func (mc *MessageClient) InitClient(url string, topics []string, consumerMsgSchema string) {
//create client
mc.client = mc.CreateClient(url)
//create producer
for topicIndex := range topics {
if topics[topicIndex] == "insert" {
mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "insert")
}
if topics[topicIndex] == "delete" {
mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete")
}
if topics[topicIndex] == "key2seg" {
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "key2seg")
}
}
mc.syncInsertProducer = mc.CreatProducer(SyncEofSchema, "insert")
mc.syncDeleteProducer = mc.CreatProducer(SyncEofSchema, "delete")
mc.key2segProducer = mc.CreatProducer(SyncEofSchema, "key2seg")
//create consumer
mc.consumer = mc.CreateConsumer(consumerMsgSchema, topics)
// init channel
mc.insertChan = make(chan *schema.InsertMsg, 1000)
mc.deleteChan = make(chan *schema.DeleteMsg, 1000)
mc.searchChan = make(chan *schema.SearchMsg, 1000)
mc.timeSyncChan = make(chan *schema.TimeSyncMsg, 1000)
mc.key2SegChan = make(chan *schema.Key2SegMsg, 1000)
}
type JobType int
const (
OpInQueryNode JobType = 0
OpInWriteNode JobType = 1
)
func (mc *MessageClient) PrepareMsg(opType schema.OpType, msgLen int) {
switch opType {
case schema.Insert:
for i := 0; i < msgLen; i++ {
msg := <- mc.insertChan
mc.InsertMsg[i] = msg
}
case schema.Delete:
for i := 0; i < msgLen; i++ {
msg := <- mc.deleteChan
mc.DeleteMsg[i] = msg
}
case schema.Search:
for i := 0; i < msgLen; i++ {
msg := <-mc.searchChan
mc.SearchMsg[i] = msg
}
case schema.TimeSync:
for i := 0; i < msgLen; i++ {
msg := <- mc.timeSyncChan
mc.timeMsg[i] = msg
}
case schema.Key2Seg:
for i := 0; i < msgLen; i++ {
msg := <-mc.key2SegChan
mc.key2segMsg[i] = msg
}
}
}
func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
// assume the channel not full
mc.InsertMsg = make([]*schema.InsertMsg, 1000)
mc.DeleteMsg = make([]*schema.DeleteMsg, 1000)
mc.SearchMsg = make([]*schema.SearchMsg, 1000)
mc.timeMsg = make([]*schema.TimeSyncMsg, 1000)
mc.key2segMsg = make([]*schema.Key2SegMsg, 1000)
// ensure all messages before time in timeSyncTopic have been push into channel
// get the length of every channel
insertLen := len(mc.insertChan)
deleteLen := len(mc.deleteChan)
searchLen := len(mc.searchChan)
timeLen := len(mc.timeSyncChan)
key2segLen := len(mc.key2SegChan)
// get message from channel to slice
mc.PrepareMsg(schema.Insert, insertLen)
mc.PrepareMsg(schema.Delete, deleteLen)
mc.PrepareMsg(schema.TimeSync, timeLen)
if jobType == OpInQueryNode {
mc.PrepareMsg(schema.Key2Seg, key2segLen)
mc.PrepareMsg(schema.Search, searchLen)
}
}

4
writer/client-go/pb/build.sh Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
pkg=pb
protoc --go_out=import_path=${pkg}:. suvlim.proto

View File

@ -0,0 +1,744 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
/**
* @brief Attribute record
*/
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
/**
* @brief Vector records
*/
message VectorRecord {
repeated VectorRowRecord records = 1;
}
/**
* @brief Field values
*/
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
/**
* @brief Parameters for insert action
*/
message InsertParam {
string collection_name = 1;
repeated FieldValue fields = 2;
repeated int64 entity_id_array = 3; //optional
string partition_tag = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Entity ids
*/
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
/**
* @brief Search vector parameters
*/
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
/**
* @brief Parameters for search action
* @dsl example:
* {
* "query": {
* "bool": {
* "must": [
* {
* "must":[
* {
* "should": [
* {
* "term": {
* "gender": ["male"]
* }
* },
* {
* "range": {
* "height": {"gte": "170.0", "lte": "180.0"}
* }
* }
* ]
* },
* {
* "must_not": [
* {
* "term": {
* "age": [20, 21, 22, 23, 24, 25]
* }
* },
* {
* "Range": {
* "weight": {"lte": "100"}
* }
* }
* ]
* }
* ]
* },
* {
* "must": [
* {
* "vector": {
* "face_img": {
* "topk": 10,
* "metric_type": "L2",
* "query": [],
* "params": {
* "nprobe": 10
* }
* }
* }
* }
* ]
* }
* ]
* }
* },
* "fields": ["age", "face_img"]
* }
*/
message SearchParam {
string collection_name = 1;
repeated string partition_tag_array = 2;
repeated VectorParam vector_param = 3;
string dsl = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for searching in segments
*/
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
/**
* @brief Entities
*/
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated FieldValue fields = 4;
}
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,716 @@
syntax = "proto3";
package suvlim.grpc;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
message VectorRecord {
repeated VectorRowRecord records = 1;
}
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
//////////////////////////row schema and data///////////////////////////////////
/**
* @brief schema
*/
message FieldMeta {
string field_name = 1;
DataType type = 2;
int64 dim = 3;
}
message Schema {
repeated FieldMeta field_metas = 1;
}
message RowData {
bytes blob = 1;
}
//////////////////////suvlim-proxy///////////////////////////////////
message InsertParam {
string collection_name = 1;
Schema schema = 2;
repeated RowData rows_data = 3;
repeated int64 entity_id_array = 4; //optional
string partition_tag = 5;
repeated KeyValuePair extra_params = 6;
}
message SearchParam {
string collection_name = 1;
repeated VectorParam vector_param = 2;
string dsl = 3; //optional
repeated string partition_tag = 4; //why
repeated KeyValuePair extra_params = 5;
}
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated RowData rows_data = 4;
}
///////////////////////////milvus-server///////////////////////////
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}
////////////////////pulsar//////////////////////////////////////
enum OpType {
INSERT = 0;
DELETE = 1;
}
message InsertOrDeleteMsg {
string collection_name = 1;
RowData rows_data = 2;
int64 uid = 3; //optional
string partition_tag = 4;
int64 timestamp =5;
int64 segment_id = 6;
int64 channel_id = 7;
OpType op = 8;
int64 client_id = 9;
repeated KeyValuePair extra_params = 10;
}
message SearchMsg {
string collection_name = 1;
VectorRowRecord records = 2;
string partition_tag = 3;
int64 uid = 4;
int64 timestamp =5;
int64 client_id = 6;
repeated KeyValuePair extra_params = 7;
}
enum SyncType {
READ = 0;
WRITE = 1;
}
message TimeSyncMsg{
int64 peer_Id = 1;
int64 Timestamp = 2;
SyncType sync_type = 3;
}
message SegmentRecord {
int64 uid = 1;
repeated int64 segment_id = 2;
}
message Key2SegMsg {
int64 client_id = 1;
SegmentRecord records = 2;
}

View File

@ -0,0 +1,71 @@
package client_go
import (
"fmt"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
consumerQSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
type QueryNode struct {
mc MessageClient
}
func (qn *QueryNode)doQueryNode(wg sync.WaitGroup) {
wg.Add(3)
go qn.insert_query(qn.mc.InsertMsg, wg)
go qn.delete_query(qn.mc.DeleteMsg, wg)
go qn.search_query(qn.mc.SearchMsg, wg)
wg.Wait()
}
func (qn *QueryNode) PrepareBatchMsg() {
qn.mc.PrepareBatchMsg(JobType(0))
}
func (qn *QueryNode)ReceiveMessage() {
qn.mc.ReceiveMessage()
}
func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics, consumerQSchema)
qn := QueryNode{mc}
wg := sync.WaitGroup{}
go qn.ReceiveMessage()
for {
time.Sleep(200 * time.Millisecond)
qn.PrepareBatchMsg()
qn.doQueryNode(wg)
fmt.Println("do a batch in 200ms")
}
}
func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}

View File

@ -0,0 +1,198 @@
package schema
import (
"encoding/json"
"fmt"
)
type ErrorCode int32
const (
ErrorCode_SUCCESS ErrorCode = 0
ErrorCode_UNEXPECTED_ERROR ErrorCode = 1
ErrorCode_CONNECT_FAILED ErrorCode = 2
ErrorCode_PERMISSION_DENIED ErrorCode = 3
ErrorCode_COLLECTION_NOT_EXISTS ErrorCode = 4
ErrorCode_ILLEGAL_ARGUMENT ErrorCode = 5
ErrorCode_ILLEGAL_DIMENSION ErrorCode = 7
ErrorCode_ILLEGAL_INDEX_TYPE ErrorCode = 8
ErrorCode_ILLEGAL_COLLECTION_NAME ErrorCode = 9
ErrorCode_ILLEGAL_TOPK ErrorCode = 10
ErrorCode_ILLEGAL_ROWRECORD ErrorCode = 11
ErrorCode_ILLEGAL_VECTOR_ID ErrorCode = 12
ErrorCode_ILLEGAL_SEARCH_RESULT ErrorCode = 13
ErrorCode_FILE_NOT_FOUND ErrorCode = 14
ErrorCode_META_FAILED ErrorCode = 15
ErrorCode_CACHE_FAILED ErrorCode = 16
ErrorCode_CANNOT_CREATE_FOLDER ErrorCode = 17
ErrorCode_CANNOT_CREATE_FILE ErrorCode = 18
ErrorCode_CANNOT_DELETE_FOLDER ErrorCode = 19
ErrorCode_CANNOT_DELETE_FILE ErrorCode = 20
ErrorCode_BUILD_INDEX_ERROR ErrorCode = 21
ErrorCode_ILLEGAL_NLIST ErrorCode = 22
ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23
ErrorCode_OUT_OF_MEMORY ErrorCode = 24
)
type Status struct {
Error_code ErrorCode
Reason string
}
type DataType int32
const (
NONE DataType = 0
BOOL DataType = 1
INT8 DataType = 2
INT16 DataType = 3
INT32 DataType = 4
INT64 DataType = 5
FLOAT DataType = 10
DOUBLE DataType = 11
STRING DataType = 20
VectorBinary DataType = 100
VectorFloat DataType = 101
)
type AttrRecord struct {
Int32Value int32
Int64Value int64
FloatValue float32
DoubleValue float64
}
type VectorRowRecord struct {
FloatData []float32
BinaryData []byte
}
type VectorRecord struct {
Records *VectorRowRecord
}
type FieldValue struct {
FieldName string
Type DataType
AttrRecord *AttrRecord //what's the diff with VectorRecord
VectorRecord *VectorRecord
}
type VectorParam struct {
Json string
RowRecord *VectorRecord
}
type SegmentRecord struct {
segInfo []string
}
type OpType int
const (
Insert OpType = 0
Delete OpType = 1
Search OpType = 2
TimeSync OpType = 3
Key2Seg OpType = 4
Statistics OpType = 5
EOF OpType = 6
)
type PulsarMessage struct {
CollectionName string
Fields []*FieldValue
EntityId int64
PartitionTag string
VectorParam *VectorParam
Segments []*SegmentRecord
Timestamp int64
ClientId int64
MsgType OpType
TopicName string
PartitionId int64
}
type Message interface {
GetType() OpType
Serialization() []byte
Deserialization(serializationData []byte)
}
type InsertMsg struct {
CollectionName string
Fields []*FieldValue
EntityId uint64
PartitionTag string
SegmentId uint64
Timestamp uint64
ClientId int64
MsgType OpType
}
type DeleteMsg struct {
CollectionName string
EntityId uint64
Timestamp uint64
ClientId int64
MsgType OpType
}
type SearchMsg struct {
CollectionName string
PartitionTag string
VectorParam *VectorParam
Timestamp uint64
ClientId int64
MsgType OpType
}
type TimeSyncMsg struct {
ClientId int64
Timestamp int64
MsgType OpType
}
type Key2SegMsg struct {
EntityId int64
Segments []*SegmentRecord
MsgType OpType
}
func (ims *InsertMsg) GetType() OpType {
return ims.MsgType
}
func (ims *InsertMsg) Serialization() []byte {
data, err := json.Marshal(ims)
if err != nil {
fmt.Println("Can't serialization")
}
return data
}
func (ims *InsertMsg) Deserialization(serializationData []byte) {
}
func (dms *DeleteMsg) GetType() OpType {
return dms.MsgType
}
func (sms *SearchMsg) GetType() OpType {
return sms.MsgType
}
func (tms *TimeSyncMsg) GetType() OpType {
return tms.MsgType
}
func (kms *Key2SegMsg) GetType() OpType {
return kms.MsgType
}
type SyncEofMsg struct {
MsgType OpType
}

View File

@ -0,0 +1,56 @@
package client_go
import (
"fmt"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
consumerWSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
type WriteNode struct {
mc MessageClient
}
func (wn *WriteNode) doWriteNode(wg sync.WaitGroup) {
wg.Add(2)
go wn.insert_write(wn.mc.InsertMsg, wg)
go wn.delete_write(wn.mc.DeleteMsg, wg)
wg.Wait()
}
func (wn *WriteNode) PrepareBatchMsg() {
wn.mc.PrepareBatchMsg(JobType(1))
}
func main() {
mc := MessageClient{}
topics := []string{"insert", "delete"}
mc.InitClient("pulsar://localhost:6650", topics, consumerWSchema)
go mc.ReceiveMessage()
wg := sync.WaitGroup{}
wn := WriteNode{mc}
for {
time.Sleep(200 * time.Millisecond)
wn.PrepareBatchMsg()
wn.doWriteNode(wg)
fmt.Println("do a batch in 200ms")
}
}
func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}
func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
wg.Done()
return schema.Status{schema.ErrorCode_SUCCESS, ""}
}

View File

@ -0,0 +1,19 @@
package test
import "sync"
var (
wg sync.WaitGroup
OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
"{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" +
"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
"{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" +
"{\"name\":\"Segments\",\"type\":\"[]string\"}" +
"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
"]}"
)