From 1573209846f7f60228431f73f34ea6c23a348c12 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 9 Sep 2020 10:43:41 +0800 Subject: [PATCH] Add preInsert and preDelete api Signed-off-by: bigsheeper --- core/src/dog_segment/SegmentBase.h | 4 +-- core/src/dog_segment/segment_c.cpp | 24 ++++++++++++++++-- core/src/dog_segment/segment_c.h | 4 +-- proxy/src/message_client/ClientV2.cpp | 10 -------- proxy/src/message_client/ClientV2.h | 9 +++---- proxy/src/server/CMakeLists.txt | 2 +- proxy/src/server/MessageWrapper.cpp | 25 +++++++++++++++++++ proxy/src/server/MessageWrapper.h | 25 +++++++++++++++++++ proxy/src/server/Server.cpp | 3 ++- .../delivery/request/DeleteEntityByIDReq.cpp | 5 ++-- .../src/server/delivery/request/InsertReq.cpp | 5 ++-- proxy/src/server/tso/TSO.h | 4 +-- proxy/src/utils/CMakeLists.txt | 4 +++ proxy/unittest/message_client/CMakeLists.txt | 12 ++++----- 14 files changed, 100 insertions(+), 36 deletions(-) create mode 100644 proxy/src/server/MessageWrapper.cpp create mode 100644 proxy/src/server/MessageWrapper.h diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h index 2fbbdfc676..d1cf71fb39 100644 --- a/core/src/dog_segment/SegmentBase.h +++ b/core/src/dog_segment/SegmentBase.h @@ -34,13 +34,13 @@ class SegmentBase { virtual int64_t PreInsert(int64_t size) = 0; virtual Status - Insert(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0; + Insert(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0; virtual int64_t PreDelete(int64_t size) = 0; // TODO: add id into delete log, possibly bitmap virtual Status - Delete(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) = 0; + Delete(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) = 0; // query contains metadata of virtual Status diff --git a/core/src/dog_segment/segment_c.cpp b/core/src/dog_segment/segment_c.cpp index c6705a297d..3109113407 100644 --- a/core/src/dog_segment/segment_c.cpp +++ b/core/src/dog_segment/segment_c.cpp @@ -46,11 +46,21 @@ Insert(CSegmentBase c_segment, dataChunk.sizeof_per_row = sizeof_per_row; dataChunk.count = count; - auto res = segment->Insert(size, primary_keys, timestamps, dataChunk, std::make_pair(timestamp_min, timestamp_max)); + auto res = segment->Insert(reserved_offset, size, primary_keys, timestamps, dataChunk); return res.code(); } +long int +PreInsert(CSegmentBase c_segment, long int size) { + auto segment = (milvus::dog_segment::SegmentBase*)c_segment; + + // TODO: delete print + std::cout << "PreInsert segment " << std::endl; + return segment->PreInsert(size); +} + + int Delete(CSegmentBase c_segment, long int reserved_offset, @@ -61,11 +71,21 @@ Delete(CSegmentBase c_segment, unsigned long timestamp_max) { auto segment = (milvus::dog_segment::SegmentBase*)c_segment; - auto res = segment->Delete(size, primary_keys, timestamps, std::make_pair(timestamp_min, timestamp_max)); + auto res = segment->Delete(reserved_offset, size, primary_keys, timestamps); return res.code(); } +long int +PreDelete(CSegmentBase c_segment, long int size) { + auto segment = (milvus::dog_segment::SegmentBase*)c_segment; + + // TODO: delete print + std::cout << "PreDelete segment " << std::endl; + return segment->PreDelete(size); +} + + int Search(CSegmentBase c_segment, void* fake_query, diff --git a/core/src/dog_segment/segment_c.h b/core/src/dog_segment/segment_c.h index 9714c2bd78..40645f0764 100644 --- a/core/src/dog_segment/segment_c.h +++ b/core/src/dog_segment/segment_c.h @@ -28,7 +28,7 @@ Insert(CSegmentBase c_segment, unsigned long timestamp_max); long int -PreInsert(long int size); +PreInsert(CSegmentBase c_segment, long int size); int Delete(CSegmentBase c_segment, @@ -40,7 +40,7 @@ Delete(CSegmentBase c_segment, unsigned long timestamp_max); long int -PreDelete(long int size); +PreDelete(CSegmentBase c_segment, long int size); int Search(CSegmentBase c_segment, diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp index 3ef0d8a004..823d1010bc 100644 --- a/proxy/src/message_client/ClientV2.cpp +++ b/proxy/src/message_client/ClientV2.cpp @@ -2,19 +2,9 @@ #include "pulsar/Result.h" #include "PartitionPolicy.h" #include "utils/CommonUtil.h" -#include "config/ServerConfig.h" - namespace milvus::message_client { -MsgClientV2 &MsgClientV2::GetInstance() { - std::string pulsar_server_addr(std::string {"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); - - int64_t client_id = 0; - static MsgClientV2 msg_client(client_id, pulsar_server_addr); - return msg_client; -} - MsgClientV2::MsgClientV2(int64_t client_id, std::string &service_url, const pulsar::ClientConfiguration &config) : client_id_(client_id), service_url_(service_url) {} diff --git a/proxy/src/message_client/ClientV2.h b/proxy/src/message_client/ClientV2.h index 5e97565102..6cc0ac1385 100644 --- a/proxy/src/message_client/ClientV2.h +++ b/proxy/src/message_client/ClientV2.h @@ -8,7 +8,9 @@ namespace milvus::message_client { class MsgClientV2 { public: - static MsgClientV2 &GetInstance(); + MsgClientV2(int64_t client_id, + std::string &service_url, + const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration()); ~MsgClientV2(); // When using MsgClient, make sure it init successfully @@ -29,11 +31,6 @@ class MsgClientV2 { static milvus::grpc::QueryResult GetQueryResult(int64_t query_id); private: - - MsgClientV2(int64_t client_id, - std::string &service_url, - const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration()); - int64_t GetUniqueQId() { return q_id_.fetch_add(1); } diff --git a/proxy/src/server/CMakeLists.txt b/proxy/src/server/CMakeLists.txt index e35c79f728..09e6d8dee2 100644 --- a/proxy/src/server/CMakeLists.txt +++ b/proxy/src/server/CMakeLists.txt @@ -47,7 +47,7 @@ set( GRPC_SERVER_FILES ${GRPC_IMPL_FILES} aux_source_directory( ${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES ) -add_library( server STATIC ) +add_library( server STATIC MessageWrapper.cpp MessageWrapper.h) target_sources( server PRIVATE ${GRPC_SERVER_FILES} ${GRPC_SERVICE_FILES} diff --git a/proxy/src/server/MessageWrapper.cpp b/proxy/src/server/MessageWrapper.cpp new file mode 100644 index 0000000000..3da6328c6f --- /dev/null +++ b/proxy/src/server/MessageWrapper.cpp @@ -0,0 +1,25 @@ +#include "MessageWrapper.h" +#include "config/ServerConfig.h" + +namespace milvus { +namespace server { + +MessageWrapper &MessageWrapper::GetInstance() { + static MessageWrapper wrapper; + return wrapper; +} + +Status MessageWrapper::Init() { + std::string pulsar_server_addr + (std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port())); + int64_t client_id = 0; + msg_client_ = std::make_shared(client_id, pulsar_server_addr); + auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult"); + return status; +} +const std::shared_ptr &MessageWrapper::MessageClient() { + return msg_client_; +} + +} +} \ No newline at end of file diff --git a/proxy/src/server/MessageWrapper.h b/proxy/src/server/MessageWrapper.h new file mode 100644 index 0000000000..b9c008c3a0 --- /dev/null +++ b/proxy/src/server/MessageWrapper.h @@ -0,0 +1,25 @@ +#pragma once +#include "message_client/ClientV2.h" + +namespace milvus { +namespace server { + +class MessageWrapper { + + public: + static MessageWrapper& GetInstance(); + + Status Init(); + + const std::shared_ptr& + MessageClient(); + + private: + MessageWrapper() = default; + + private: + std::shared_ptr msg_client_; +}; + +} +} \ No newline at end of file diff --git a/proxy/src/server/Server.cpp b/proxy/src/server/Server.cpp index f04652a802..a7b1898a29 100644 --- a/proxy/src/server/Server.cpp +++ b/proxy/src/server/Server.cpp @@ -32,6 +32,7 @@ #include "utils/Log.h" #include "utils/SignalHandler.h" #include "utils/TimeRecorder.h" +#include "MessageWrapper.h" namespace milvus { namespace server { @@ -236,7 +237,7 @@ Server::StartService() { grpc::GrpcServer::GetInstance().Start(); - stat = message_client::MsgClientV2::GetInstance().Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult"); + stat = MessageWrapper::GetInstance().Init(); if (!stat.ok()) { LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message(); goto FAIL; diff --git a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp index 1c599102b1..fd36d5b14d 100644 --- a/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp +++ b/proxy/src/server/delivery/request/DeleteEntityByIDReq.cpp @@ -17,6 +17,7 @@ #include "server/delivery/request/DeleteEntityByIDReq.h" #include "src/server/delivery/ReqScheduler.h" +#include "server/MessageWrapper.h" #include #include @@ -41,8 +42,8 @@ DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::Del Status DeleteEntityByIDReq::OnExecute() { - auto &msg_client = message_client::MsgClientV2::GetInstance(); - Status status = msg_client.SendMutMessage(*request_, timestamp_); + auto &msg_client = MessageWrapper::GetInstance().MessageClient(); + Status status = msg_client->SendMutMessage(*request_, timestamp_); return status; } diff --git a/proxy/src/server/delivery/request/InsertReq.cpp b/proxy/src/server/delivery/request/InsertReq.cpp index 89cecd5f29..3b7ce7fe97 100644 --- a/proxy/src/server/delivery/request/InsertReq.cpp +++ b/proxy/src/server/delivery/request/InsertReq.cpp @@ -15,6 +15,7 @@ #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "server/delivery/ReqScheduler.h" +#include "server/MessageWrapper.h" #include #include @@ -42,8 +43,8 @@ InsertReq::Create(const ContextPtr &context, const ::milvus::grpc::InsertParam * Status InsertReq::OnExecute() { LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq."; - auto &msg_client = message_client::MsgClientV2::GetInstance(); - Status status = msg_client.SendMutMessage(*insert_param_, timestamp_); + auto &msg_client = MessageWrapper::GetInstance().MessageClient(); + Status status = msg_client->SendMutMessage(*insert_param_, timestamp_); return status; } diff --git a/proxy/src/server/tso/TSO.h b/proxy/src/server/tso/TSO.h index 9031748ee8..159e36f5f7 100644 --- a/proxy/src/server/tso/TSO.h +++ b/proxy/src/server/tso/TSO.h @@ -22,8 +22,8 @@ class TSOracle { private: std::mutex mutex_; - uint64_t last_time_stamp_; - uint64_t logical_; + uint64_t last_time_stamp_ = 0; + uint64_t logical_ = 0; }; } } \ No newline at end of file diff --git a/proxy/src/utils/CMakeLists.txt b/proxy/src/utils/CMakeLists.txt index 9e6a297115..8661cc00bd 100644 --- a/proxy/src/utils/CMakeLists.txt +++ b/proxy/src/utils/CMakeLists.txt @@ -14,3 +14,7 @@ aux_source_directory( ${MILVUS_ENGINE_SRC}/utils UTILS_FILES ) add_library( utils STATIC ${UTILS_FILES} ) + +target_link_libraries(utils + libboost_filesystem.a + libboost_system.a) \ No newline at end of file diff --git a/proxy/unittest/message_client/CMakeLists.txt b/proxy/unittest/message_client/CMakeLists.txt index 89fae77fbd..02977799ff 100644 --- a/proxy/unittest/message_client/CMakeLists.txt +++ b/proxy/unittest/message_client/CMakeLists.txt @@ -2,7 +2,6 @@ enable_testing() set( GRPC_SERVICE_FILES ${MILVUS_ENGINE_SRC}/grpc/message.grpc.pb.cc ${MILVUS_ENGINE_SRC}/grpc/message.pb.cc - ${MILVUS_ENGINE_SRC}/utils/Status.cpp ) @@ -22,16 +21,17 @@ target_include_directories(test_pulsar PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/a #add_executable(test_pulsar ${unittest_srcs}) target_link_libraries(test_pulsar - message_client_cpp - pulsarStatic + message_client_cpp + libboost_filesystem.a + libboost_system.a + libboost_serialization.a gtest gtest_main libprotobuf grpc++_reflection grpc++ - libboost_system.a - libboost_filesystem.a - libboost_serialization.a + pthread + stdc++ ) install(TARGETS test_pulsar DESTINATION unittest)