diff --git a/conf/conf.go b/conf/conf.go index 41f5cc17fe..f97b13e4f4 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -12,13 +12,11 @@ import ( // yaml.MapSlice type MasterConfig struct { - PulsarURL string + Address string + Port int32 PulsarMoniterInterval int32 PulsarTopic string - EtcdRootPath string SegmentThreshole float32 - DefaultGRPCPort string - EtcdEndPoints []string } type EtcdConfig struct { @@ -41,8 +39,10 @@ type StorageConfig struct { } type PulsarConfig struct { - Address string - Port int32 + Address string + Port int32 + TopicNum int + NodeNum int } //type ProxyConfig struct { @@ -51,12 +51,33 @@ type PulsarConfig struct { // Port int32 //} +type Reader struct { + ClientId int + StopFlag int64 + ReaderQueueSize int + SearchChanSize int + Key2SegChanSize int + InsertTopicStart int + InsertTopicEnd int +} + +type Writer struct { + ClientId int + StopFlag int64 + ReaderQueueSize int + SearchByIdChanSize int + InsertTopicStart int + InsertTopicEnd int +} + type ServerConfig struct { Master MasterConfig Etcd EtcdConfig Timesync TimeSyncConfig Storage StorageConfig Pulsar PulsarConfig + Writer Writer + Reader Reader //Proxy ProxyConfig } diff --git a/conf/config.yaml b/conf/config.yaml index b7bd121951..b7153fa04d 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -10,25 +10,23 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. 
master: - pulsarurl: "pulsar://localhost:6650" + address: localhost + port: 53100 pulsarmoniterinterval: 1 pulsartopic: "monitor-topic" - etcdrootpath: "by-dev" segmentthreshole: 10000 - defaultgrpcport: ":53100" - etcdendpoints: ["127.0.0.1:12379"] etcd: address: localhost - port: 2379 + port: 12379 rootpath: by-dev segthreshold: 10000 timesync: - interval: 10 + interval: 400 storage: - driver: MinIO + driver: TIKV address: localhost port: 0 accesskey: ab @@ -37,6 +35,25 @@ storage: pulsar: address: localhost port: 6650 + topicnum: 128 + nodenum: 1 + +reader: + clientid: 1 + stopflag: -1 + readerqueuesize: 1024 + searchchansize: 10000 + key2segchansize: 10000 + inserttopicstart: 0 + inserttopicend: 128 + +writer: + clientid: 1 + stopflag: -2 + readerqueuesize: 1024 + searchbyidchansize: 10000 + inserttopicstart: 0 + inserttopicend: 128 proxy: timezone: UTC+8 diff --git a/pkg/master/common/config.go b/pkg/master/common/config.go index fa77ddfaff..844b5b6e8d 100644 --- a/pkg/master/common/config.go +++ b/pkg/master/common/config.go @@ -1,12 +1,10 @@ package common -import "time" - -const ( - PULSAR_URL = "pulsar://localhost:6650" - PULSAR_MONITER_INTERVAL = 1 * time.Second - PULSAR_TOPIC = "monitor-topic" - ETCD_ROOT_PATH = "by-dev" - SEGMENT_THRESHOLE = 10000 - DEFAULT_GRPC_PORT = ":53100" -) +//const ( +// PULSAR_URL = "pulsar://localhost:6650" +// PULSAR_MONITER_INTERVAL = 1 * time.Second +// PULSAR_TOPIC = "monitor-topic" +// ETCD_ROOT_PATH = "by-dev" +// SEGMENT_THRESHOLE = 10000 +// DEFAULT_GRPC_PORT = ":53100" +//) diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go index 2a3a5ca85a..e6eaee7c1c 100644 --- a/pkg/master/informer/pulsar.go +++ b/pkg/master/informer/pulsar.go @@ -3,17 +3,22 @@ package informer import ( "context" "fmt" + "github.com/czs007/suvlim/conf" "log" + "strconv" "time" "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/pkg/master/common" "github.com/czs007/suvlim/pkg/master/mock" ) func 
NewPulsarClient() PulsarClient { + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: common.PULSAR_URL, + URL: pulsarAddr, OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) @@ -32,7 +37,7 @@ type PulsarClient struct { func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error { consumer, err := pc.Client.Subscribe(pulsar.ConsumerOptions{ - Topic: common.PULSAR_TOPIC, + Topic: conf.Config.Master.PulsarTopic, SubscriptionName: "my-sub", Type: pulsar.Shared, }) diff --git a/pkg/master/mock/grpc_client_test.go b/pkg/master/mock/grpc_client_test.go index 4b743bddbe..5251ad6179 100644 --- a/pkg/master/mock/grpc_client_test.go +++ b/pkg/master/mock/grpc_client_test.go @@ -1,3 +1,4 @@ + package mock import ( diff --git a/pkg/master/mock/pulsar.go b/pkg/master/mock/pulsar.go index f7d8edadb8..d7f551aa3a 100644 --- a/pkg/master/mock/pulsar.go +++ b/pkg/master/mock/pulsar.go @@ -3,16 +3,21 @@ package mock import ( "context" "fmt" + "github.com/czs007/suvlim/conf" "log" + "strconv" "time" "github.com/apache/pulsar-client-go/pulsar" - "github.com/czs007/suvlim/pkg/master/common" ) func FakePulsarProducer() { + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: common.PULSAR_URL, + URL: pulsarAddr, OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) @@ -21,7 +26,7 @@ func FakePulsarProducer() { } producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: common.PULSAR_TOPIC, + Topic: conf.Config.Master.PulsarTopic, }) testSegmentStats, _ := SegmentMarshal(SegmentStats{ SegementID: uint64(1111), diff --git a/pkg/master/server.go b/pkg/master/server.go index 
820ddecd6c..0c2b266696 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -32,12 +32,15 @@ func Run() { } func SegmentStatsController() { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) cli, _ := clientv3.New(clientv3.Config{ - Endpoints: conf.Config.Master.EtcdEndPoints, + Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) ssChan := make(chan mock.SegmentStats, 10) defer close(ssChan) @@ -138,7 +141,9 @@ func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error { } func GRPCServer(ch chan *messagepb.Mapping) error { - lis, err := net.Listen("tcp", conf.Config.Master.DefaultGRPCPort) + defaultGRPCPort := ":" + defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) + lis, err := net.Listen("tcp", defaultGRPCPort) if err != nil { return err } @@ -193,12 +198,15 @@ func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexP // } func CollectionController(ch chan *messagepb.Mapping) { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) cli, _ := clientv3.New(clientv3.Config{ - Endpoints: conf.Config.Master.EtcdEndPoints, + Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) for collection := range ch { sID := id.New().Uint64() cID := id.New().Uint64() @@ -238,12 +246,15 @@ func CollectionController(ch chan *messagepb.Mapping) { } func WriteCollection2Datastore(collection *messagepb.Mapping) error { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) cli, _ := 
clientv3.New(clientv3.Config{ - Endpoints: []string{"127.0.0.1:12379"}, + Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) sID := id.New().Uint64() cID := id.New().Uint64() fieldMetas := []*messagepb.FieldMeta{} @@ -254,7 +265,7 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error { time.Now(), fieldMetas, []uint64{sID}, []string{"default"}) cm := mock.GrpcMarshal(&c) - s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0)) + s := mock.NewSegment(sID, cID, collection.CollectionName, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0)) collectionData, err := mock.Collection2JSON(*cm) if err != nil { log.Fatal(err) @@ -280,12 +291,15 @@ func WriteCollection2Datastore(collection *messagepb.Mapping) error { } func UpdateCollectionIndex(index *messagepb.IndexParam) error { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) cli, _ := clientv3.New(clientv3.Config{ - Endpoints: conf.Config.Master.EtcdEndPoints, + Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Master.EtcdRootPath) + kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) collectionName := index.CollectionName c, err := kvbase.Load("collection/" + collectionName) if err != nil { diff --git a/proxy/CMakeLists.txt b/proxy/CMakeLists.txt index 027e7a3dca..7e0ac6f571 100644 --- a/proxy/CMakeLists.txt +++ b/proxy/CMakeLists.txt @@ -172,6 +172,7 @@ config_summary() add_subdirectory( thirdparty ) add_subdirectory( src ) +add_subdirectory(unittest) # Unittest lib if ( BUILD_UNIT_TEST STREQUAL "ON" ) diff --git a/proxy/src/config/ConfigMgr.cpp b/proxy/src/config/ConfigMgr.cpp index 8d05793abf..67cf4d7b8f 
100644 --- a/proxy/src/config/ConfigMgr.cpp +++ b/proxy/src/config/ConfigMgr.cpp @@ -88,11 +88,15 @@ ConfigMgr::ConfigMgr() { "localhost", nullptr, nullptr)}, {"pulsar.port", CreateIntegerConfig("pulsar.port", false, 0, 65535, &config.pulsar.port.value, 6650, nullptr, nullptr)}, + {"pulsar.topicnum", CreateIntegerConfig("pulsar.topicnum", false, 0, 1024, &config.pulsar.topicnum.value, + 1024, nullptr, nullptr)}, + {"pulsar.nodenum", CreateIntegerConfig("pulsar.nodenum", false, 0, 1024, &config.pulsar.nodenum.value, + 1, nullptr, nullptr)}, /* master */ {"master.address", CreateStringConfig("master.address", false, &config.master.address.value, "localhost", nullptr, nullptr)}, {"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value, - 6000, nullptr, nullptr)}, + 53100, nullptr, nullptr)}, /* etcd */ {"etcd.address", CreateStringConfig("etcd.address", false, &config.etcd.address.value, "localhost", nullptr, diff --git a/proxy/src/config/ServerConfig.h b/proxy/src/config/ServerConfig.h index 321e9f9631..ce84b1e4ac 100644 --- a/proxy/src/config/ServerConfig.h +++ b/proxy/src/config/ServerConfig.h @@ -75,11 +75,13 @@ struct ServerConfig { struct Pulsar{ String address{"localhost"}; Integer port{6650}; + Integer topicnum{1024}; + Integer nodenum{1}; }pulsar; struct Master{ String address{"localhost"}; - Integer port{6000}; + Integer port{53100}; }master; struct Etcd{ diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index e4f38586f3..d2cb29d825 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -35,6 +35,7 @@ Status MsgClientV2::Init(const std::string &insert_delete, time_sync_producer_ = std::make_shared(pulsar_client, time_sync); for (auto i = 0; i < mut_parallelism_; i++) { +// std::string topic = insert_delete + "-" + std::to_string(i); paralle_mut_producers_.emplace_back(std::make_shared(pulsar_client, insert_delete, producerConfiguration)); 
@@ -162,8 +163,8 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, auto row_count = request.rows_data_size(); auto stats = std::vector(ParallelNum); std::atomic_uint64_t msg_sended = 0; - -#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended), num_threads(ParallelNum) + auto topic_num = config.pulsar.topicnum(); +#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended, topic_num), num_threads(ParallelNum) for (auto i = 0; i < row_count; i++) { milvus::grpc::InsertOrDeleteMsg mut_msg; int this_thread = omp_get_thread_num(); @@ -174,7 +175,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, mut_msg.set_collection_name(request.collection_name()); mut_msg.set_partition_tag(request.partition_tag()); uint64_t uid = request.entity_id_array(i); - auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024; + auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topic_num; try { mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp)); printf("%ld \n", mut_msg.segment_id()); @@ -190,6 +191,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, paralle_mut_producers_[this_thread]->sendAsync(mut_msg, callback); } catch (const std::exception &e) { + msg_sended += 1; stats[this_thread] = Status(DB_ERROR, e.what()); } } diff --git a/proxy/src/message_client/Producer.cpp b/proxy/src/message_client/Producer.cpp index 470d0bed18..d49a13b2c7 100644 --- a/proxy/src/message_client/Producer.cpp +++ b/proxy/src/message_client/Producer.cpp @@ -46,7 +46,6 @@ namespace message_client { Result MsgProducer::send(milvus::grpc::InsertOrDeleteMsg &msg) { int32_t channel_id = makeHash(std::to_string(msg.uid())) % 1024; -// std::cout << "partition id := " << channel_id <(etcd_addr, segment_path_, f, true); - SyncMeta(); + return SyncMeta(); } catch (const std::exception &e) { 
return Status(DB_ERROR, "Init meta error"); @@ -115,11 +115,10 @@ uint64_t MetaWrapper::AskSegmentId(const std::string &collection_name, uint64_t uint64_t open_ts = segment_info.open_timestamp(); uint64_t close_ts = segment_info.close_timestamp(); if (channel_id >= segment_info.channel_start() && channel_id < segment_info.channel_end() - && timestamp >= open_ts << 18 && timestamp < close_ts << 18 - && segment_info.collection_name() == collection_name) { + && timestamp >= (open_ts << 18) && timestamp < (close_ts << 18) + && std::string(segment_info.collection_name()) == collection_name) { return segment_info.segment_id(); } - return 0; } throw std::runtime_error("Can't find eligible segment"); } @@ -166,8 +165,8 @@ Status MetaWrapper::SyncMeta() { int64_t MetaWrapper::CountCollection(const std::string &collection_name) { uint64_t count = 0; // TODO: index to speed up - for (const auto& segment_info : segment_infos_){ - if (segment_info.second.collection_name() == collection_name){ + for (const auto &segment_info : segment_infos_) { + if (segment_info.second.collection_name() == collection_name) { count += segment_info.second.rows(); } } diff --git a/proxy/unittest/message_client/CMakeLists.txt b/proxy/unittest/message_client/CMakeLists.txt index 02977799ff..bf6daa916a 100644 --- a/proxy/unittest/message_client/CMakeLists.txt +++ b/proxy/unittest/message_client/CMakeLists.txt @@ -7,8 +7,10 @@ set( GRPC_SERVICE_FILES set(unittest_srcs unittest_entry.cpp -# consumer_test.cpp producer_test.cpp - get_result_test.cpp) + consumer_test.cpp + producer_test.cpp + get_result_test.cpp + test_pulsar.cpp) add_executable(test_pulsar ${unittest_srcs} ${GRPC_SERVICE_FILES}) @@ -32,6 +34,7 @@ target_link_libraries(test_pulsar grpc++ pthread stdc++ + config ) install(TARGETS test_pulsar DESTINATION unittest) diff --git a/proxy/unittest/message_client/get_result_test.cpp b/proxy/unittest/message_client/get_result_test.cpp index 47ca45f240..d867009cf2 100644 --- 
a/proxy/unittest/message_client/get_result_test.cpp +++ b/proxy/unittest/message_client/get_result_test.cpp @@ -81,7 +81,7 @@ TEST(CLIENT_CPP, GetResult) { auto status_send = client_v2.SendQueryMessage(request, 10, query_id); milvus::grpc::QueryResult result; - auto status = client_v2.GetQueryResult(query_id, result); + auto status = client_v2.GetQueryResult(query_id, &result); std::cout << result.client_id() << std::endl; for (int k = 0; k < result.distances_size(); ++k) { diff --git a/proxy/unittest/message_client/test_pulsar.cpp b/proxy/unittest/message_client/test_pulsar.cpp index 404f894be7..a465831ea3 100644 --- a/proxy/unittest/message_client/test_pulsar.cpp +++ b/proxy/unittest/message_client/test_pulsar.cpp @@ -1,69 +1,36 @@ -#include "thread" -#include "pulsar/Client.h" +#include +#include +#include "message_client/Producer.h" +#include "grpc/message.pb.h" -using namespace pulsar; -using MyData = milvus::grpc::PMessage; -static const std::string exampleSchema = - "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," - "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}, {\"name\":\"reason\",\"type\":\"string\"}]}"; - -int consumer() { - - Client client("pulsar://localhost:6650"); - - ConsumerConfiguration consumerConf; - Consumer consumer; - consumerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema)); - Result result = client.subscribe("topic-proto", "sub-2", consumerConf, consumer); - - if (result != ResultOk) { - std::cout << "Failed to subscribe: " << result << std::endl; - return -1; - } - - Message msg; - for (int i = 0; i < 10; i++) { - consumer.receive(msg); - MyData data; - data.ParseFromString(msg.getDataAsString()); - std::cout << " Received: " << msg - << " with payload '" << data.id() << " " << data.reason() << "'" - << std::endl; - - consumer.acknowledge(msg); - } - - client.close(); - return 0; -} - -int main() { - Client client("pulsar://localhost:6650"); - - Producer producer; - ProducerConfiguration producerConf; 
- producerConf.setSchema(SchemaInfo(PROTOBUF, "Protobuf", exampleSchema)); - Result result = client.createProducer("pro", producerConf, producer); - - if (result != ResultOk) { - std::cout << "Error creating producer: " << result << std::endl; - return -1; - } - -// std::thread t(consumer); - // Publish 10 messages to the topic - for (int i = 0; i < 1; i++) { - auto data = MyData(); - auto a = new milvus::grpc::A(); - a->set_a(9999); - data.set_allocated_a(a); - data.set_id("999"); - data.set_reason("*****test****"); - Message msg = MessageBuilder().setContent(data.SerializeAsString()).build(); - Result res = producer.send(msg); - std::cout << " Message sent: " << res << std::endl; - } - client.close(); -// t.join(); +TEST(CLIENT_CPP, MultiTopic) { + auto row_count = 10000; + int ParallelNum = 128; + uint64_t timestamp = 0; + // TODO: Get the segment from master + int64_t segment = 0; + auto stats = std::vector(ParallelNum); + std::chrono::milliseconds start = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + std::string blob = new char[2000]; + milvus::grpc::RowData data; + data.set_blob(blob); +#pragma omp parallel for default(none), shared(row_count, timestamp, segment, stats, data), num_threads(ParallelNum) + for (auto i = 0; i < row_count; i++) { + milvus::grpc::InsertOrDeleteMsg mut_msg; + int this_thread = omp_get_thread_num(); + mut_msg.set_op(milvus::grpc::OpType::INSERT); + mut_msg.set_uid(i); + mut_msg.set_client_id(0); + mut_msg.set_timestamp(timestamp); + mut_msg.set_collection_name("collection0"); + mut_msg.set_partition_tag("partition0"); + mut_msg.set_segment_id(segment); +// mut_msg.mutable_rows_data()->CopyFrom(&data); +// auto result = paralle_mut_producers_[this_thread]->send(mut_msg); +// if (result != pulsar::ResultOk) { +// stats[this_thread] = result; +// } + } } \ No newline at end of file diff --git a/reader/main.go b/reader/main.go index 9f1ae8c535..f8733d8193 100644 --- a/reader/main.go +++ 
b/reader/main.go @@ -1,26 +1,16 @@ package main import ( + "github.com/czs007/suvlim/conf" reader "github.com/czs007/suvlim/reader/read_node" - "sync" + "strconv" ) func main() { - pulsarURL := "pulsar://localhost:6650" - - numOfQueryNode := 2 - - go reader.StartQueryNode(pulsarURL, numOfQueryNode, 0) - reader.StartQueryNode(pulsarURL, numOfQueryNode, 1) - - + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + reader.StartQueryNode(pulsarAddr) } -func main2() { - wg := sync.WaitGroup{} - //ctx, cancel := context.WithCancel(context.Background()) - //defer cancel() - wg.Add(1) - reader.StartQueryNode2() - wg.Wait() -} diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go index 64245f8497..4698affb72 100644 --- a/reader/message_client/message_client.go +++ b/reader/message_client/message_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" masterPb "github.com/czs007/suvlim/pkg/master/grpc/master" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" timesync "github.com/czs007/suvlim/timesync" @@ -35,7 +36,7 @@ type MessageClient struct { timestampBatchEnd uint64 batchIDLen int - // + //client id MessageClientID int } @@ -151,11 +152,10 @@ func (mc *MessageClient) createClient(url string) pulsar.Client { return client } -func (mc *MessageClient) InitClient(url string, numOfQueryNode int) { - const ChannelNum = 1024 - +func (mc *MessageClient) InitClient(url string) { //create client mc.client = mc.createClient(url) + mc.MessageClientID = conf.Config.Reader.ClientId //create producer mc.searchResultProducer = mc.creatProducer("SearchResult") @@ -166,25 +166,33 @@ func (mc *MessageClient) InitClient(url string, numOfQueryNode int) { mc.key2segConsumer = mc.createConsumer("Key2Seg") // init channel - mc.searchChan = make(chan *msgpb.SearchMsg, 
10000) - mc.key2SegChan = make(chan *msgpb.Key2SegMsg, 10000) + mc.searchChan = make(chan *msgpb.SearchMsg, conf.Config.Reader.SearchChanSize) + mc.key2SegChan = make(chan *msgpb.Key2SegMsg, conf.Config.Reader.Key2SegChanSize) mc.InsertOrDeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0) mc.Key2SegMsg = make([]*msgpb.Key2SegMsg, 0) //init timesync - URL := "pulsar://localhost:6650" timeSyncTopic := "TimeSync" timeSyncSubName := "reader" + strconv.Itoa(mc.MessageClientID) - readTopics := make([]string, 0, ChannelNum) - for i := ChannelNum / numOfQueryNode * mc.MessageClientID; i < ChannelNum/numOfQueryNode*(mc.MessageClientID+1); i++ { + readTopics := make([]string, 0) + for i := conf.Config.Reader.InsertTopicStart; i < conf.Config.Reader.InsertTopicEnd; i++ { str := "InsertOrDelete-partition-" str = str + strconv.Itoa(i) readTopics = append(readTopics, str) } + readSubName := "reader" + strconv.Itoa(mc.MessageClientID) + // TODO::read proxy conf from config.yaml proxyIdList := []int64{0} - timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -2, timesync.WithReaderQueueSize(ChannelNum)) + readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize) + timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic, + timeSyncSubName, + readTopics, + readSubName, + proxyIdList, + conf.Config.Reader.StopFlag, + readerQueueSize) if err != nil { log.Fatal(err) } diff --git a/reader/read_node/meta.go b/reader/read_node/meta.go index 9dacce17f2..c8e4be19dd 100644 --- a/reader/read_node/meta.go +++ b/reader/read_node/meta.go @@ -104,9 +104,9 @@ func (node *QueryNode) processSegmentCreate(id string, value string) { } printSegmentStruct(segment) - if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { - return - } + //if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { + // return + //} collection := node.GetCollectionByID(segment.CollectionID) if collection != nil { @@ -147,9 
+147,9 @@ func (node *QueryNode) processSegmentModify(id string, value string) { } printSegmentStruct(segment) - if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { - return - } + //if !isSegmentChannelRangeInQueryNodeChannelRange(segment) { + // return + //} seg, err := node.GetSegmentBySegmentID(int64(segment.SegmentID)) // todo change to uint64 if seg != nil { @@ -269,7 +269,7 @@ func (node *QueryNode) InitFromMeta() error { } func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) { - node.InitFromMeta() + //node.InitFromMeta() metaChan := node.kvBase.WatchWithPrefix("") for { select { diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go index 663d44cdd9..7a9cfe7e25 100644 --- a/reader/read_node/query_node.go +++ b/reader/read_node/query_node.go @@ -16,6 +16,7 @@ import "C" import ( "encoding/json" "fmt" + "github.com/stretchr/testify/assert" "log" "sort" "sync" @@ -225,6 +226,8 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { // TODO: get timeRange from message client var msgLen = node.PrepareBatchMsg() var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()} + assert.NotEqual(nil, 0, timeRange.timestampMin) + assert.NotEqual(nil, 0, timeRange.timestampMax) if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 { continue @@ -232,12 +235,12 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { node.QueryNodeDataInit() node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange) - fmt.Println("MessagesPreprocess Done") + //fmt.Println("MessagesPreprocess Done") node.WriterDelete() node.PreInsertAndDelete() - fmt.Println("PreInsertAndDelete Done") + //fmt.Println("PreInsertAndDelete Done") node.DoInsertAndDelete() - fmt.Println("DoInsertAndDelete Done") + //fmt.Println("DoInsertAndDelete Done") node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange) } wg.Done() @@ -337,7 +340,6 @@ func (node *QueryNode) 
MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr atomic.AddInt32(&node.deletePreprocessData.count, 1) } } else { - fmt.Println("msg timestamp:= ", msg.Timestamp>>18) node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg) node.buffer.validInsertDeleteBuffer = append(node.buffer.validInsertDeleteBuffer, true) } diff --git a/reader/read_node/reader.go b/reader/read_node/reader.go index 82bf102068..2b0d596d63 100644 --- a/reader/read_node/reader.go +++ b/reader/read_node/reader.go @@ -2,48 +2,28 @@ package reader import ( "context" - "log" - "sync" - "github.com/czs007/suvlim/reader/message_client" + "sync" ) -func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) { - if messageClientID >= numOfQueryNode { - log.Printf("Illegal channel id") - return - } - - mc := message_client.MessageClient{ - MessageClientID: messageClientID, - } - mc.InitClient(pulsarURL, numOfQueryNode) +func StartQueryNode(pulsarURL string) { + mc := message_client.MessageClient{} + mc.InitClient(pulsarURL) mc.ReceiveMessage() qn := CreateQueryNode(0, 0, &mc) - qn.InitQueryNodeCollection() + ctx := context.Background() // Segments Services - // go qn.SegmentManagementService() + //go qn.SegmentManagementService() go qn.SegmentStatisticService() wg := sync.WaitGroup{} - wg.Add(2) + qn.InitFromMeta() + wg.Add(3) + go qn.RunMetaService(ctx, &wg) go qn.RunInsertDelete(&wg) go qn.RunSearch(&wg) wg.Wait() qn.Close() } - -func StartQueryNode2() { - ctx := context.Background() - qn := CreateQueryNode(0, 0, nil) - //qn.InitQueryNodeCollection() - wg := sync.WaitGroup{} - wg.Add(1) - //go qn.RunInsertDelete(&wg) - //go qn.RunSearch(&wg) - go qn.RunMetaService(ctx, &wg) - wg.Wait() - qn.Close() -} diff --git a/reader/read_node/reader_test.go b/reader/read_node/reader_test.go index 6f3758aeb6..35db89f951 100644 --- a/reader/read_node/reader_test.go +++ b/reader/read_node/reader_test.go @@ -1,15 +1,22 @@ package reader import ( + 
"github.com/czs007/suvlim/conf" + "strconv" "testing" ) func TestReader_startQueryNode(t *testing.T) { + //pulsarURL := "pulsar://localhost:6650" + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + println(pulsarAddr) + StartQueryNode(pulsarAddr) - pulsarURL := "pulsar://localhost:6650" + //go StartQueryNode(pulsarAddr, 0) + //StartQueryNode(pulsarAddr, 1) - numOfQueryNode := 2 - go StartQueryNode(pulsarURL, numOfQueryNode, 0) - StartQueryNode(pulsarURL, numOfQueryNode, 1) } diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go index 1e5aa004b8..37bf33665c 100644 --- a/reader/read_node/segment_service.go +++ b/reader/read_node/segment_service.go @@ -34,7 +34,7 @@ func (node *QueryNode) SegmentsManagement() { } func (node *QueryNode) SegmentManagementService() { - sleepMillisecondTime := 200 + sleepMillisecondTime := 1000 fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms") for { time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) diff --git a/reader/read_node/segment_test.go b/reader/read_node/segment_test.go index 9c2fd653ab..3d83c99d67 100644 --- a/reader/read_node/segment_test.go +++ b/reader/read_node/segment_test.go @@ -354,10 +354,11 @@ func TestSegment_GetMemSize(t *testing.T) { func TestSegment_RealSchemaTest(t *testing.T) { // 1. 
Construct node, collection, partition and segment - var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " + - "field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " + - "field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" + - "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n" + //var schemaString = "id: 6873737669791618215\nname: \"collection0\"\nschema: \u003c\n " + + // "field_metas: \u003c\n field_name: \"age\"\n type: INT32\n dim: 1\n \u003e\n " + + // "field_metas: \u003c\n field_name: \"field_1\"\n type: VECTOR_FLOAT\n dim: 16\n \u003e\n" + + // "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n" + var schemaString = "id: 6875229265736357360\nname: \"collection0\"\nschema: \u003c\n field_metas: \u003c\n field_name: \"field_3\"\n type: INT32\n \u003e\n field_metas: \u003c\n field_name: \"field_vec\"\n type: VECTOR_FLOAT\n \u003e\n\u003e\ncreate_time: 1600764055\nsegment_ids: 6875229265736357360\npartition_tags: \"default\"\n" node := NewQueryNode(0, 0) var collection = node.NewCollection(0, "collection0", schemaString) var partition = collection.NewPartition("partition0") diff --git a/sdk/examples/simple/CreateCollection.cpp b/sdk/examples/simple/CreateCollection.cpp index a3ec51b6d8..9e91c97823 100644 --- a/sdk/examples/simple/CreateCollection.cpp +++ b/sdk/examples/simple/CreateCollection.cpp @@ -18,27 +18,30 @@ int main(int argc , char**argv) { client.Connect(connect_param); milvus::Status stat; - const std::string collectin_name = "collection1"; + const std::string collectin_name = "collection0"; // Create - milvus::FieldPtr field_ptr1 = std::make_shared(); - milvus::FieldPtr field_ptr2 = std::make_shared(); +// milvus::FieldPtr field_ptr1 = std::make_shared(); +// milvus::FieldPtr field_ptr2 = std::make_shared(); milvus::FieldPtr field_ptr3 = std::make_shared(); 
milvus::FieldPtr field_ptr4 = std::make_shared(); - field_ptr1->field_name = "field_1"; - field_ptr1->field_type = milvus::DataType::INT64; - - field_ptr2->field_name = "field_2"; - field_ptr2->field_type = milvus::DataType::FLOAT; +// field_ptr1->field_name = "field_1"; +// field_ptr1->field_type = milvus::DataType::INT64; +// +// field_ptr2->field_name = "field_2"; +// field_ptr2->field_type = milvus::DataType::FLOAT; field_ptr3->field_name = "field_3"; field_ptr3->field_type = milvus::DataType::INT32; + field_ptr3->dim = 1; field_ptr4->field_name = "field_vec"; field_ptr4->field_type = milvus::DataType::VECTOR_FLOAT; + field_ptr4->dim = 16; - milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2, field_ptr3, field_ptr4}}; +// milvus::Mapping mapping = {collectin_name, {field_ptr1, field_ptr2, field_ptr3, field_ptr4}}; + milvus::Mapping mapping = {collectin_name, {field_ptr3, field_ptr4}}; stat = client.CreateCollection(mapping, "test_extra_params"); diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp index 87bb31a6a0..1fe54e449d 100644 --- a/sdk/examples/simple/insert.cpp +++ b/sdk/examples/simple/insert.cpp @@ -50,7 +50,7 @@ int main(int argc, char* argv[]) { TestParameters parameters = milvus_sdk::Utils::ParseTestParameters(argc, argv); if (!parameters.is_valid){ - return 0; + return 0; } auto client = milvus::ConnectionImpl(); @@ -66,7 +66,7 @@ main(int argc, char* argv[]) { milvus_sdk::TimeRecorder insert("insert"); for (int j = 0; j < LOOP; ++j) { - auto status = client.Insert("collection1", "tag01", data, ids_array); + auto status = client.Insert("collection0", "tag01", data, ids_array); if (!status.ok()){ return -1; } diff --git a/sdk/examples/simple/search.cpp b/sdk/examples/simple/search.cpp index c58d4a75b0..f15781fa28 100644 --- a/sdk/examples/simple/search.cpp +++ b/sdk/examples/simple/search.cpp @@ -54,7 +54,7 @@ int main(int argc , char**argv) { nlohmann::json vector_param_json; vector_param_json["num_queries"] 
= 1; vector_param_json["topK"] = TOP_K; - vector_param_json["field_name"] = "fakevec"; + vector_param_json["field_name"] = "field_vec"; std::string vector_param_json_string = vector_param_json.dump(); vectorParam.json_param = vector_param_json_string; @@ -63,7 +63,7 @@ int main(int argc , char**argv) { milvus::TopKQueryResult result; milvus_sdk::TimeRecorder test_search("search"); - auto status = client.Search("collection1", partition_list, "dsl", vectorParam, result); + auto status = client.Search("collection0", partition_list, "dsl", vectorParam, result); return 0; } diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go index c453a95417..4c6201f205 100644 --- a/timesync/readertimesync.go +++ b/timesync/readertimesync.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" pb "github.com/czs007/suvlim/pkg/master/grpc/message" "github.com/golang/protobuf/proto" "log" "sort" + "strconv" "sync" ) @@ -39,10 +41,9 @@ type ReaderTimeSyncCfg struct { insertOrDeleteChan chan *pb.InsertOrDeleteMsg //output insert or delete msg readStopFlagClientId int64 - interval int + interval int64 proxyIdList []int64 readerQueueSize int - TestData int revTimesyncFromReader map[uint64]int @@ -62,16 +63,20 @@ func toMillisecond(ts *pb.TimeSyncMsg) int { } func NewReaderTimeSync( - pulsarAddr string, timeSyncTopic string, timeSyncSubName string, readTopics []string, readSubName string, proxyIdList []int64, - interval int, readStopFlagClientId int64, opts ...ReaderTimeSyncOption, ) (ReaderTimeSync, error) { + pulsarAddr := "pulsar://" + pulsarAddr += conf.Config.Pulsar.Address + pulsarAddr += ":" + pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + interval := int64(conf.Config.Timesync.Interval) + //check if proxyId has duplication if len(proxyIdList) == 0 { return nil, fmt.Errorf("proxy id list is empty") @@ -189,7 +194,7 @@ func (r *ReaderTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) 
[]*pb.TimeSyncMs curIdx := len(ts) - 1 - i preIdx := len(ts) - i timeGap := toMillisecond(ts[curIdx]) - toMillisecond(ts[preIdx]) - if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) { + if int64(timeGap) >= (r.interval/2) || int64(timeGap) <= (-r.interval/2) { ts = ts[preIdx:] return ts } diff --git a/timesync/readertimesync_test.go b/timesync/readertimesync_test.go index a2c83b77e8..4d72a8375f 100644 --- a/timesync/readertimesync_test.go +++ b/timesync/readertimesync_test.go @@ -206,14 +206,12 @@ func TestAlignTimeSync5(t *testing.T) { } func TestNewReaderTimeSync(t *testing.T) { - r, err := NewReaderTimeSync(pulsarAddr, - timeSyncTopic, + r, err := NewReaderTimeSync(timeSyncTopic, timeSyncSubName, []string{readerTopic1, readerTopic2, readerTopic3, readerTopic4}, readerSubName, []int64{2, 1}, interval, - readStopFlag, WithReaderQueueSize(8), ) if err != nil { @@ -290,14 +288,12 @@ func TestPulsarClient(t *testing.T) { } func TestReaderTimesync(t *testing.T) { - r, err := NewReaderTimeSync(pulsarAddr, - timeSyncTopic, + r, err := NewReaderTimeSync(timeSyncTopic, timeSyncSubName, []string{readerTopic1, readerTopic2, readerTopic3, readerTopic4}, readerSubName, []int64{2, 1}, interval, - readStopFlag, WithReaderQueueSize(1024), ) if err != nil { @@ -392,25 +388,21 @@ func TestReaderTimesync2(t *testing.T) { go startProxy(pt1, 1, pr1, 1, pr2, 2, 2*time.Second, t) go startProxy(pt2, 2, pr3, 3, pr4, 4, 2*time.Second, t) - r1, _ := NewReaderTimeSync(pulsarAddr, - timeSyncTopic2, + r1, _ := NewReaderTimeSync(timeSyncTopic2, timeSyncSubName1, []string{readerTopic12, readerTopic22, readerTopic32, readerTopic42}, readerSubName1, []int64{2, 1}, interval, - readStopFlag1, WithReaderQueueSize(1024), ) - r2, _ := NewReaderTimeSync(pulsarAddr, - timeSyncTopic2, + r2, _ := NewReaderTimeSync(timeSyncTopic2, timeSyncSubName2, []string{readerTopic12, readerTopic22, readerTopic32, readerTopic42}, readerSubName2, []int64{2, 1}, interval, - readStopFlag2, 
WithReaderQueueSize(1024), ) @@ -514,14 +506,12 @@ func TestReaderTimesync3(t *testing.T) { } }() - r, err := NewReaderTimeSync(pulsarAddr, - timeSyncTopic3, + r, err := NewReaderTimeSync(timeSyncTopic3, timeSyncSubName3, []string{readerTopic13, readerTopic23, readerTopic33, readerTopic43}, readerSubName3, []int64{1}, interval, - readStopFlag3, WithReaderQueueSize(1024)) if err != nil { t.Fatal(err) diff --git a/writer/main.go b/writer/main.go index 9ea076bb82..7a937054a3 100644 --- a/writer/main.go +++ b/writer/main.go @@ -17,7 +17,6 @@ func main() { pulsarAddr += conf.Config.Pulsar.Address pulsarAddr += ":" pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) - println(pulsarAddr) mc := message_client.MessageClient{} mc.InitClient(pulsarAddr) @@ -39,7 +38,6 @@ func main() { } //TODO:: start a gorouter for searchById - for { if ctx.Err() != nil { break diff --git a/writer/message_client/message_client.go b/writer/message_client/message_client.go index 22fd0d6fbb..c85e6a3cf6 100644 --- a/writer/message_client/message_client.go +++ b/writer/message_client/message_client.go @@ -3,6 +3,7 @@ package message_client import ( "context" "github.com/apache/pulsar-client-go/pulsar" + "github.com/czs007/suvlim/conf" msgpb "github.com/czs007/suvlim/pkg/master/grpc/message" timesync "github.com/czs007/suvlim/timesync" "github.com/golang/protobuf/proto" @@ -28,6 +29,9 @@ type MessageClient struct { timestampBatchStart uint64 timestampBatchEnd uint64 batchIDLen int + + //client id + MessageClientID int } func (mc *MessageClient) Send(ctx context.Context, msg msgpb.Key2SegMsg) { @@ -107,6 +111,7 @@ func (mc *MessageClient) createClient(url string) pulsar.Client { func (mc *MessageClient) InitClient(url string) { //create client mc.client = mc.createClient(url) + mc.MessageClientID = conf.Config.Writer.ClientId //create producer mc.key2segProducer = mc.creatProducer("Key2Seg") @@ -115,25 +120,32 @@ func (mc *MessageClient) InitClient(url string) { 
mc.searchByIdConsumer = mc.createConsumer("SearchById") //init channel - mc.searchByIdChan = make(chan *msgpb.EntityIdentity, 10000) + mc.searchByIdChan = make(chan *msgpb.EntityIdentity, conf.Config.Writer.SearchByIdChanSize) //init msg slice mc.InsertMsg = make([]*msgpb.InsertOrDeleteMsg, 0) mc.DeleteMsg = make([]*msgpb.InsertOrDeleteMsg, 0) //init timesync - URL := "pulsar://localhost:6650" timeSyncTopic := "TimeSync" - timeSyncSubName := "writer" - readTopics := make([]string, 0, 1024) - for i := 0; i < 1024; i++ { + timeSyncSubName := "writer" + strconv.Itoa(mc.MessageClientID) + readTopics := make([]string, 0) + for i := conf.Config.Writer.InsertTopicStart; i < conf.Config.Writer.InsertTopicEnd; i++ { str := "InsertOrDelete-partition-" str = str + strconv.Itoa(i) readTopics = append(readTopics, str) } - readSubName := "writer" + readSubName := "writer" + strconv.Itoa(mc.MessageClientID) + // TODO::read proxy conf from config.yaml proxyIdList := []int64{0} - timeSync, err := timesync.NewReaderTimeSync(URL, timeSyncTopic, timeSyncSubName, readTopics, readSubName, proxyIdList, 400, -1, timesync.WithReaderQueueSize(1024)) + readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize) + timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic, + timeSyncSubName, + readTopics, + readSubName, + proxyIdList, + conf.Config.Writer.StopFlag, + readerQueueSize) if err != nil { log.Fatal(err) } diff --git a/writer/write_node/writer_node.go b/writer/write_node/writer_node.go index 00f2706b62..0ecb9eb3e3 100644 --- a/writer/write_node/writer_node.go +++ b/writer/write_node/writer_node.go @@ -78,6 +78,7 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOr prefixKeys = append(prefixKeys, []byte(prefixKey)) timeStamps = append(timeStamps, uint64(data[i].Timestamp)) segmentString, _ := (*wn.KvStore).GetSegments(ctx, []byte(prefixKey), uint64(data[i].Timestamp)) + var segmentIds []int64 for _, str := range segmentString { 
id, err := strconv.ParseInt(str, 10, 64)