Add preInsert and preDelete api

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2020-09-09 10:43:41 +08:00 committed by yefu.chen
parent 51c45ced37
commit 1573209846
14 changed files with 100 additions and 36 deletions

View File

@ -34,13 +34,13 @@ class SegmentBase {
virtual int64_t PreInsert(int64_t size) = 0;
virtual Status
Insert(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0;
Insert(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0;
virtual int64_t PreDelete(int64_t size) = 0;
// TODO: add id into delete log, possibly bitmap
virtual Status
Delete(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) = 0;
Delete(int64_t reserved_offset, int64_t size, const int64_t* primary_keys, const Timestamp* timestamps) = 0;
// query contains metadata of
virtual Status

View File

@ -46,11 +46,21 @@ Insert(CSegmentBase c_segment,
dataChunk.sizeof_per_row = sizeof_per_row;
dataChunk.count = count;
auto res = segment->Insert(size, primary_keys, timestamps, dataChunk, std::make_pair(timestamp_min, timestamp_max));
auto res = segment->Insert(reserved_offset, size, primary_keys, timestamps, dataChunk);
return res.code();
}
long int
PreInsert(CSegmentBase c_segment, long int size) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
// TODO: delete print
std::cout << "PreInsert segment " << std::endl;
return segment->PreInsert(size);
}
int
Delete(CSegmentBase c_segment,
long int reserved_offset,
@ -61,11 +71,21 @@ Delete(CSegmentBase c_segment,
unsigned long timestamp_max) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
auto res = segment->Delete(size, primary_keys, timestamps, std::make_pair(timestamp_min, timestamp_max));
auto res = segment->Delete(reserved_offset, size, primary_keys, timestamps);
return res.code();
}
long int
PreDelete(CSegmentBase c_segment, long int size) {
auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
// TODO: delete print
std::cout << "PreDelete segment " << std::endl;
return segment->PreDelete(size);
}
int
Search(CSegmentBase c_segment,
void* fake_query,

View File

@ -28,7 +28,7 @@ Insert(CSegmentBase c_segment,
unsigned long timestamp_max);
long int
PreInsert(long int size);
PreInsert(CSegmentBase c_segment, long int size);
int
Delete(CSegmentBase c_segment,
@ -40,7 +40,7 @@ Delete(CSegmentBase c_segment,
unsigned long timestamp_max);
long int
PreDelete(long int size);
PreDelete(CSegmentBase c_segment, long int size);
int
Search(CSegmentBase c_segment,

View File

@ -2,19 +2,9 @@
#include "pulsar/Result.h"
#include "PartitionPolicy.h"
#include "utils/CommonUtil.h"
#include "config/ServerConfig.h"
namespace milvus::message_client {
MsgClientV2 &MsgClientV2::GetInstance() {
std::string pulsar_server_addr(std::string {"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
int64_t client_id = 0;
static MsgClientV2 msg_client(client_id, pulsar_server_addr);
return msg_client;
}
MsgClientV2::MsgClientV2(int64_t client_id, std::string &service_url, const pulsar::ClientConfiguration &config)
: client_id_(client_id), service_url_(service_url) {}

View File

@ -8,7 +8,9 @@
namespace milvus::message_client {
class MsgClientV2 {
public:
static MsgClientV2 &GetInstance();
MsgClientV2(int64_t client_id,
std::string &service_url,
const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration());
~MsgClientV2();
// When using MsgClient, make sure it init successfully
@ -29,11 +31,6 @@ class MsgClientV2 {
static milvus::grpc::QueryResult GetQueryResult(int64_t query_id);
private:
MsgClientV2(int64_t client_id,
std::string &service_url,
const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration());
int64_t GetUniqueQId() {
return q_id_.fetch_add(1);
}

View File

@ -47,7 +47,7 @@ set( GRPC_SERVER_FILES ${GRPC_IMPL_FILES}
aux_source_directory( ${MILVUS_ENGINE_SRC}/server/context SERVER_CONTEXT_FILES )
add_library( server STATIC )
add_library( server STATIC MessageWrapper.cpp MessageWrapper.h)
target_sources( server
PRIVATE ${GRPC_SERVER_FILES}
${GRPC_SERVICE_FILES}

View File

@ -0,0 +1,25 @@
#include "MessageWrapper.h"
#include "config/ServerConfig.h"
namespace milvus {
namespace server {
MessageWrapper &MessageWrapper::GetInstance() {
static MessageWrapper wrapper;
return wrapper;
}
Status MessageWrapper::Init() {
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
int64_t client_id = 0;
msg_client_ = std::make_shared<message_client::MsgClientV2>(client_id, pulsar_server_addr);
auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult");
return status;
}
const std::shared_ptr<message_client::MsgClientV2> &MessageWrapper::MessageClient() {
return msg_client_;
}
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include "message_client/ClientV2.h"
namespace milvus {
namespace server {
class MessageWrapper {
public:
static MessageWrapper& GetInstance();
Status Init();
const std::shared_ptr<message_client::MsgClientV2>&
MessageClient();
private:
MessageWrapper() = default;
private:
std::shared_ptr<message_client::MsgClientV2> msg_client_;
};
}
}

View File

@ -32,6 +32,7 @@
#include "utils/Log.h"
#include "utils/SignalHandler.h"
#include "utils/TimeRecorder.h"
#include "MessageWrapper.h"
namespace milvus {
namespace server {
@ -236,7 +237,7 @@ Server::StartService() {
grpc::GrpcServer::GetInstance().Start();
stat = message_client::MsgClientV2::GetInstance().Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult");
stat = MessageWrapper::GetInstance().Init();
if (!stat.ok()) {
LOG_SERVER_ERROR_ << "Pulsar message client start service fail: " << stat.message();
goto FAIL;

View File

@ -17,6 +17,7 @@
#include "server/delivery/request/DeleteEntityByIDReq.h"
#include "src/server/delivery/ReqScheduler.h"
#include "server/MessageWrapper.h"
#include <memory>
#include <string>
@ -41,8 +42,8 @@ DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::Del
Status
DeleteEntityByIDReq::OnExecute() {
auto &msg_client = message_client::MsgClientV2::GetInstance();
Status status = msg_client.SendMutMessage(*request_, timestamp_);
auto &msg_client = MessageWrapper::GetInstance().MessageClient();
Status status = msg_client->SendMutMessage(*request_, timestamp_);
return status;
}

View File

@ -15,6 +15,7 @@
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "server/delivery/ReqScheduler.h"
#include "server/MessageWrapper.h"
#include <memory>
#include <string>
@ -42,8 +43,8 @@ InsertReq::Create(const ContextPtr &context, const ::milvus::grpc::InsertParam *
Status
InsertReq::OnExecute() {
LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq.";
auto &msg_client = message_client::MsgClientV2::GetInstance();
Status status = msg_client.SendMutMessage(*insert_param_, timestamp_);
auto &msg_client = MessageWrapper::GetInstance().MessageClient();
Status status = msg_client->SendMutMessage(*insert_param_, timestamp_);
return status;
}

View File

@ -22,8 +22,8 @@ class TSOracle {
private:
std::mutex mutex_;
uint64_t last_time_stamp_;
uint64_t logical_;
uint64_t last_time_stamp_ = 0;
uint64_t logical_ = 0;
};
}
}

View File

@ -14,3 +14,7 @@
aux_source_directory( ${MILVUS_ENGINE_SRC}/utils UTILS_FILES )
add_library( utils STATIC ${UTILS_FILES} )
target_link_libraries(utils
libboost_filesystem.a
libboost_system.a)

View File

@ -2,7 +2,6 @@ enable_testing()
set( GRPC_SERVICE_FILES
${MILVUS_ENGINE_SRC}/grpc/message.grpc.pb.cc
${MILVUS_ENGINE_SRC}/grpc/message.pb.cc
${MILVUS_ENGINE_SRC}/utils/Status.cpp
)
@ -22,16 +21,17 @@ target_include_directories(test_pulsar PUBLIC ${PROJECT_BINARY_DIR}/thirdparty/a
#add_executable(test_pulsar ${unittest_srcs})
target_link_libraries(test_pulsar
message_client_cpp
pulsarStatic
message_client_cpp
libboost_filesystem.a
libboost_system.a
libboost_serialization.a
gtest
gtest_main
libprotobuf
grpc++_reflection
grpc++
libboost_system.a
libboost_filesystem.a
libboost_serialization.a
pthread
stdc++
)
install(TARGETS test_pulsar DESTINATION unittest)