Refactor write node using message client

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2020-09-04 17:52:49 +08:00 committed by yefu.chen
parent e69c19db5f
commit e13fc08d09
17 changed files with 317 additions and 2110 deletions

3
go.sum
View File

@ -44,10 +44,7 @@ github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af h1:
github.com/apache/pulsar/pulsar-client-go v0.0.0-20200901051823-800681aaa9af/go.mod h1:QdYxU2iG99VVU6cvoBRkCgkazfJSL9WwPZ20PZR6aUk=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=

View File

View File

@ -1,214 +0,0 @@
package client_go
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"log"
"suvlim/pulsar/client-go/pb"
"suvlim/pulsar/client-go/schema"
"sync"
)
var (
SyncEofSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
"]}"
)
// MessageClient owns the Pulsar connections for a node: it receives
// insert/delete, search and time-sync streams into buffered channels and
// batches them for processing (see PrepareBatchMsg).
type MessageClient struct {
//message channel
// Buffered channels filled by the Receive* goroutines and drained by
// PrepareMsg/PrepareBatchMsg.
insertOrDeleteChan chan *pb.InsertOrDeleteMsg
searchChan chan *pb.SearchMsg
timeSyncChan chan *pb.TimeSyncMsg
key2SegChan chan *pb.Key2SegMsg
// pulsar
// One shared client; producers for outgoing messages, one consumer per
// incoming stream.
client pulsar.Client
key2segProducer pulsar.Producer
writeSyncProducer pulsar.Producer
insertOrDeleteConsumer pulsar.Consumer
searchConsumer pulsar.Consumer
timeSyncConsumer pulsar.Consumer
// batch messages
// Slices rebuilt on every PrepareBatchMsg call; only the first N entries
// (N = channel length at snapshot time) are populated.
// NOTE(review): the exported/unexported mix here looks accidental —
// confirm which fields are read outside this package.
InsertOrDeleteMsg []*pb.InsertOrDeleteMsg
SearchMsg []*pb.SearchMsg
timeSyncMsg []*pb.TimeSyncMsg
key2segMsg []*pb.Key2SegMsg
}
// ReceiveInsertOrDeleteMsg pulls insert/delete messages from the Pulsar
// consumer and forwards them to insertOrDeleteChan. It loops forever and is
// meant to run as a goroutine (see ReceiveMessage).
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
	for {
		msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
		if err != nil {
			// Fix: the original discarded this error and then called
			// msg.GetValue on a possibly-nil msg.
			log.Fatal(err)
		}
		insertOrDeleteMsg := pb.InsertOrDeleteMsg{}
		if err := msg.GetValue(&insertOrDeleteMsg); err != nil {
			log.Fatal(err)
		}
		mc.insertOrDeleteChan <- &insertOrDeleteMsg
	}
}
// ReceiveSearchMsg pulls search requests from the Pulsar consumer and
// forwards them to searchChan. It loops forever and is meant to run as a
// goroutine (see ReceiveMessage).
func (mc *MessageClient) ReceiveSearchMsg() {
	for {
		// Fix: the original received from insertOrDeleteConsumer (copy-paste
		// bug) — search messages must come from searchConsumer.
		msg, err := mc.searchConsumer.Receive(context.Background())
		if err != nil {
			// Fix: the original discarded the Receive error.
			log.Fatal(err)
		}
		searchMsg := pb.SearchMsg{}
		if err := msg.GetValue(&searchMsg); err != nil {
			log.Fatal(err)
		}
		mc.searchChan <- &searchMsg
	}
}
// ReceiveTimeSyncMsg pulls time-sync messages from the Pulsar consumer and
// forwards them to timeSyncChan. It loops forever and is meant to run as a
// goroutine (see ReceiveMessage).
func (mc *MessageClient) ReceiveTimeSyncMsg() {
	for {
		// Fix: the original received from insertOrDeleteConsumer (copy-paste
		// bug) — time-sync messages must come from timeSyncConsumer.
		msg, err := mc.timeSyncConsumer.Receive(context.Background())
		if err != nil {
			// Fix: the original discarded the Receive error.
			log.Fatal(err)
		}
		timeSyncMsg := pb.TimeSyncMsg{}
		if err := msg.GetValue(&timeSyncMsg); err != nil {
			log.Fatal(err)
		}
		mc.timeSyncChan <- &timeSyncMsg
	}
}
// ReceiveMessage launches one background goroutine per incoming stream
// (insert/delete, search, time-sync). The goroutines loop forever; there is
// no stop mechanism here — NOTE(review): consider a context for shutdown.
func (mc *MessageClient) ReceiveMessage() {
	receivers := []func(){
		mc.ReceiveInsertOrDeleteMsg,
		mc.ReceiveSearchMsg,
		mc.ReceiveTimeSyncMsg,
	}
	for _, recv := range receivers {
		go recv()
	}
}
// CreatProducer creates a Pulsar producer on topicName and returns it; the
// caller owns the producer and is responsible for closing it.
// Fixes: (1) the original deferred producer.Close(), shutting the producer
// down before the caller could use it; (2) it contained a stray no-argument
// proto.Marshal() call that cannot compile; (3) its first parameter was
// typed pb.OpType although every call site in this file passes the schema
// string (SyncEofSchema) — the parameter is now schemaDef to match.
// NOTE(review): schemaDef is currently unused; confirm whether the producer
// should be created with a proto schema like CreateConsumer does.
func (mc *MessageClient) CreatProducer(schemaDef string, topicName string) pulsar.Producer {
	producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{
		Topic: topicName,
	})
	if err != nil {
		log.Fatal(err)
	}
	return producer
}
// CreateConsumer subscribes to topics with a proto schema built from
// schemaDef and returns the consumer; the caller owns it and must close it.
// Fix: the original deferred consumer.Close(), which closed the consumer
// before the caller could receive anything from it.
func (mc *MessageClient) CreateConsumer(schemaDef string, topics []string) pulsar.Consumer {
	originMsgSchema := pulsar.NewProtoSchema(schemaDef, nil)
	consumer, err := mc.client.SubscribeWithSchema(pulsar.ConsumerOptions{
		Topics:           topics,
		SubscriptionName: "multi-topic-sub",
	}, originMsgSchema)
	if err != nil {
		log.Fatal(err)
	}
	return consumer
}
// CreateClient connects to the Pulsar broker at url and returns the client;
// the caller owns it and must close it when done.
// Fix: the original deferred client.Close(), which tore the connection down
// before the returned client could be used.
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: url,
	})
	if err != nil {
		log.Fatal(err)
	}
	return client
}
// InitClient wires up the Pulsar client, producers, consumers and the
// internal channels. Fixes relative to the original: it assigned fields
// that do not exist on MessageClient (syncInsertProducer, syncDeleteProducer,
// consumer, insertChan, deleteChan), created every producer twice with a
// jumbled topic mapping, and made channels of schema.* types although the
// struct declares pb.* channel types.
func (mc *MessageClient) InitClient(url string, topics []string, consumerMsgSchema string) {
	// create client
	mc.client = mc.CreateClient(url)

	// Create producers only for the topics we were asked for.
	// NOTE(review): the struct exposes exactly two producers; insert and
	// delete are assumed to share writeSyncProducer — confirm the intended
	// producer/topic mapping.
	for _, topic := range topics {
		switch topic {
		case "insert", "delete":
			mc.writeSyncProducer = mc.CreatProducer(SyncEofSchema, topic)
		case "key2seg":
			mc.key2segProducer = mc.CreatProducer(SyncEofSchema, topic)
		}
	}

	// Create consumer over all requested topics.
	// NOTE(review): the struct declares three consumers but the original
	// subscribed only once; search/time-sync consumers still need their own
	// subscriptions before ReceiveMessage is safe to call.
	mc.insertOrDeleteConsumer = mc.CreateConsumer(consumerMsgSchema, topics)

	// Init channels with the pb message types the struct declares; capacity
	// 1000 matches the batch size used by PrepareBatchMsg.
	mc.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, 1000)
	mc.searchChan = make(chan *pb.SearchMsg, 1000)
	mc.timeSyncChan = make(chan *pb.TimeSyncMsg, 1000)
	mc.key2SegChan = make(chan *pb.Key2SegMsg, 1000)
}
// JobType identifies which node role is draining the message batches; it
// controls which streams PrepareBatchMsg collects.
type JobType int
const (
// OpInQueryNode: query nodes additionally batch key2seg and search messages.
OpInQueryNode JobType = 0
// OpInWriteNode: write nodes batch only insert/delete and time-sync messages.
OpInWriteNode JobType = 1
)
// PrepareMsg moves exactly msgLen messages of the given kind from the
// corresponding channel into the matching batch slice. Callers must ensure
// msgLen does not exceed both the channel length and the slice length.
// Fixes relative to the original: it switched on schema.OpType and wrote to
// fields that do not exist on MessageClient (insertChan/deleteChan/InsertMsg/
// DeleteMsg/timeMsg); this version uses pb.OpType and the struct's real
// fields, with insert and delete sharing the insertOrDelete stream.
func (mc *MessageClient) PrepareMsg(opType pb.OpType, msgLen int) {
	switch opType {
	case pb.OpType_Insert, pb.OpType_Delete:
		for i := 0; i < msgLen; i++ {
			mc.InsertOrDeleteMsg[i] = <-mc.insertOrDeleteChan
		}
	case pb.OpType_Search:
		for i := 0; i < msgLen; i++ {
			mc.SearchMsg[i] = <-mc.searchChan
		}
	case pb.OpType_TimeSync:
		for i := 0; i < msgLen; i++ {
			mc.timeSyncMsg[i] = <-mc.timeSyncChan
		}
	case pb.OpType_Key2Seg:
		for i := 0; i < msgLen; i++ {
			mc.key2segMsg[i] = <-mc.key2SegChan
		}
	}
}
// PrepareBatchMsg snapshots the current channel lengths and drains that many
// messages of each kind into the batch slices. Write nodes take only
// insert/delete and time-sync; query nodes additionally take key2seg and
// search batches.
// Fixes relative to the original: it allocated slices for fields that do not
// exist on MessageClient (InsertMsg/DeleteMsg/timeMsg) using schema.* types;
// this version uses the struct's real pb.* fields.
func (mc *MessageClient) PrepareBatchMsg(jobType JobType) {
	// Assume the channels are not full; capacity is 1000, so a fixed-size
	// batch slice always suffices. Only the first N entries are populated
	// (N = channel length below) — the tail stays nil, as in the original.
	mc.InsertOrDeleteMsg = make([]*pb.InsertOrDeleteMsg, 1000)
	mc.SearchMsg = make([]*pb.SearchMsg, 1000)
	mc.timeSyncMsg = make([]*pb.TimeSyncMsg, 1000)
	mc.key2segMsg = make([]*pb.Key2SegMsg, 1000)

	// Snapshot lengths first so each drain below takes a fixed count even if
	// the receiver goroutines keep appending concurrently.
	insertOrDeleteLen := len(mc.insertOrDeleteChan)
	searchLen := len(mc.searchChan)
	timeLen := len(mc.timeSyncChan)
	key2segLen := len(mc.key2SegChan)

	// Move messages from the channels into the batch slices.
	mc.PrepareMsg(pb.OpType_Insert, insertOrDeleteLen)
	mc.PrepareMsg(pb.OpType_TimeSync, timeLen)
	if jobType == OpInQueryNode {
		mc.PrepareMsg(pb.OpType_Key2Seg, key2segLen)
		mc.PrepareMsg(pb.OpType_Search, searchLen)
	}
}

View File

@ -1,744 +0,0 @@
syntax = "proto3";
package pb;
enum ErrorCode {
SUCCESS = 0;
UNEXPECTED_ERROR = 1;
CONNECT_FAILED = 2;
PERMISSION_DENIED = 3;
COLLECTION_NOT_EXISTS = 4;
ILLEGAL_ARGUMENT = 5;
ILLEGAL_DIMENSION = 7;
ILLEGAL_INDEX_TYPE = 8;
ILLEGAL_COLLECTION_NAME = 9;
ILLEGAL_TOPK = 10;
ILLEGAL_ROWRECORD = 11;
ILLEGAL_VECTOR_ID = 12;
ILLEGAL_SEARCH_RESULT = 13;
FILE_NOT_FOUND = 14;
META_FAILED = 15;
CACHE_FAILED = 16;
CANNOT_CREATE_FOLDER = 17;
CANNOT_CREATE_FILE = 18;
CANNOT_DELETE_FOLDER = 19;
CANNOT_DELETE_FILE = 20;
BUILD_INDEX_ERROR = 21;
ILLEGAL_NLIST = 22;
ILLEGAL_METRIC_TYPE = 23;
OUT_OF_MEMORY = 24;
}
message Status {
ErrorCode error_code = 1;
string reason = 2;
}
/**
* @brief Field data type
*/
enum DataType {
NONE = 0;
BOOL = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
FLOAT = 10;
DOUBLE = 11;
STRING = 20;
VECTOR_BINARY = 100;
VECTOR_FLOAT = 101;
}
/**
* @brief General usage
*/
message KeyValuePair {
string key = 1;
string value = 2;
}
/**
* @brief Collection name
*/
message CollectionName {
string collection_name = 1;
}
/**
* @brief Collection name list
*/
message CollectionNameList {
Status status = 1;
repeated string collection_names = 2;
}
/**
* @brief Field name
*/
message FieldName {
string collection_name = 1;
string field_name = 2;
}
/**
* @brief Collection mapping
* @extra_params: key-value pair for extra parameters of the collection
* typically usage:
* extra_params["params"] = {segment_row_count: 1000000, auto_id: true}
* Note:
* the segment_row_count specify segment row count limit for merging
* the auto_id = true means entity id is auto-generated by milvus
*/
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
repeated KeyValuePair extra_params = 4;
}
/**
* @brief Collection mapping list
*/
message MappingList {
Status status = 1;
repeated Mapping mapping_list = 2;
}
/**
* @brief Parameters of partition
*/
message PartitionParam {
string collection_name = 1;
string tag = 2;
}
/**
* @brief Partition list
*/
message PartitionList {
Status status = 1;
repeated string partition_tag_array = 2;
}
/**
* @brief Vector row record
*/
message VectorRowRecord {
repeated float float_data = 1; //float vector data
bytes binary_data = 2; //binary vector data
}
/**
* @brief Attribute record
*/
message AttrRecord {
repeated int32 int32_value = 1;
repeated int64 int64_value = 2;
repeated float float_value = 3;
repeated double double_value = 4;
}
/**
* @brief Vector records
*/
message VectorRecord {
repeated VectorRowRecord records = 1;
}
/**
* @brief Field values
*/
message FieldValue {
string field_name = 1;
DataType type = 2;
AttrRecord attr_record = 3;
VectorRecord vector_record = 4;
}
/**
* @brief Parameters for insert action
*/
message InsertParam {
string collection_name = 1;
repeated FieldValue fields = 2;
repeated int64 entity_id_array = 3; //optional
string partition_tag = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Entity ids
*/
message EntityIds {
Status status = 1;
repeated int64 entity_id_array = 2;
}
/**
* @brief Search vector parameters
*/
message VectorParam {
string json = 1;
VectorRecord row_record = 2;
}
/**
* @brief Parameters for search action
* @dsl example:
* {
* "query": {
* "bool": {
* "must": [
* {
* "must":[
* {
* "should": [
* {
* "term": {
* "gender": ["male"]
* }
* },
* {
* "range": {
* "height": {"gte": "170.0", "lte": "180.0"}
* }
* }
* ]
* },
* {
* "must_not": [
* {
* "term": {
* "age": [20, 21, 22, 23, 24, 25]
* }
* },
* {
* "Range": {
* "weight": {"lte": "100"}
* }
* }
* ]
* }
* ]
* },
* {
* "must": [
* {
* "vector": {
* "face_img": {
* "topk": 10,
* "metric_type": "L2",
* "query": [],
* "params": {
* "nprobe": 10
* }
* }
* }
* }
* ]
* }
* ]
* }
* },
* "fields": ["age", "face_img"]
* }
*/
message SearchParam {
string collection_name = 1;
repeated string partition_tag_array = 2;
repeated VectorParam vector_param = 3;
string dsl = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for searching in segments
*/
message SearchInSegmentParam {
repeated string file_id_array = 1;
SearchParam search_param = 2;
}
/**
* @brief Entities
*/
message Entities {
Status status = 1;
repeated int64 ids = 2;
repeated bool valid_row = 3;
repeated FieldValue fields = 4;
}
/**
* @brief Query result
*/
message QueryResult {
Status status = 1;
Entities entities = 2;
int64 row_num = 3;
repeated float scores = 4;
repeated float distances = 5;
repeated KeyValuePair extra_params = 6;
}
/**
* @brief Server string Reply
*/
message StringReply {
Status status = 1;
string string_reply = 2;
}
/**
* @brief Server bool Reply
*/
message BoolReply {
Status status = 1;
bool bool_reply = 2;
}
/**
* @brief Return collection row count
*/
message CollectionRowCount {
Status status = 1;
int64 collection_row_count = 2;
}
/**
* @brief Server command parameters
*/
message Command {
string cmd = 1;
}
/**
* @brief Index params
* @collection_name: target collection
* @field_name: target field
* @index_name: a name for index provided by user, unique within this field
* @extra_params: index parameters in json format
* for vector field:
* extra_params["index_type"] = one of the values: FLAT, IVF_LAT, IVF_SQ8, NSGMIX, IVFSQ8H,
* PQ, HNSW, HNSW_SQ8NM, ANNOY
* extra_params["metric_type"] = one of the values: L2, IP, HAMMING, JACCARD, TANIMOTO
* SUBSTRUCTURE, SUPERSTRUCTURE
* extra_params["params"] = extra parameters for index, for example ivflat: {nlist: 2048}
* for structured field:
* extra_params["index_type"] = one of the values: SORTED
*/
message IndexParam {
Status status = 1;
string collection_name = 2;
string field_name = 3;
string index_name = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Parameters for flush action
*/
message FlushParam {
repeated string collection_name_array = 1;
}
/**
* @brief Parameters for flush action
*/
message CompactParam {
string collection_name = 1;
double threshold = 2;
}
/**
* @brief Parameters for deleting entities by id
*/
message DeleteByIDParam {
string collection_name = 1;
repeated int64 id_array = 2;
}
/**
* @brief Return collection stats
* @json_info: collection stats in json format, typically, the format is like:
* {
* row_count: xxx,
* data_size: xxx,
* partitions: [
* {
* tag: xxx,
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* segments: [
* {
* id: xxx,
* row_count: xxx,
* data_size: xxx,
* files: [
* {
* field: xxx,
* name: xxx,
* index_type: xxx,
* path: xxx,
* data_size: xxx,
* }
* ]
* }
* ]
* }
* ]
* }
*/
message CollectionInfo {
Status status = 1;
string json_info = 2;
}
/**
* @brief Parameters for returning entities id of a segment
*/
message GetEntityIDsParam {
string collection_name = 1;
int64 segment_id = 2;
}
/**
* @brief Entities identity
*/
message EntityIdentity {
string collection_name = 1;
repeated int64 id_array = 2;
repeated string field_names = 3;
}
/********************************************SearchPB interface***************************************************/
/**
* @brief Vector field parameters
*/
message VectorFieldParam {
int64 dimension = 1;
}
/**
* @brief Field type
*/
message FieldType {
oneof value {
DataType data_type = 1;
VectorFieldParam vector_param = 2;
}
}
/**
* @brief Field parameters
*/
message FieldParam {
uint64 id = 1;
string name = 2;
DataType type = 3;
repeated KeyValuePair index_params = 4;
repeated KeyValuePair extra_params = 5;
}
/**
* @brief Vector field record
*/
message VectorFieldRecord {
repeated VectorRowRecord value = 1;
}
///////////////////////////////////////////////////////////////////
message TermQuery {
string field_name = 1;
repeated int64 int_value = 2;
repeated double double_value = 3;
int64 value_num = 4;
float boost = 5;
repeated KeyValuePair extra_params = 6;
}
enum CompareOperator {
LT = 0;
LTE = 1;
EQ = 2;
GT = 3;
GTE = 4;
NE = 5;
}
message CompareExpr {
CompareOperator operator = 1;
string operand = 2;
}
message RangeQuery {
string field_name = 1;
repeated CompareExpr operand = 2;
float boost = 3;
repeated KeyValuePair extra_params = 4;
}
message VectorQuery {
string field_name = 1;
float query_boost = 2;
repeated VectorRowRecord records = 3;
int64 topk = 4;
repeated KeyValuePair extra_params = 5;
}
enum Occur {
INVALID = 0;
MUST = 1;
SHOULD = 2;
MUST_NOT = 3;
}
message BooleanQuery {
Occur occur = 1;
repeated GeneralQuery general_query = 2;
}
message GeneralQuery {
oneof query {
BooleanQuery boolean_query = 1;
TermQuery term_query = 2;
RangeQuery range_query = 3;
VectorQuery vector_query = 4;
}
}
message SearchParamPB {
string collection_name = 1;
repeated string partition_tag_array = 2;
GeneralQuery general_query = 3;
repeated KeyValuePair extra_params = 4;
}
service MilvusService {
/**
* @brief This method is used to create collection
*
* @param CollectionSchema, use to provide collection information to be created.
*
* @return Status
*/
rpc CreateCollection(Mapping) returns (Status){}
/**
* @brief This method is used to test collection existence.
*
* @param CollectionName, collection name is going to be tested.
*
* @return BoolReply
*/
rpc HasCollection(CollectionName) returns (BoolReply) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionSchema
*/
rpc DescribeCollection(CollectionName) returns (Mapping) {}
/**
* @brief This method is used to get collection schema.
*
* @param CollectionName, target collection name.
*
* @return CollectionRowCount
*/
rpc CountCollection(CollectionName) returns (CollectionRowCount) {}
/**
* @brief This method is used to list all collections.
*
* @param Command, dummy parameter.
*
* @return CollectionNameList
*/
rpc ShowCollections(Command) returns (CollectionNameList) {}
/**
* @brief This method is used to get collection detail information.
*
* @param CollectionName, target collection name.
*
* @return CollectionInfo
*/
rpc ShowCollectionInfo(CollectionName) returns (CollectionInfo) {}
/**
* @brief This method is used to delete collection.
*
* @param CollectionName, collection name is going to be deleted.
*
* @return Status
*/
rpc DropCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to build index by collection in sync mode.
*
* @param IndexParam, index paramters.
*
* @return Status
*/
rpc CreateIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to describe index
*
* @param IndexParam, target index.
*
* @return IndexParam
*/
rpc DescribeIndex(IndexParam) returns (IndexParam) {}
/**
* @brief This method is used to drop index
*
* @param IndexParam, target field. if the IndexParam.field_name is empty, will drop all index of the collection
*
* @return Status
*/
rpc DropIndex(IndexParam) returns (Status) {}
/**
* @brief This method is used to create partition
*
* @param PartitionParam, partition parameters.
*
* @return Status
*/
rpc CreatePartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to test partition existence.
*
* @param PartitionParam, target partition.
*
* @return BoolReply
*/
rpc HasPartition(PartitionParam) returns (BoolReply) {}
/**
* @brief This method is used to show partition information
*
* @param CollectionName, target collection name.
*
* @return PartitionList
*/
rpc ShowPartitions(CollectionName) returns (PartitionList) {}
/**
* @brief This method is used to drop partition
*
* @param PartitionParam, target partition.
*
* @return Status
*/
rpc DropPartition(PartitionParam) returns (Status) {}
/**
* @brief This method is used to add vector array to collection.
*
* @param InsertParam, insert parameters.
*
* @return VectorIds
*/
rpc Insert(InsertParam) returns (EntityIds) {}
/**
* @brief This method is used to get entities data by id array.
*
* @param EntitiesIdentity, target entity id array.
*
* @return EntitiesData
*/
rpc GetEntityByID(EntityIdentity) returns (Entities) {}
/**
* @brief This method is used to get vector ids from a segment
*
* @param GetVectorIDsParam, target collection and segment
*
* @return VectorIds
*/
rpc GetEntityIDs(GetEntityIDsParam) returns (EntityIds) {}
/**
* @brief This method is used to query vector in collection.
*
* @param SearchParam, search parameters.
*
* @return KQueryResult
*/
rpc Search(SearchParam) returns (QueryResult) {}
/**
* @brief This method is used to query vector in specified files.
*
* @param SearchInSegmentParam, target segments to search.
*
* @return TopKQueryResult
*/
rpc SearchInSegment(SearchInSegmentParam) returns (QueryResult) {}
/**
* @brief This method is used to give the server status.
*
* @param Command, command string
*
* @return StringReply
*/
rpc Cmd(Command) returns (StringReply) {}
/**
* @brief This method is used to delete vector by id
*
* @param DeleteByIDParam, delete parameters.
*
* @return status
*/
rpc DeleteByID(DeleteByIDParam) returns (Status) {}
/**
* @brief This method is used to preload collection
*
* @param CollectionName, target collection name.
*
* @return Status
*/
rpc PreloadCollection(CollectionName) returns (Status) {}
/**
* @brief This method is used to flush buffer into storage.
*
* @param FlushParam, flush parameters
*
* @return Status
*/
rpc Flush(FlushParam) returns (Status) {}
/**
* @brief This method is used to compact collection
*
* @param CompactParam, compact parameters
*
* @return Status
*/
rpc Compact(CompactParam) returns (Status) {}
/********************************New Interface********************************************/
rpc SearchPB(SearchParamPB) returns (QueryResult) {}
}

View File

@ -1,638 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: pulsar.proto
/*
Package pb is a generated protocol buffer package.
It is generated from these files:
pulsar.proto
It has these top-level messages:
Status
SegmentRecord
VectorRowRecord
AttrRecord
VectorRecord
VectorParam
FieldValue
PulsarMessage
PulsarMessages
*/
package pb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ErrorCode int32
const (
ErrorCode_SUCCESS ErrorCode = 0
ErrorCode_UNEXPECTED_ERROR ErrorCode = 1
ErrorCode_CONNECT_FAILED ErrorCode = 2
ErrorCode_PERMISSION_DENIED ErrorCode = 3
ErrorCode_COLLECTION_NOT_EXISTS ErrorCode = 4
ErrorCode_ILLEGAL_ARGUMENT ErrorCode = 5
ErrorCode_ILLEGAL_DIMENSION ErrorCode = 7
ErrorCode_ILLEGAL_INDEX_TYPE ErrorCode = 8
ErrorCode_ILLEGAL_COLLECTION_NAME ErrorCode = 9
ErrorCode_ILLEGAL_TOPK ErrorCode = 10
ErrorCode_ILLEGAL_ROWRECORD ErrorCode = 11
ErrorCode_ILLEGAL_VECTOR_ID ErrorCode = 12
ErrorCode_ILLEGAL_SEARCH_RESULT ErrorCode = 13
ErrorCode_FILE_NOT_FOUND ErrorCode = 14
ErrorCode_META_FAILED ErrorCode = 15
ErrorCode_CACHE_FAILED ErrorCode = 16
ErrorCode_CANNOT_CREATE_FOLDER ErrorCode = 17
ErrorCode_CANNOT_CREATE_FILE ErrorCode = 18
ErrorCode_CANNOT_DELETE_FOLDER ErrorCode = 19
ErrorCode_CANNOT_DELETE_FILE ErrorCode = 20
ErrorCode_BUILD_INDEX_ERROR ErrorCode = 21
ErrorCode_ILLEGAL_NLIST ErrorCode = 22
ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23
ErrorCode_OUT_OF_MEMORY ErrorCode = 24
)
var ErrorCode_name = map[int32]string{
0: "SUCCESS",
1: "UNEXPECTED_ERROR",
2: "CONNECT_FAILED",
3: "PERMISSION_DENIED",
4: "COLLECTION_NOT_EXISTS",
5: "ILLEGAL_ARGUMENT",
7: "ILLEGAL_DIMENSION",
8: "ILLEGAL_INDEX_TYPE",
9: "ILLEGAL_COLLECTION_NAME",
10: "ILLEGAL_TOPK",
11: "ILLEGAL_ROWRECORD",
12: "ILLEGAL_VECTOR_ID",
13: "ILLEGAL_SEARCH_RESULT",
14: "FILE_NOT_FOUND",
15: "META_FAILED",
16: "CACHE_FAILED",
17: "CANNOT_CREATE_FOLDER",
18: "CANNOT_CREATE_FILE",
19: "CANNOT_DELETE_FOLDER",
20: "CANNOT_DELETE_FILE",
21: "BUILD_INDEX_ERROR",
22: "ILLEGAL_NLIST",
23: "ILLEGAL_METRIC_TYPE",
24: "OUT_OF_MEMORY",
}
var ErrorCode_value = map[string]int32{
"SUCCESS": 0,
"UNEXPECTED_ERROR": 1,
"CONNECT_FAILED": 2,
"PERMISSION_DENIED": 3,
"COLLECTION_NOT_EXISTS": 4,
"ILLEGAL_ARGUMENT": 5,
"ILLEGAL_DIMENSION": 7,
"ILLEGAL_INDEX_TYPE": 8,
"ILLEGAL_COLLECTION_NAME": 9,
"ILLEGAL_TOPK": 10,
"ILLEGAL_ROWRECORD": 11,
"ILLEGAL_VECTOR_ID": 12,
"ILLEGAL_SEARCH_RESULT": 13,
"FILE_NOT_FOUND": 14,
"META_FAILED": 15,
"CACHE_FAILED": 16,
"CANNOT_CREATE_FOLDER": 17,
"CANNOT_CREATE_FILE": 18,
"CANNOT_DELETE_FOLDER": 19,
"CANNOT_DELETE_FILE": 20,
"BUILD_INDEX_ERROR": 21,
"ILLEGAL_NLIST": 22,
"ILLEGAL_METRIC_TYPE": 23,
"OUT_OF_MEMORY": 24,
}
func (x ErrorCode) String() string {
return proto.EnumName(ErrorCode_name, int32(x))
}
func (ErrorCode) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type DataType int32
const (
DataType_NONE DataType = 0
DataType_BOOL DataType = 1
DataType_INT8 DataType = 2
DataType_INT16 DataType = 3
DataType_INT32 DataType = 4
DataType_INT64 DataType = 5
DataType_FLOAT DataType = 10
DataType_DOUBLE DataType = 11
DataType_STRING DataType = 20
DataType_VECTOR_BINARY DataType = 100
DataType_VECTOR_FLOAT DataType = 101
)
var DataType_name = map[int32]string{
0: "NONE",
1: "BOOL",
2: "INT8",
3: "INT16",
4: "INT32",
5: "INT64",
10: "FLOAT",
11: "DOUBLE",
20: "STRING",
100: "VECTOR_BINARY",
101: "VECTOR_FLOAT",
}
var DataType_value = map[string]int32{
"NONE": 0,
"BOOL": 1,
"INT8": 2,
"INT16": 3,
"INT32": 4,
"INT64": 5,
"FLOAT": 10,
"DOUBLE": 11,
"STRING": 20,
"VECTOR_BINARY": 100,
"VECTOR_FLOAT": 101,
}
func (x DataType) String() string {
return proto.EnumName(DataType_name, int32(x))
}
func (DataType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type OpType int32
const (
OpType_Insert OpType = 0
OpType_Delete OpType = 1
OpType_Search OpType = 2
OpType_TimeSync OpType = 3
OpType_Key2Seg OpType = 4
OpType_Statistics OpType = 5
)
var OpType_name = map[int32]string{
0: "Insert",
1: "Delete",
2: "Search",
3: "TimeSync",
4: "Key2Seg",
5: "Statistics",
}
var OpType_value = map[string]int32{
"Insert": 0,
"Delete": 1,
"Search": 2,
"TimeSync": 3,
"Key2Seg": 4,
"Statistics": 5,
}
func (x OpType) String() string {
return proto.EnumName(OpType_name, int32(x))
}
func (OpType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type Status struct {
ErrorCode ErrorCode `protobuf:"varint,1,opt,name=error_code,json=errorCode,enum=pb.ErrorCode" json:"error_code,omitempty"`
Reason string `protobuf:"bytes,2,opt,name=reason" json:"reason,omitempty"`
}
func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Status) GetErrorCode() ErrorCode {
if m != nil {
return m.ErrorCode
}
return ErrorCode_SUCCESS
}
func (m *Status) GetReason() string {
if m != nil {
return m.Reason
}
return ""
}
type SegmentRecord struct {
SegInfo []string `protobuf:"bytes,1,rep,name=seg_info,json=segInfo" json:"seg_info,omitempty"`
}
func (m *SegmentRecord) Reset() { *m = SegmentRecord{} }
func (m *SegmentRecord) String() string { return proto.CompactTextString(m) }
func (*SegmentRecord) ProtoMessage() {}
func (*SegmentRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *SegmentRecord) GetSegInfo() []string {
if m != nil {
return m.SegInfo
}
return nil
}
type VectorRowRecord struct {
FloatData []float32 `protobuf:"fixed32,1,rep,packed,name=float_data,json=floatData" json:"float_data,omitempty"`
BinaryData []byte `protobuf:"bytes,2,opt,name=binary_data,json=binaryData,proto3" json:"binary_data,omitempty"`
}
func (m *VectorRowRecord) Reset() { *m = VectorRowRecord{} }
func (m *VectorRowRecord) String() string { return proto.CompactTextString(m) }
func (*VectorRowRecord) ProtoMessage() {}
func (*VectorRowRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *VectorRowRecord) GetFloatData() []float32 {
if m != nil {
return m.FloatData
}
return nil
}
func (m *VectorRowRecord) GetBinaryData() []byte {
if m != nil {
return m.BinaryData
}
return nil
}
type AttrRecord struct {
Int32Value []int32 `protobuf:"varint,1,rep,packed,name=int32_value,json=int32Value" json:"int32_value,omitempty"`
Int64Value []int64 `protobuf:"varint,2,rep,packed,name=int64_value,json=int64Value" json:"int64_value,omitempty"`
FloatValue []float32 `protobuf:"fixed32,3,rep,packed,name=float_value,json=floatValue" json:"float_value,omitempty"`
DoubleValue []float64 `protobuf:"fixed64,4,rep,packed,name=double_value,json=doubleValue" json:"double_value,omitempty"`
}
func (m *AttrRecord) Reset() { *m = AttrRecord{} }
func (m *AttrRecord) String() string { return proto.CompactTextString(m) }
func (*AttrRecord) ProtoMessage() {}
func (*AttrRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *AttrRecord) GetInt32Value() []int32 {
if m != nil {
return m.Int32Value
}
return nil
}
func (m *AttrRecord) GetInt64Value() []int64 {
if m != nil {
return m.Int64Value
}
return nil
}
func (m *AttrRecord) GetFloatValue() []float32 {
if m != nil {
return m.FloatValue
}
return nil
}
func (m *AttrRecord) GetDoubleValue() []float64 {
if m != nil {
return m.DoubleValue
}
return nil
}
type VectorRecord struct {
Records []*VectorRowRecord `protobuf:"bytes,1,rep,name=records" json:"records,omitempty"`
}
func (m *VectorRecord) Reset() { *m = VectorRecord{} }
func (m *VectorRecord) String() string { return proto.CompactTextString(m) }
func (*VectorRecord) ProtoMessage() {}
func (*VectorRecord) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *VectorRecord) GetRecords() []*VectorRowRecord {
if m != nil {
return m.Records
}
return nil
}
type VectorParam struct {
Json string `protobuf:"bytes,1,opt,name=json" json:"json,omitempty"`
RowRecord *VectorRecord `protobuf:"bytes,2,opt,name=row_record,json=rowRecord" json:"row_record,omitempty"`
}
func (m *VectorParam) Reset() { *m = VectorParam{} }
func (m *VectorParam) String() string { return proto.CompactTextString(m) }
func (*VectorParam) ProtoMessage() {}
func (*VectorParam) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *VectorParam) GetJson() string {
if m != nil {
return m.Json
}
return ""
}
func (m *VectorParam) GetRowRecord() *VectorRecord {
if m != nil {
return m.RowRecord
}
return nil
}
type FieldValue struct {
FieldName string `protobuf:"bytes,1,opt,name=field_name,json=fieldName" json:"field_name,omitempty"`
Type DataType `protobuf:"varint,2,opt,name=type,enum=pb.DataType" json:"type,omitempty"`
AttrRecord *AttrRecord `protobuf:"bytes,3,opt,name=attr_record,json=attrRecord" json:"attr_record,omitempty"`
VectorRecord *VectorRecord `protobuf:"bytes,4,opt,name=vector_record,json=vectorRecord" json:"vector_record,omitempty"`
}
func (m *FieldValue) Reset() { *m = FieldValue{} }
func (m *FieldValue) String() string { return proto.CompactTextString(m) }
func (*FieldValue) ProtoMessage() {}
func (*FieldValue) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *FieldValue) GetFieldName() string {
if m != nil {
return m.FieldName
}
return ""
}
func (m *FieldValue) GetType() DataType {
if m != nil {
return m.Type
}
return DataType_NONE
}
func (m *FieldValue) GetAttrRecord() *AttrRecord {
if m != nil {
return m.AttrRecord
}
return nil
}
func (m *FieldValue) GetVectorRecord() *VectorRecord {
if m != nil {
return m.VectorRecord
}
return nil
}
type PulsarMessage struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"`
EntityId int64 `protobuf:"varint,3,opt,name=entity_id,json=entityId" json:"entity_id,omitempty"`
PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"`
VectorParam *VectorParam `protobuf:"bytes,5,opt,name=vector_param,json=vectorParam" json:"vector_param,omitempty"`
Segments *SegmentRecord `protobuf:"bytes,6,opt,name=segments" json:"segments,omitempty"`
Timestamp int64 `protobuf:"varint,7,opt,name=timestamp" json:"timestamp,omitempty"`
ClientId int64 `protobuf:"varint,8,opt,name=client_id,json=clientId" json:"client_id,omitempty"`
MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"`
}
// Reset restores the message to its zero value.
func (m *PulsarMessage) Reset() { *m = PulsarMessage{} }

// String renders the message in the compact proto text format.
func (m *PulsarMessage) String() string { return proto.CompactTextString(m) }

// ProtoMessage marks the type as a protobuf message.
func (*PulsarMessage) ProtoMessage() {}

// Descriptor returns the raw file descriptor and this message's index in it.
func (*PulsarMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }

// The accessors below are generated nil-safe getters: each returns the field
// value, or that field type's zero value when the receiver is nil.

func (m *PulsarMessage) GetCollectionName() string {
	if m != nil {
		return m.CollectionName
	}
	return ""
}

func (m *PulsarMessage) GetFields() []*FieldValue {
	if m != nil {
		return m.Fields
	}
	return nil
}

func (m *PulsarMessage) GetEntityId() int64 {
	if m != nil {
		return m.EntityId
	}
	return 0
}

func (m *PulsarMessage) GetPartitionTag() string {
	if m != nil {
		return m.PartitionTag
	}
	return ""
}

func (m *PulsarMessage) GetVectorParam() *VectorParam {
	if m != nil {
		return m.VectorParam
	}
	return nil
}

func (m *PulsarMessage) GetSegments() *SegmentRecord {
	if m != nil {
		return m.Segments
	}
	return nil
}

func (m *PulsarMessage) GetTimestamp() int64 {
	if m != nil {
		return m.Timestamp
	}
	return 0
}

func (m *PulsarMessage) GetClientId() int64 {
	if m != nil {
		return m.ClientId
	}
	return 0
}

// GetMsgType falls back to OpType_Insert (the enum's zero value) on a nil
// receiver.
func (m *PulsarMessage) GetMsgType() OpType {
	if m != nil {
		return m.MsgType
	}
	return OpType_Insert
}
// PulsarMessages is the batched counterpart of PulsarMessage: the scalar
// fields become repeated so one envelope can carry a whole batch.
// NOTE: generated by protoc-gen-go — do not edit by hand.
type PulsarMessages struct {
	CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName" json:"collection_name,omitempty"`
	Fields []*FieldValue `protobuf:"bytes,2,rep,name=fields" json:"fields,omitempty"`
	EntityId []int64 `protobuf:"varint,3,rep,packed,name=entity_id,json=entityId" json:"entity_id,omitempty"`
	PartitionTag string `protobuf:"bytes,4,opt,name=partition_tag,json=partitionTag" json:"partition_tag,omitempty"`
	VectorParam []*VectorParam `protobuf:"bytes,5,rep,name=vector_param,json=vectorParam" json:"vector_param,omitempty"`
	Segments []*SegmentRecord `protobuf:"bytes,6,rep,name=segments" json:"segments,omitempty"`
	Timestamp []int64 `protobuf:"varint,7,rep,packed,name=timestamp" json:"timestamp,omitempty"`
	ClientId []int64 `protobuf:"varint,8,rep,packed,name=client_id,json=clientId" json:"client_id,omitempty"`
	MsgType OpType `protobuf:"varint,9,opt,name=msg_type,json=msgType,enum=pb.OpType" json:"msg_type,omitempty"`
}
// Reset restores the message to its zero value.
func (m *PulsarMessages) Reset() { *m = PulsarMessages{} }

// String renders the message in the compact proto text format.
func (m *PulsarMessages) String() string { return proto.CompactTextString(m) }

// ProtoMessage marks the type as a protobuf message.
func (*PulsarMessages) ProtoMessage() {}

// Descriptor returns the raw file descriptor and this message's index in it.
func (*PulsarMessages) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }

// Generated nil-safe getters: each returns the field value, or the field
// type's zero value when the receiver is nil.

func (m *PulsarMessages) GetCollectionName() string {
	if m != nil {
		return m.CollectionName
	}
	return ""
}

func (m *PulsarMessages) GetFields() []*FieldValue {
	if m != nil {
		return m.Fields
	}
	return nil
}

func (m *PulsarMessages) GetEntityId() []int64 {
	if m != nil {
		return m.EntityId
	}
	return nil
}

func (m *PulsarMessages) GetPartitionTag() string {
	if m != nil {
		return m.PartitionTag
	}
	return ""
}

func (m *PulsarMessages) GetVectorParam() []*VectorParam {
	if m != nil {
		return m.VectorParam
	}
	return nil
}

func (m *PulsarMessages) GetSegments() []*SegmentRecord {
	if m != nil {
		return m.Segments
	}
	return nil
}

func (m *PulsarMessages) GetTimestamp() []int64 {
	if m != nil {
		return m.Timestamp
	}
	return nil
}

func (m *PulsarMessages) GetClientId() []int64 {
	if m != nil {
		return m.ClientId
	}
	return nil
}

// GetMsgType falls back to OpType_Insert (the enum's zero value) on a nil
// receiver.
func (m *PulsarMessages) GetMsgType() OpType {
	if m != nil {
		return m.MsgType
	}
	return OpType_Insert
}
// init registers every generated message type and enum with the protobuf
// type registry under the "pb" proto package.
func init() {
	proto.RegisterType((*Status)(nil), "pb.Status")
	proto.RegisterType((*SegmentRecord)(nil), "pb.SegmentRecord")
	proto.RegisterType((*VectorRowRecord)(nil), "pb.VectorRowRecord")
	proto.RegisterType((*AttrRecord)(nil), "pb.AttrRecord")
	proto.RegisterType((*VectorRecord)(nil), "pb.VectorRecord")
	proto.RegisterType((*VectorParam)(nil), "pb.VectorParam")
	proto.RegisterType((*FieldValue)(nil), "pb.FieldValue")
	proto.RegisterType((*PulsarMessage)(nil), "pb.PulsarMessage")
	proto.RegisterType((*PulsarMessages)(nil), "pb.PulsarMessages")
	proto.RegisterEnum("pb.ErrorCode", ErrorCode_name, ErrorCode_value)
	proto.RegisterEnum("pb.DataType", DataType_name, DataType_value)
	proto.RegisterEnum("pb.OpType", OpType_name, OpType_value)
}

// init registers the compressed file descriptor under the source file name.
func init() { proto.RegisterFile("pulsar.proto", fileDescriptor0) }
// fileDescriptor0 is the gzipped FileDescriptorProto for pulsar.proto,
// registered at init time to support protobuf reflection.
// NOTE: generated data — do not edit by hand.
var fileDescriptor0 = []byte{
	// 1101 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46,
	0x17, 0x0d, 0x45, 0xfd, 0xf1, 0x52, 0x92, 0xc7, 0x13, 0x27, 0x51, 0x90, 0xef, 0x43, 0x54, 0x15,
	0x6d, 0x0d, 0xa3, 0x49, 0x50, 0x25, 0x35, 0xfa, 0xd2, 0x07, 0x9a, 0x1c, 0x25, 0x44, 0x28, 0x52,
	0x1d, 0x52, 0x8e, 0xfd, 0x44, 0xd0, 0xd2, 0x58, 0x65, 0x21, 0x91, 0x02, 0x39, 0x76, 0xa0, 0x65,
	0xb4, 0x4b, 0xe8, 0x1e, 0xba, 0xa6, 0x76, 0x19, 0xc5, 0x0c, 0x49, 0x4b, 0x09, 0xd0, 0x02, 0x29,
	0xda, 0xb7, 0xcb, 0x33, 0xe7, 0xde, 0x39, 0xe7, 0x5c, 0x52, 0x10, 0x74, 0x36, 0x37, 0xab, 0x3c,
	0xca, 0x9e, 0x6f, 0xb2, 0x94, 0xa7, 0xb8, 0xb6, 0xb9, 0x1a, 0xba, 0xd0, 0xf4, 0x79, 0xc4, 0x6f,
	0x72, 0xfc, 0x35, 0x00, 0xcb, 0xb2, 0x34, 0x0b, 0xe7, 0xe9, 0x82, 0xf5, 0x95, 0x81, 0x72, 0xdc,
	0x1b, 0x75, 0x9f, 0x6f, 0xae, 0x9e, 0x13, 0x81, 0x9a, 0xe9, 0x82, 0x51, 0x8d, 0x55, 0x25, 0x7e,
	0x08, 0xcd, 0x8c, 0x45, 0x79, 0x9a, 0xf4, 0x6b, 0x03, 0xe5, 0x58, 0xa3, 0xe5, 0xd3, 0xf0, 0x04,
	0xba, 0x3e, 0x5b, 0xae, 0x59, 0xc2, 0x29, 0x9b, 0xa7, 0xd9, 0x02, 0x3f, 0x86, 0x76, 0xce, 0x96,
	0x61, 0x9c, 0x5c, 0xa7, 0x7d, 0x65, 0xa0, 0x1e, 0x6b, 0xb4, 0x95, 0xb3, 0xa5, 0x9d, 0x5c, 0xa7,
	0xc3, 0x1f, 0xe0, 0xe0, 0x9c, 0xcd, 0x79, 0x9a, 0xd1, 0xf4, 0x7d, 0xc9, 0xfe, 0x3f, 0xc0, 0xf5,
	0x2a, 0x8d, 0x78, 0xb8, 0x88, 0x78, 0x24, 0xf9, 0x35, 0xaa, 0x49, 0xc4, 0x8a, 0x78, 0x84, 0x9f,
	0x82, 0x7e, 0x15, 0x27, 0x51, 0xb6, 0x2d, 0xce, 0xc5, 0xd5, 0x1d, 0x0a, 0x05, 0x24, 0x08, 0xc3,
	0x5f, 0x14, 0x00, 0x83, 0xf3, 0xac, 0x1c, 0xf7, 0x14, 0xf4, 0x38, 0xe1, 0x2f, 0x47, 0xe1, 0x6d,
	0xb4, 0xba, 0x61, 0x72, 0x5e, 0x83, 0x82, 0x84, 0xce, 0x05, 0x52, 0x12, 0x4e, 0x5f, 0x95, 0x84,
	0xda, 0x40, 0x3d, 0x56, 0x25, 0xe1, 0xf4, 0xd5, 0x1d, 0xa1, 0x10, 0x54, 0x10, 0x54, 0xa9, 0xa8,
	0xd0, 0x58, 0x10, 0x3e, 0x83, 0xce, 0x22, 0xbd, 0xb9, 0x5a, 0xb1, 0x92, 0x51, 0x1f, 0xa8, 0xc7,
	0x0a, 0xd5, 0x0b, 0x4c, 0x52, 0x86, 0xdf, 0x43, 0xa7, 0xf4, 0x59, 0xa8, 0x7a, 0x06, 0xad, 0x4c,
	0x56, 0xb9, 0x54, 0xa4, 0x8f, 0xee, 0x8b, 0x98, 0x3f, 0x8a, 0x82, 0x56, 0x9c, 0x21, 0x05, 0xbd,
	0x38, 0x9b, 0x46, 0x59, 0xb4, 0xc6, 0x18, 0xea, 0x3f, 0x89, 0xdc, 0x15, 0x99, 0xbb, 0xac, 0xf1,
	0x0b, 0x80, 0x2c, 0x7d, 0x1f, 0x16, 0x1d, 0x32, 0x16, 0x7d, 0x84, 0xf6, 0x86, 0x16, 0x13, 0xb5,
	0xac, 0x1a, 0x3e, 0xfc, 0x4d, 0x01, 0x18, 0xc7, 0x6c, 0xb5, 0x28, 0x4c, 0x88, 0xd8, 0xc5, 0x53,
	0x98, 0x44, 0x6b, 0x56, 0x4e, 0xd6, 0x24, 0xe2, 0x46, 0x6b, 0x86, 0x07, 0x50, 0xe7, 0xdb, 0x0d,
	0x93, 0x83, 0x7b, 0xa3, 0x8e, 0x18, 0x2c, 0xd2, 0x0e, 0xb6, 0x1b, 0x46, 0xe5, 0x09, 0x7e, 0x01,
	0x7a, 0xc4, 0x79, 0x56, 0x29, 0x50, 0xa5, 0x82, 0x9e, 0x20, 0xee, 0xb6, 0x41, 0x21, 0xda, 0x6d,
	0xe6, 0x5b, 0xe8, 0xde, 0x4a, 0x6d, 0x55, 0x4b, 0xfd, 0x2f, 0x44, 0x77, 0x6e, 0xf7, 0x9e, 0x86,
	0xbf, 0xd7, 0xa0, 0x3b, 0x95, 0xef, 0xf0, 0x84, 0xe5, 0x79, 0xb4, 0x64, 0xf8, 0x2b, 0x38, 0x98,
	0xa7, 0xab, 0x15, 0x9b, 0xf3, 0x38, 0x4d, 0xf6, 0xf5, 0xf7, 0x76, 0xb0, 0x34, 0xf1, 0x25, 0x34,
	0xa5, 0xa3, 0x5c, 0x6e, 0xb9, 0x54, 0xb7, 0xcb, 0x80, 0x96, 0xa7, 0xf8, 0x09, 0x68, 0x2c, 0xe1,
	0x31, 0xdf, 0x86, 0x71, 0x61, 0x44, 0xa5, 0xed, 0x02, 0xb0, 0x17, 0xf8, 0x73, 0xe8, 0x6e, 0xa2,
	0x8c, 0xc7, 0xf2, 0x32, 0x1e, 0x2d, 0xa5, 0x6c, 0x8d, 0x76, 0xee, 0xc0, 0x20, 0x5a, 0xe2, 0x11,
	0x94, 0xa2, 0xc3, 0x8d, 0xd8, 0x58, 0xbf, 0x21, 0xad, 0x1d, 0xec, 0xac, 0xc9, 0x45, 0x52, 0xfd,
	0x76, 0x6f, 0xab, 0xcf, 0xe4, 0x67, 0x22, 0xbe, 0x9b, 0xbc, 0xdf, 0x94, 0xfc, 0x43, 0xc1, 0xff,
	0xe0, 0x5b, 0xa2, 0x77, 0x14, 0xfc, 0x3f, 0xd0, 0x78, 0xbc, 0x66, 0x39, 0x8f, 0xd6, 0x9b, 0x7e,
	0x4b, 0x8a, 0xdc, 0x01, 0xc2, 0xc2, 0x7c, 0x15, 0xb3, 0x84, 0x0b, 0x0b, 0xed, 0xc2, 0x42, 0x01,
	0xd8, 0x0b, 0xfc, 0x05, 0xb4, 0xd7, 0xf9, 0x32, 0x94, 0x0b, 0xd5, 0xe4, 0x42, 0x41, 0xdc, 0xe4,
	0x6d, 0xe4, 0x3a, 0x5b, 0xeb, 0x7c, 0x29, 0x8a, 0xe1, 0x1f, 0x35, 0xe8, 0x7d, 0x90, 0x74, 0xfe,
	0x9f, 0x47, 0xad, 0xfe, 0x1b, 0x51, 0xab, 0x9f, 0x18, 0xb5, 0xfa, 0x89, 0x51, 0xab, 0x7f, 0x1b,
	0xb5, 0xfa, 0x0f, 0xa2, 0x3e, 0xf9, 0xb5, 0x0e, 0xda, 0xdd, 0x8f, 0x2c, 0xd6, 0xa1, 0xe5, 0xcf,
	0x4c, 0x93, 0xf8, 0x3e, 0xba, 0x87, 0x8f, 0x00, 0xcd, 0x5c, 0x72, 0x31, 0x25, 0x66, 0x40, 0xac,
	0x90, 0x50, 0xea, 0x51, 0xa4, 0x60, 0x0c, 0x3d, 0xd3, 0x73, 0x5d, 0x62, 0x06, 0xe1, 0xd8, 0xb0,
	0x1d, 0x62, 0xa1, 0x1a, 0x7e, 0x00, 0x87, 0x53, 0x42, 0x27, 0xb6, 0xef, 0xdb, 0x9e, 0x1b, 0x5a,
	0xc4, 0xb5, 0x89, 0x85, 0x54, 0xfc, 0x18, 0x1e, 0x98, 0x9e, 0xe3, 0x10, 0x33, 0x10, 0xb0, 0xeb,
	0x05, 0x21, 0xb9, 0xb0, 0xfd, 0xc0, 0x47, 0x75, 0x31, 0xdb, 0x76, 0x1c, 0xf2, 0xda, 0x70, 0x42,
	0x83, 0xbe, 0x9e, 0x4d, 0x88, 0x1b, 0xa0, 0x86, 0x98, 0x53, 0xa1, 0x96, 0x3d, 0x21, 0xae, 0x18,
	0x87, 0x5a, 0xf8, 0x21, 0xe0, 0x0a, 0xb6, 0x5d, 0x8b, 0x5c, 0x84, 0xc1, 0xe5, 0x94, 0xa0, 0x36,
	0x7e, 0x02, 0x8f, 0x2a, 0x7c, 0xff, 0x1e, 0x63, 0x42, 0x90, 0x86, 0x11, 0x74, 0xaa, 0xc3, 0xc0,
	0x9b, 0xbe, 0x45, 0xb0, 0x3f, 0x9d, 0x7a, 0xef, 0x28, 0x31, 0x3d, 0x6a, 0x21, 0x7d, 0x1f, 0x3e,
	0x27, 0x66, 0xe0, 0xd1, 0xd0, 0xb6, 0x50, 0x47, 0x88, 0xaf, 0x60, 0x9f, 0x18, 0xd4, 0x7c, 0x13,
	0x52, 0xe2, 0xcf, 0x9c, 0x00, 0x75, 0x45, 0x04, 0x63, 0xdb, 0x21, 0xd2, 0xd1, 0xd8, 0x9b, 0xb9,
	0x16, 0xea, 0xe1, 0x03, 0xd0, 0x27, 0x24, 0x30, 0xaa, 0x4c, 0x0e, 0xc4, 0xfd, 0xa6, 0x61, 0xbe,
	0x21, 0x15, 0x82, 0x70, 0x1f, 0x8e, 0x4c, 0xc3, 0x15, 0x4d, 0x26, 0x25, 0x46, 0x40, 0xc2, 0xb1,
	0xe7, 0x58, 0x84, 0xa2, 0x43, 0x61, 0xf0, 0xa3, 0x13, 0xdb, 0x21, 0x08, 0xef, 0x75, 0x58, 0xc4,
	0x21, 0xbb, 0x8e, 0xfb, 0x7b, 0x1d, 0xd5, 0x89, 0xe8, 0x38, 0x12, 0x66, 0xce, 0x66, 0xb6, 0x63,
	0x95, 0x41, 0x15, 0x4b, 0x7b, 0x80, 0x0f, 0xa1, 0x5b, 0x99, 0x71, 0x1d, 0xdb, 0x0f, 0xd0, 0x43,
	0xfc, 0x08, 0xee, 0x57, 0xd0, 0x84, 0x04, 0xd4, 0x36, 0x8b, 0x54, 0x1f, 0x09, 0xae, 0x37, 0x0b,
	0x42, 0x6f, 0x1c, 0x4e, 0xc8, 0xc4, 0xa3, 0x97, 0xa8, 0x7f, 0xf2, 0xb3, 0x02, 0xed, 0xea, 0x47,
	0x17, 0xb7, 0xa1, 0xee, 0x7a, 0x2e, 0x41, 0xf7, 0x44, 0x75, 0xe6, 0x79, 0x0e, 0x52, 0x44, 0x65,
	0xbb, 0xc1, 0x77, 0xa8, 0x86, 0x35, 0x68, 0xd8, 0x6e, 0xf0, 0xcd, 0x29, 0x52, 0xcb, 0xf2, 0xe5,
	0x08, 0xd5, 0xcb, 0xf2, 0xf4, 0x15, 0x6a, 0x88, 0x72, 0xec, 0x78, 0x46, 0x80, 0x00, 0x03, 0x34,
	0x2d, 0x6f, 0x76, 0xe6, 0x10, 0xa4, 0x8b, 0xda, 0x0f, 0xa8, 0xed, 0xbe, 0x46, 0x47, 0x42, 0x41,
	0xb9, 0x89, 0x33, 0xdb, 0x35, 0xe8, 0x25, 0x5a, 0x88, 0x34, 0x4b, 0xa8, 0x68, 0x66, 0x27, 0xef,
	0xa0, 0x59, 0xbc, 0xcb, 0xa2, 0xd5, 0x4e, 0x72, 0x96, 0x71, 0x74, 0x4f, 0x8e, 0x64, 0x2b, 0xc6,
	0x19, 0x52, 0xe4, 0x48, 0x16, 0x65, 0xf3, 0x1f, 0x51, 0x0d, 0x77, 0xa0, 0x1d, 0xc4, 0x6b, 0xe6,
	0x6f, 0x93, 0x39, 0x52, 0xc5, 0x6b, 0xfe, 0x96, 0x6d, 0x47, 0x3e, 0x5b, 0xa2, 0x3a, 0xee, 0x01,
	0x88, 0x7f, 0x21, 0x71, 0xce, 0xe3, 0x79, 0x8e, 0x1a, 0x57, 0x4d, 0xf9, 0x07, 0xe5, 0xe5, 0x9f,
	0x01, 0x00, 0x00, 0xff, 0xff, 0x27, 0x2a, 0xd3, 0x11, 0xb0, 0x08, 0x00, 0x00,
}

View File

@ -1,120 +0,0 @@
// pulsar.proto — wire-format definitions shared by the write node, the query
// node, and the Pulsar transport layer.
syntax = "proto3";
package pb;

// ErrorCode enumerates every failure class surfaced in a Status.
// NOTE: value 6 is intentionally unassigned.
enum ErrorCode {
    SUCCESS = 0;
    UNEXPECTED_ERROR = 1;
    CONNECT_FAILED = 2;
    PERMISSION_DENIED = 3;
    COLLECTION_NOT_EXISTS = 4;
    ILLEGAL_ARGUMENT = 5;
    ILLEGAL_DIMENSION = 7;
    ILLEGAL_INDEX_TYPE = 8;
    ILLEGAL_COLLECTION_NAME = 9;
    ILLEGAL_TOPK = 10;
    ILLEGAL_ROWRECORD = 11;
    ILLEGAL_VECTOR_ID = 12;
    ILLEGAL_SEARCH_RESULT = 13;
    FILE_NOT_FOUND = 14;
    META_FAILED = 15;
    CACHE_FAILED = 16;
    CANNOT_CREATE_FOLDER = 17;
    CANNOT_CREATE_FILE = 18;
    CANNOT_DELETE_FOLDER = 19;
    CANNOT_DELETE_FILE = 20;
    BUILD_INDEX_ERROR = 21;
    ILLEGAL_NLIST = 22;
    ILLEGAL_METRIC_TYPE = 23;
    OUT_OF_MEMORY = 24;
}

// Status pairs an error code with a human-readable reason.
message Status {
    ErrorCode error_code = 1;
    string reason = 2;
}

// DataType lists the supported scalar and vector value types; vector types
// start at 100.
enum DataType {
    NONE = 0;
    BOOL = 1;
    INT8 = 2;
    INT16 = 3;
    INT32 = 4;
    INT64 = 5;
    FLOAT = 10;
    DOUBLE = 11;
    STRING = 20;
    VECTOR_BINARY = 100;
    VECTOR_FLOAT = 101;
}

// OpType tags a PulsarMessage with the operation it carries.
enum OpType {
    Insert = 0;
    Delete = 1;
    Search = 2;
    TimeSync = 3;
    Key2Seg = 4;
    Statistics = 5;
}

// SegmentRecord lists segment identifiers.
message SegmentRecord {
    repeated string seg_info = 1;
}

// VectorRowRecord carries one vector either as float components or as packed
// binary data.
message VectorRowRecord {
    repeated float float_data = 1; //float vector data
    bytes binary_data = 2; //binary vector data
}

// AttrRecord carries scalar attribute columns in each supported width.
message AttrRecord {
    repeated int32 int32_value = 1;
    repeated int64 int64_value = 2;
    repeated float float_value = 3;
    repeated double double_value = 4;
}

// VectorRecord is a batch of vector rows.
message VectorRecord {
    repeated VectorRowRecord records = 1;
}

// VectorParam bundles a JSON-encoded parameter string with a vector batch.
message VectorParam {
    string json = 1;
    VectorRecord row_record = 2;
}

// FieldValue is one named, typed field carrying either attribute or vector
// data depending on `type`.
message FieldValue {
    string field_name = 1;
    DataType type = 2;
    AttrRecord attr_record = 3;
    VectorRecord vector_record = 4;
}

// PulsarMessage is a single entity operation with its payload and routing
// metadata.
message PulsarMessage {
    string collection_name = 1;
    repeated FieldValue fields = 2;
    int64 entity_id = 3;
    string partition_tag = 4;
    VectorParam vector_param =5;
    SegmentRecord segments = 6;
    int64 timestamp = 7;
    int64 client_id = 8;
    OpType msg_type = 9;
}

// PulsarMessages is the batched form of PulsarMessage (scalar fields become
// repeated).
message PulsarMessages {
    string collection_name = 1;
    repeated FieldValue fields = 2;
    repeated int64 entity_id = 3;
    string partition_tag = 4;
    repeated VectorParam vector_param =5;
    repeated SegmentRecord segments = 6;
    repeated int64 timestamp = 7;
    repeated int64 client_id = 8;
    OpType msg_type = 9;
}

View File

@ -1,71 +0,0 @@
package client_go
import (
"fmt"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
	// consumerQSchema is the Avro-style schema string for the query-node
	// consumer.
	// NOTE(review): the field array has a trailing comma and "OpType" is not
	// an Avro primitive — confirm whether this schema is ever validated.
	consumerQSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
		"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
		"]}"
)

// QueryNode wraps a MessageClient and runs query-side batch processing.
type QueryNode struct {
	mc MessageClient // transport for batched insert/delete/search messages
}
// doQueryNode processes one batch: it runs the insert, delete, and search
// handlers concurrently and blocks until all three finish.
//
// Fix: the wg parameter is received BY VALUE, so the original's wg.Wait()
// watched a counter that the goroutines' own copies could never decrement —
// the method deadlocked. A function-local WaitGroup now tracks completion.
// The parameter and the wg.Add(3) call are kept so the copies handed to the
// query handlers stay balanced for their wg.Done() calls.
func (qn *QueryNode) doQueryNode(wg sync.WaitGroup) {
	wg.Add(3) // keeps the handlers' per-copy counters non-negative
	var batch sync.WaitGroup
	batch.Add(3)
	go func() {
		defer batch.Done()
		qn.insert_query(qn.mc.InsertMsg, wg)
	}()
	go func() {
		defer batch.Done()
		qn.delete_query(qn.mc.DeleteMsg, wg)
	}()
	go func() {
		defer batch.Done()
		qn.search_query(qn.mc.SearchMsg, wg)
	}()
	batch.Wait()
}
// PrepareBatchMsg drains pending messages into the client's batch slices
// (JobType 0 selects the query-node job).
func (qn *QueryNode) PrepareBatchMsg() {
	qn.mc.PrepareBatchMsg(JobType(0))
}

// ReceiveMessage blocks forever, pulling messages from Pulsar into the
// client's channels.
func (qn *QueryNode)ReceiveMessage() {
	qn.mc.ReceiveMessage()
}

// main wires a MessageClient into a QueryNode and processes one batch every
// 200ms, forever.
// NOTE(review): wg is passed to doQueryNode by value — see that method.
func main() {
	mc := MessageClient{}
	topics := []string{"insert", "delete"}
	mc.InitClient("pulsar://localhost:6650", topics, consumerQSchema)
	qn := QueryNode{mc}
	wg := sync.WaitGroup{}
	go qn.ReceiveMessage()
	for {
		time.Sleep(200 * time.Millisecond)
		qn.PrepareBatchMsg()
		qn.doQueryNode(wg)
		fmt.Println("do a batch in 200ms")
	}
}
// insert_query consumes a batch of insert messages. Currently a stub that
// only signals completion and reports success.
// NOTE(review): wg arrives by value, so wg.Done() acts on a copy — harmless
// here, but it cannot synchronize with the caller.
//
// Fix: the Status literals now use keyed fields (go vet's composites check;
// unkeyed literals silently break if field order changes).
func (qn *QueryNode) insert_query(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
	wg.Done()
	return schema.Status{Error_code: schema.ErrorCode_SUCCESS, Reason: ""}
}

// delete_query consumes a batch of delete messages (stub; see insert_query).
func (qn *QueryNode) delete_query(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
	wg.Done()
	return schema.Status{Error_code: schema.ErrorCode_SUCCESS, Reason: ""}
}

// search_query consumes a batch of search messages (stub; see insert_query).
func (qn *QueryNode) search_query(data []*schema.SearchMsg, wg sync.WaitGroup) schema.Status {
	wg.Done()
	return schema.Status{Error_code: schema.ErrorCode_SUCCESS, Reason: ""}
}

View File

@ -1,198 +0,0 @@
package schema
import (
"encoding/json"
"fmt"
)
// ErrorCode mirrors the pb.ErrorCode enum from pulsar.proto; the numeric
// values must stay in sync with it.
type ErrorCode int32

const (
	ErrorCode_SUCCESS ErrorCode = 0
	ErrorCode_UNEXPECTED_ERROR ErrorCode = 1
	ErrorCode_CONNECT_FAILED ErrorCode = 2
	ErrorCode_PERMISSION_DENIED ErrorCode = 3
	ErrorCode_COLLECTION_NOT_EXISTS ErrorCode = 4
	ErrorCode_ILLEGAL_ARGUMENT ErrorCode = 5
	// value 6 is unassigned, matching the proto definition
	ErrorCode_ILLEGAL_DIMENSION ErrorCode = 7
	ErrorCode_ILLEGAL_INDEX_TYPE ErrorCode = 8
	ErrorCode_ILLEGAL_COLLECTION_NAME ErrorCode = 9
	ErrorCode_ILLEGAL_TOPK ErrorCode = 10
	ErrorCode_ILLEGAL_ROWRECORD ErrorCode = 11
	ErrorCode_ILLEGAL_VECTOR_ID ErrorCode = 12
	ErrorCode_ILLEGAL_SEARCH_RESULT ErrorCode = 13
	ErrorCode_FILE_NOT_FOUND ErrorCode = 14
	ErrorCode_META_FAILED ErrorCode = 15
	ErrorCode_CACHE_FAILED ErrorCode = 16
	ErrorCode_CANNOT_CREATE_FOLDER ErrorCode = 17
	ErrorCode_CANNOT_CREATE_FILE ErrorCode = 18
	ErrorCode_CANNOT_DELETE_FOLDER ErrorCode = 19
	ErrorCode_CANNOT_DELETE_FILE ErrorCode = 20
	ErrorCode_BUILD_INDEX_ERROR ErrorCode = 21
	ErrorCode_ILLEGAL_NLIST ErrorCode = 22
	ErrorCode_ILLEGAL_METRIC_TYPE ErrorCode = 23
	ErrorCode_OUT_OF_MEMORY ErrorCode = 24
)

// Status pairs an error code with a human-readable reason string.
type Status struct {
	Error_code ErrorCode
	Reason string
}
// DataType enumerates the scalar and vector value types of the schema; the
// numeric values match the pulsar.proto DataType enum (vector types at 100+).
type DataType int32

const (
	NONE   DataType = 0
	BOOL   DataType = 1
	INT8   DataType = 2
	INT16  DataType = 3
	INT32  DataType = 4
	INT64  DataType = 5
	FLOAT  DataType = 10
	DOUBLE DataType = 11
	STRING DataType = 20

	VectorBinary DataType = 100
	VectorFloat  DataType = 101
)

// AttrRecord holds one attribute value in each supported numeric width.
// NOTE(review): the proto's AttrRecord declares these as repeated fields;
// here they are scalars — confirm which shape is intended.
type AttrRecord struct {
	Int32Value  int32
	Int64Value  int64
	FloatValue  float32
	DoubleValue float64
}

// VectorRowRecord carries one vector either as float32 components or as
// packed binary data.
type VectorRowRecord struct {
	FloatData  []float32
	BinaryData []byte
}

// VectorRecord wraps a single vector row record.
type VectorRecord struct {
	Records *VectorRowRecord
}

// FieldValue is one named, typed field of an entity; depending on Type it
// carries an attribute record or a vector record.
// NOTE(review): the original asked "what's the diff with VectorRecord" on
// AttrRecord — the split between the two payloads needs clarifying.
type FieldValue struct {
	FieldName    string
	Type         DataType
	AttrRecord   *AttrRecord
	VectorRecord *VectorRecord
}

// VectorParam bundles a JSON-encoded parameter string with the vector batch
// it applies to.
type VectorParam struct {
	Json      string
	RowRecord *VectorRecord
}

// SegmentRecord lists segment identifiers.
type SegmentRecord struct {
	segInfo []string
}

// OpType tags a message with the operation it represents; values mirror the
// pb.OpType enum, with EOF added locally as a stream terminator.
type OpType int

const (
	Insert     OpType = 0
	Delete     OpType = 1
	Search     OpType = 2
	TimeSync   OpType = 3
	Key2Seg    OpType = 4
	Statistics OpType = 5
	EOF        OpType = 6
)

// PulsarMessage is the full wire-level message: one entity operation with its
// payload, timestamps, and topic routing metadata.
type PulsarMessage struct {
	CollectionName string
	Fields         []*FieldValue
	EntityId       int64
	PartitionTag   string
	VectorParam    *VectorParam
	Segments       []*SegmentRecord
	Timestamp      int64
	ClientId       int64
	MsgType        OpType
	TopicName      string
	PartitionId    int64
}

// Message is implemented by every message kind that can report its operation
// type and be (de)serialized for transport.
type Message interface {
	GetType() OpType
	Serialization() []byte
	Deserialization(serializationData []byte)
}

// InsertMsg is an insert request for a single entity.
type InsertMsg struct {
	CollectionName string
	Fields         []*FieldValue
	EntityId       uint64
	PartitionTag   string
	SegmentId      uint64
	Timestamp      uint64
	ClientId       int64
	MsgType        OpType
}

// DeleteMsg is a delete request for a single entity.
type DeleteMsg struct {
	CollectionName string
	EntityId       uint64
	Timestamp      uint64
	ClientId       int64
	MsgType        OpType
}

// SearchMsg is a vector search request against one collection/partition.
type SearchMsg struct {
	CollectionName string
	PartitionTag   string
	VectorParam    *VectorParam
	Timestamp      uint64
	ClientId       int64
	MsgType        OpType
}

// TimeSyncMsg carries a client's clock tick for time synchronization.
type TimeSyncMsg struct {
	ClientId  int64
	Timestamp int64
	MsgType   OpType
}

// Key2SegMsg maps one entity id to the segments that contain it.
type Key2SegMsg struct {
	EntityId int64
	Segments []*SegmentRecord
	MsgType  OpType
}

// GetType returns the operation tag of the insert message.
func (ims *InsertMsg) GetType() OpType {
	return ims.MsgType
}

// Serialization encodes the message as JSON. On a marshalling failure the
// error is reported and a nil slice is returned.
//
// Fix: the original printed the ungrammatical "Can't serialization" and
// discarded the underlying error; the error is now included in the report.
func (ims *InsertMsg) Serialization() []byte {
	data, err := json.Marshal(ims)
	if err != nil {
		fmt.Println("can't serialize InsertMsg:", err)
	}
	return data
}

// Deserialization restores the message from the JSON produced by
// Serialization.
//
// Fix: the original left this as an empty stub, so round-tripping silently
// produced a zero-valued message.
func (ims *InsertMsg) Deserialization(serializationData []byte) {
	if err := json.Unmarshal(serializationData, ims); err != nil {
		fmt.Println("can't deserialize InsertMsg:", err)
	}
}

// GetType returns the operation tag of the delete message.
func (dms *DeleteMsg) GetType() OpType {
	return dms.MsgType
}

// GetType returns the operation tag of the search message.
func (sms *SearchMsg) GetType() OpType {
	return sms.MsgType
}

// GetType returns the operation tag of the time-sync message.
func (tms *TimeSyncMsg) GetType() OpType {
	return tms.MsgType
}

// GetType returns the operation tag of the key-to-segment message.
func (kms *Key2SegMsg) GetType() OpType {
	return kms.MsgType
}

// SyncEofMsg marks the end of a sync stream.
type SyncEofMsg struct {
	MsgType OpType
}

View File

@ -1,56 +0,0 @@
package client_go
import (
"fmt"
"suvlim/pulsar/client-go/schema"
"sync"
"time"
)
var (
	// consumerWSchema is the Avro-style schema string for the write-node
	// consumer.
	// NOTE(review): the field array has a trailing comma and "OpType" is not
	// an Avro primitive — confirm whether this schema is ever validated.
	consumerWSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
		"{\"name\":\"MsgType\",\"type\":\"OpType\"}," +
		"]}"
)

// WriteNode wraps a MessageClient and runs write-side batch processing.
type WriteNode struct {
	mc MessageClient // transport for batched insert/delete messages
}
// doWriteNode processes one batch: it runs the insert and delete writers
// concurrently and blocks until both finish.
//
// Fix: the wg parameter is received BY VALUE, so the original's wg.Wait()
// watched a counter the goroutines' copies could never decrement — the method
// deadlocked. A function-local WaitGroup now tracks completion; the parameter
// and wg.Add(2) are kept so the copies handed to the writers stay balanced
// for their wg.Done() calls.
func (wn *WriteNode) doWriteNode(wg sync.WaitGroup) {
	wg.Add(2) // keeps the writers' per-copy counters non-negative
	var batch sync.WaitGroup
	batch.Add(2)
	go func() {
		defer batch.Done()
		wn.insert_write(wn.mc.InsertMsg, wg)
	}()
	go func() {
		defer batch.Done()
		wn.delete_write(wn.mc.DeleteMsg, wg)
	}()
	batch.Wait()
}
// PrepareBatchMsg drains pending messages into the client's batch slices
// (JobType 1 selects the write-node job).
func (wn *WriteNode) PrepareBatchMsg() {
	wn.mc.PrepareBatchMsg(JobType(1))
}

// main wires a MessageClient into a WriteNode and processes one batch every
// 200ms, forever.
// NOTE(review): query_node.go in this same package also declares main — the
// two files cannot compile together; confirm the intended package layout.
func main() {
	mc := MessageClient{}
	topics := []string{"insert", "delete"}
	mc.InitClient("pulsar://localhost:6650", topics, consumerWSchema)
	go mc.ReceiveMessage()
	wg := sync.WaitGroup{}
	wn := WriteNode{mc}
	for {
		time.Sleep(200 * time.Millisecond)
		wn.PrepareBatchMsg()
		wn.doWriteNode(wg)
		fmt.Println("do a batch in 200ms")
	}
}
// insert_write persists a batch of insert messages. Currently a stub that
// only signals completion and reports success.
// NOTE(review): wg arrives by value, so wg.Done() acts on a copy — harmless
// here, but it cannot synchronize with the caller.
//
// Fix: the Status literals now use keyed fields (go vet's composites check;
// unkeyed literals silently break if field order changes).
func (wn *WriteNode) insert_write(data []*schema.InsertMsg, wg sync.WaitGroup) schema.Status {
	wg.Done()
	return schema.Status{Error_code: schema.ErrorCode_SUCCESS, Reason: ""}
}

// delete_write persists a batch of delete messages (stub; see insert_write).
func (wn *WriteNode) delete_write(data []*schema.DeleteMsg, wg sync.WaitGroup) schema.Status {
	wg.Done()
	return schema.Status{Error_code: schema.ErrorCode_SUCCESS, Reason: ""}
}

View File

@ -1,19 +0,0 @@
package test
import "sync"
var (
	// wg is package-level scratch.
	// NOTE(review): nothing in this file uses it — confirm it is needed.
	wg sync.WaitGroup
	// OriginMsgSchema describes the PulsarMessage layout as an Avro-like
	// schema string.
	// NOTE(review): only the first field object is followed by a comma, so
	// the concatenated result is not valid JSON — confirm whether this
	// string is ever parsed.
	OriginMsgSchema = "{\"type\":\"record\",\"name\":\"suvlim\",\"namespace\":\"pulsar\",\"fields\":[" +
		"{\"name\":\"CollectionName\",\"type\":\"string\"}," +
		"{\"name\":\"Fields\",\"type\":\"[]*FieldValue\"}" +
		"{\"name\":\"EntityId\",\"type\":\"int64\"}" +
		"{\"name\":\"PartitionTag\",\"type\":\"string\"}" +
		"{\"name\":\"VectorParam\",\"type\":\"*VectorParam\"}" +
		"{\"name\":\"Segments\",\"type\":\"[]string\"}" +
		"{\"name\":\"Timestamp\",\"type\":\"int64\"}" +
		"{\"name\":\"ClientId\",\"type\":\"int64\"}" +
		"{\"name\":\"MsgType\",\"type\":\"OpType\"}" +
		"]}"
)

68
writer/main.go Normal file
View File

@ -0,0 +1,68 @@
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"writer/message_client"
"writer/mock"
"writer/pb"
"writer/write_node"
)
// GetInsertMsg builds a synthetic insert message for testing: the entity id
// doubles as Uid and Timestamp, and entities are grouped 100 per segment.
//
// Fix: dropped the redundant int64(...) conversions — entityId is already
// int64 (staticcheck S1016-style cleanup).
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *pb.InsertOrDeleteMsg {
	return &pb.InsertOrDeleteMsg{
		CollectionName: collectionName,
		PartitionTag:   partitionTag,
		SegmentId:      entityId / 100, // 100 entities per segment
		Uid:            entityId,
		Timestamp:      entityId,
		ClientId:       0,
	}
}
// GetDeleteMsg builds a synthetic delete message for testing; the timestamp
// is offset by 100 so deletes order after the matching inserts.
//
// Fix: dropped the redundant int64(...) conversion — entityId is already
// int64.
func GetDeleteMsg(collectionName string, entityId int64) *pb.InsertOrDeleteMsg {
	return &pb.InsertOrDeleteMsg{
		CollectionName: collectionName,
		Uid:            entityId,
		Timestamp:      entityId + 100,
	}
}
// main wires a MessageClient and a TiKV-backed WriteNode together, then
// drains one batch every 200ms. A batch is only processed when every message
// queue (insert/delete, search, time-sync) reported work.
//
// Fixes: the length-check loop shadowed the builtin `len` with its loop
// variable, and kept scanning after the answer was known; renamed and added
// an early break.
func main() {
	mc := message_client.MessageClient{}
	mc.InitClient("pulsar://localhost:6650")
	//TODO::close client / consumer/ producer
	//mc.Close()
	go mc.ReceiveMessage()
	wg := sync.WaitGroup{}
	kv, err := mock.NewTikvStore()
	if err != nil {
		log.Fatal(err)
	}
	wn := write_node.WriteNode{
		KvStore:       kv,
		MessageClient: &mc,
		TimeSync:      100,
	}
	ctx := context.Background()
	for {
		time.Sleep(200 * time.Millisecond)
		msgLength := wn.MessageClient.PrepareBatchMsg()
		readyDo := true
		for _, l := range msgLength {
			if l <= 0 {
				readyDo = false
				break // one empty queue is enough to skip this round
			}
		}
		if readyDo {
			wn.DoWriteNode(ctx, 100, wg)
		}
		fmt.Println("do a batch in 200ms")
	}
}

View File

@ -0,0 +1,197 @@
package message_client
import (
"context"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"log"
"writer/pb"
)
// MessageClient owns the write node's Pulsar connections: it consumes the
// insert/delete, search, and time-sync topics into buffered channels and
// batches them into slices for the write node to drain.
type MessageClient struct {
	//message channel
	insertOrDeleteChan chan *pb.InsertOrDeleteMsg // decoded insert/delete feed
	searchChan chan *pb.SearchMsg // decoded search feed
	timeSyncChan chan *pb.TimeSyncMsg // decoded time-sync feed

	// pulsar
	client pulsar.Client
	key2segProducer pulsar.Producer // publishes key-to-segment results
	writeSyncProducer pulsar.Producer // publishes write-side time sync
	insertOrDeleteConsumer pulsar.Consumer
	searchConsumer pulsar.Consumer
	timeSyncConsumer pulsar.Consumer

	// batch messages, refilled by PrepareBatchMsg
	InsertMsg []*pb.InsertOrDeleteMsg
	DeleteMsg []*pb.InsertOrDeleteMsg
	SearchMsg []*pb.SearchMsg
	// NOTE(review): timeSyncMsg is unexported while its siblings are
	// exported — confirm whether external readers need it.
	timeSyncMsg []*pb.TimeSyncMsg
}
// ReceiveInsertOrDeleteMsg loops forever pulling insert/delete messages from
// the insertOrDelete consumer, decoding each payload and forwarding it to
// insertOrDeleteChan. Any receive or decode failure is fatal.
//
// Fix: the original overwrote the Receive error with the Unmarshal error
// before checking it, so a failed Receive was silently ignored (and its
// payload dereferenced).
func (mc *MessageClient) ReceiveInsertOrDeleteMsg() {
	for {
		msg, err := mc.insertOrDeleteConsumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		insertOrDeleteMsg := pb.InsertOrDeleteMsg{}
		if err := proto.Unmarshal(msg.Payload(), &insertOrDeleteMsg); err != nil {
			log.Fatal(err)
		}
		mc.insertOrDeleteChan <- &insertOrDeleteMsg
	}
}
// ReceiveSearchMsg loops forever pulling search requests from the search
// consumer, decoding each payload and forwarding it to searchChan. Any
// receive or decode failure is fatal.
//
// Fixes: the original received from insertOrDeleteConsumer, so search
// payloads were never read here (and insert/delete payloads were decoded as
// SearchMsg); it also overwrote the Receive error without checking it.
func (mc *MessageClient) ReceiveSearchMsg() {
	for {
		msg, err := mc.searchConsumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		searchMsg := pb.SearchMsg{}
		if err := proto.Unmarshal(msg.Payload(), &searchMsg); err != nil {
			log.Fatal(err)
		}
		mc.searchChan <- &searchMsg
	}
}
// ReceiveTimeSyncMsg loops forever pulling time-sync ticks from the time-sync
// consumer, decoding each payload and forwarding it to timeSyncChan. Any
// receive or decode failure is fatal.
//
// Fixes: the original received from insertOrDeleteConsumer instead of
// timeSyncConsumer, so time-sync messages were never read here; it also
// overwrote the Receive error without checking it.
func (mc *MessageClient) ReceiveTimeSyncMsg() {
	for {
		msg, err := mc.timeSyncConsumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		timeSyncMsg := pb.TimeSyncMsg{}
		if err := proto.Unmarshal(msg.Payload(), &timeSyncMsg); err != nil {
			log.Fatal(err)
		}
		mc.timeSyncChan <- &timeSyncMsg
	}
}
// ReceiveMessage starts one background receiver goroutine per topic and
// returns immediately; the receivers run for the life of the process.
func (mc *MessageClient) ReceiveMessage() {
	go mc.ReceiveInsertOrDeleteMsg()
	go mc.ReceiveSearchMsg()
	go mc.ReceiveTimeSyncMsg()
}

// CreatProducer creates a producer on the given topic; failure is fatal.
// NOTE(review): the name is missing an 'e' (CreatProducer) — kept as-is for
// caller compatibility.
func (mc *MessageClient) CreatProducer(topicName string) pulsar.Producer{
	producer, err := mc.client.CreateProducer(pulsar.ProducerOptions{
		Topic: topicName,
	})
	if err != nil {
		log.Fatal(err)
	}
	return producer
}

// CreateConsumer subscribes to the given topic under the shared subscription
// name "multi-topic-sub"; failure is fatal.
func (mc *MessageClient) CreateConsumer(topicName string) pulsar.Consumer {
	consumer, err := mc.client.Subscribe(pulsar.ConsumerOptions{
		Topic: topicName,
		SubscriptionName: "multi-topic-sub",
	})
	if err != nil {
		log.Fatal(err)
	}
	return consumer
}
// CreateClient dials a Pulsar broker at the given URL and returns the
// connected client; any connection error is fatal.
func (mc *MessageClient) CreateClient(url string) pulsar.Client {
	options := pulsar.ClientOptions{URL: url}
	newClient, err := pulsar.NewClient(options)
	if err != nil {
		log.Fatal(err)
	}
	return newClient
}
// InitClient connects to the broker at url and wires up every producer,
// consumer, and buffered channel the write node needs. Must be called before
// ReceiveMessage or PrepareBatchMsg.
// NOTE(review): writeSyncProducer publishes to the same "TimeSync" topic that
// timeSyncConsumer subscribes to — confirm this loopback is intended.
func (mc *MessageClient) InitClient(url string) {
	//create client
	mc.client = mc.CreateClient(url)

	//create producer
	mc.key2segProducer = mc.CreatProducer("Key2Seg")
	mc.writeSyncProducer = mc.CreatProducer("TimeSync")

	//create consumer
	mc.insertOrDeleteConsumer = mc.CreateConsumer("InsertOrDelete")
	mc.searchConsumer = mc.CreateConsumer("Search")
	mc.timeSyncConsumer = mc.CreateConsumer("TimeSync")

	// init channel; 1000 is the per-topic backlog bound used by the batchers
	mc.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, 1000)
	mc.searchChan = make(chan *pb.SearchMsg, 1000)
	mc.timeSyncChan = make(chan *pb.TimeSyncMsg, 1000)
}
// Close tears down every consumer and producer, then the client itself —
// the reverse of the order InitClient created them in.
func (mc *MessageClient) Close() {
	mc.timeSyncConsumer.Close()
	mc.searchConsumer.Close()
	mc.insertOrDeleteConsumer.Close()
	mc.writeSyncProducer.Close()
	mc.key2segProducer.Close()
	mc.client.Close()
}
// JobType distinguishes which node a batching job runs for.
type JobType int

const (
	OpInQueryNode JobType = 0
	OpInWriteNode JobType = 1
)

// MessageType selects which channel PrepareMsg drains. InsertOrDelete covers
// both operations, which arrive on a single topic.
type MessageType int

const (
	InsertOrDelete MessageType = 0
	Delete MessageType = 1
	Search MessageType = 2
	TimeSync MessageType = 3
	Key2Seg MessageType = 4
	Statistics MessageType = 5
)
// PrepareMsg drains msgLen messages of the given type from the matching
// channel into the client's batch slices. The batch slices must already be
// allocated with at least msgLen capacity (see PrepareBatchMsg).
//
// Fix: the original wrote inserts and deletes at the shared loop index i,
// which left nil gaps in both InsertMsg and DeleteMsg whenever the batch
// mixed operations. Separate write indices are used now, and every filled
// slice is truncated to the number of elements actually written so callers
// can range over them safely.
func (mc *MessageClient) PrepareMsg(messageType MessageType, msgLen int) {
	switch messageType {
	case InsertOrDelete:
		insertIdx, deleteIdx := 0, 0
		for i := 0; i < msgLen; i++ {
			msg := <-mc.insertOrDeleteChan
			if msg.Op == pb.OpType_INSERT {
				mc.InsertMsg[insertIdx] = msg
				insertIdx++
			} else {
				mc.DeleteMsg[deleteIdx] = msg
				deleteIdx++
			}
		}
		mc.InsertMsg = mc.InsertMsg[:insertIdx]
		mc.DeleteMsg = mc.DeleteMsg[:deleteIdx]
	case Search:
		for i := 0; i < msgLen; i++ {
			mc.SearchMsg[i] = <-mc.searchChan
		}
		mc.SearchMsg = mc.SearchMsg[:msgLen]
	case TimeSync:
		for i := 0; i < msgLen; i++ {
			mc.timeSyncMsg[i] = <-mc.timeSyncChan
		}
		mc.timeSyncMsg = mc.timeSyncMsg[:msgLen]
	}
}
// PrepareBatchMsg snapshots the current channel backlogs, drains them into
// the batch slices, and returns the per-type counts in the order
// [insert-or-delete, search, time-sync].
//
// Fix: the original always allocated 1000-element slices, leaving nil tails
// that callers had to know to skip; the slices are now sized to the work
// actually pending.
func (mc *MessageClient) PrepareBatchMsg() []int {
	// Snapshot lengths first: producers may still be appending concurrently,
	// and only the messages present at this point are drained.
	insertOrDeleteLen := len(mc.insertOrDeleteChan)
	searchLen := len(mc.searchChan)
	timeLen := len(mc.timeSyncChan)

	mc.InsertMsg = make([]*pb.InsertOrDeleteMsg, insertOrDeleteLen)
	mc.DeleteMsg = make([]*pb.InsertOrDeleteMsg, insertOrDeleteLen)
	mc.SearchMsg = make([]*pb.SearchMsg, searchLen)
	mc.timeSyncMsg = make([]*pb.TimeSyncMsg, timeLen)

	// get message from channel to slice
	mc.PrepareMsg(InsertOrDelete, insertOrDeleteLen)
	mc.PrepareMsg(Search, searchLen)
	mc.PrepareMsg(TimeSync, timeLen)
	return []int{insertOrDeleteLen, searchLen, timeLen}
}

View File

@ -97,7 +97,7 @@ message FieldName {
message Mapping {
Status status = 1;
string collection_name = 2;
repeated FieldParam fields = 3;
Schema schema = 3;
repeated KeyValuePair extra_params = 4;
}

View File

@ -1,57 +1,61 @@
package main
package test
import (
"context"
"github.com/czs007/suvlim/pulsar/schema"
"github.com/czs007/suvlim/writer"
"sync"
"testing"
"writer/pb"
"writer/write_node"
)
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *schema.InsertMsg {
return &schema.InsertMsg{
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *pb.InsertOrDeleteMsg {
return &pb.InsertOrDeleteMsg{
CollectionName: collectionName,
PartitionTag: partitionTag,
SegmentId: uint64(entityId / 100),
EntityId: int64(entityId),
Timestamp: uint64(entityId),
SegmentId: int64(entityId / 100),
Uid: int64(entityId),
Timestamp: int64(entityId),
ClientId: 0,
}
}
func GetDeleteMsg(collectionName string, entityId int64) *schema.DeleteMsg {
return &schema.DeleteMsg{
func GetDeleteMsg(collectionName string, entityId int64) *pb.InsertOrDeleteMsg {
return &pb.InsertOrDeleteMsg{
CollectionName: collectionName,
EntityId: entityId,
Timestamp: uint64(entityId + 100),
Uid: entityId,
Timestamp: int64(entityId + 100),
}
}
func main() {
func TestInsert(t *testing.T) {
ctx := context.Background()
var topics []string
topics = append(topics, "test")
topics = append(topics, "test1")
writerNode, _ := writer.NewWriteNode(ctx, "null", topics, 0)
var insertMsgs []*schema.InsertMsg
writerNode, _ := write_node.NewWriteNode(ctx, "null", topics, 0)
var insertMsgs []*pb.InsertOrDeleteMsg
for i := 0; i < 120; i++ {
insertMsgs = append(insertMsgs, GetInsertMsg("collection0", "tag01", int64(i)))
}
wg := sync.WaitGroup{}
wg.Add(3)
//var wg sync.WaitGroup
writerNode.InsertBatchData(ctx, insertMsgs, 100)
writerNode.InsertBatchData(ctx, insertMsgs, wg)
data1 := writerNode.KvStore.GetData(ctx)
gtInsertBuffer := writerNode.GetInsertBuffer()
//gtInsertBuffer := writerNode.GetInsertBuffer()
println(len(data1))
println(gtInsertBuffer.Len())
var insertMsgs2 []*schema.InsertMsg
var insertMsgs2 []*pb.InsertOrDeleteMsg
for i := 120; i < 200; i++ {
insertMsgs2 = append(insertMsgs2, GetInsertMsg("collection0", "tag02", int64(i)))
}
writerNode.InsertBatchData(ctx, insertMsgs2, 200)
writerNode.InsertBatchData(ctx, insertMsgs2, wg)
data2 := writerNode.KvStore.GetData(ctx)
println(len(data2))
var deleteMsgs []*schema.DeleteMsg
var deleteMsgs []*pb.InsertOrDeleteMsg
deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 2))
deleteMsgs = append(deleteMsgs, GetDeleteMsg("collection0", 120))
writerNode.DeleteBatchData(ctx, deleteMsgs, 200)
writerNode.DeleteBatchData(ctx, deleteMsgs, wg)
data3 := writerNode.KvStore.GetData(ctx)
println(len(data3))
}

View File

@ -1,13 +1,13 @@
package writer
package write_node
import (
"context"
"fmt"
"github.com/czs007/suvlim/pulsar"
"github.com/czs007/suvlim/pulsar/schema"
"github.com/czs007/suvlim/writer/mock"
"strconv"
"sync"
"writer/message_client"
"writer/mock"
"writer/pb"
)
type SegmentIdInfo struct {
@ -17,9 +17,9 @@ type SegmentIdInfo struct {
}
type WriteNode struct {
KvStore *mock.TikvStore
mc *pulsar.MessageClient
timeSync uint64
KvStore *mock.TikvStore
MessageClient *message_client.MessageClient
TimeSync uint64
}
func NewWriteNode(ctx context.Context,
@ -27,15 +27,15 @@ func NewWriteNode(ctx context.Context,
topics []string,
timeSync uint64) (*WriteNode, error) {
kv, err := mock.NewTikvStore()
mc := &pulsar.MessageClient{}
mc := &message_client.MessageClient{}
return &WriteNode{
KvStore: kv,
mc: mc,
timeSync: timeSync,
KvStore: kv,
MessageClient: mc,
TimeSync: timeSync,
}, err
}
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertMsg, timeSync uint64, wg sync.WaitGroup) error {
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg sync.WaitGroup) error {
var prefixKey string
var suffixKey string
var prefixKeys [][]byte
@ -44,12 +44,12 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM
var timeStamp []uint64
for i := 0; i < len(data); i++ {
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(data[i].EntityId, 10)
suffixKey = strconv.FormatUint(data[i].SegmentId, 10)
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10)
suffixKey = strconv.FormatUint(uint64(data[i].SegmentId), 10)
prefixKeys = append(prefixKeys, []byte(prefixKey))
suffixKeys = append(suffixKeys, []byte(suffixKey))
binaryData = append(binaryData, data[i].Serialization())
timeStamp = append(timeStamp, data[i].Timestamp)
binaryData = append(binaryData, []byte(data[i].String()))
timeStamp = append(timeStamp, uint64(data[i].Timestamp))
}
error := (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData)
@ -61,22 +61,22 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM
return nil
}
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64, wg sync.WaitGroup) error {
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*pb.InsertOrDeleteMsg, wg sync.WaitGroup) error {
var segmentInfos []*SegmentIdInfo
var prefixKey string
var prefixKeys [][]byte
var timeStamps []uint64
for i := 0; i < len(data); i++ {
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(data[i].EntityId, 10)
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10)
prefixKeys = append(prefixKeys, []byte(prefixKey))
timeStamps = append(timeStamps, data[i].Timestamp)
timeStamps = append(timeStamps, uint64(data[i].Timestamp))
}
segmentIds := (*wn.KvStore).GetSegment(ctx, prefixKeys)
for i := 0; i < len(prefixKeys); i++ {
segmentInfos = append(segmentInfos, &SegmentIdInfo{
CollectionName: data[i].CollectionName,
EntityId: data[i].EntityId,
EntityId: data[i].Uid,
SegmentIds: segmentIds,
})
}
@ -89,12 +89,13 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM
}
func (wn *WriteNode) UpdateTimeSync(timeSync uint64) {
wn.timeSync = timeSync
wn.TimeSync = timeSync
}
func (wn *WriteNode) doWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) {
func (wn *WriteNode) DoWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) {
wg.Add(2)
go wn.InsertBatchData(ctx, wn.mc.InsertMsg, timeSync, wg)
go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, timeSync, wg)
go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg, wg)
go wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg, wg)
wg.Wait()
wn.UpdateTimeSync(timeSync)
}