From a32cd72ba9dcb0688bb22e126824d65a4efe99be Mon Sep 17 00:00:00 2001 From: neza2017 Date: Sat, 5 Sep 2020 10:04:22 +0800 Subject: [PATCH] Add time sync Signed-off-by: neza2017 --- proxy/src/pulsar/message_client/ClientV2.h | 2 +- proxy/src/pulsar/message_client/Consumer.cpp | 2 +- proxy/src/pulsar/message_client/Producer.h | 2 +- proxy/src/server/CMakeLists.txt | 2 + proxy/src/server/timesync/TimeSync.cpp | 68 ++++++++++++++++++++ proxy/src/server/timesync/TimeSync.h | 45 +++++++++++++ 6 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 proxy/src/server/timesync/TimeSync.cpp create mode 100644 proxy/src/server/timesync/TimeSync.h diff --git a/proxy/src/pulsar/message_client/ClientV2.h b/proxy/src/pulsar/message_client/ClientV2.h index f8e5db541b..c7912ab794 100644 --- a/proxy/src/pulsar/message_client/ClientV2.h +++ b/proxy/src/pulsar/message_client/ClientV2.h @@ -1,6 +1,6 @@ #pragma once -#include "src/utils/Status.h" +#include "utils/Status.h" #include "Producer.h" #include "Consumer.h" #include "grpc/gen-milvus/suvlim.pb.h" diff --git a/proxy/src/pulsar/message_client/Consumer.cpp b/proxy/src/pulsar/message_client/Consumer.cpp index 8c9ea720a3..b208e431ef 100644 --- a/proxy/src/pulsar/message_client/Consumer.cpp +++ b/proxy/src/pulsar/message_client/Consumer.cpp @@ -1,6 +1,6 @@ #include "Consumer.h" -#include "src/grpc/gen-milvus/suvlim.pb.h" +#include "grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/pulsar/message_client/Producer.h b/proxy/src/pulsar/message_client/Producer.h index 903c3b54ca..3a357f3590 100644 --- a/proxy/src/pulsar/message_client/Producer.h +++ b/proxy/src/pulsar/message_client/Producer.h @@ -2,7 +2,7 @@ #include "pulsar/Producer.h" #include "Client.h" -#include "src/grpc/gen-milvus/suvlim.pb.h" +#include "grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/server/CMakeLists.txt b/proxy/src/server/CMakeLists.txt index d7065c76c6..c3133fbca7 100644 --- a/proxy/src/server/CMakeLists.txt +++ b/proxy/src/server/CMakeLists.txt @@ -26,6 +26,7 @@ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/init SERVER_INIT_ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/request DELIVERY_REQUEST_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/strategy DELIVERY_STRATEGY_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery DELIVERY_FILES ) +aux_source_directory( ${MILVUS_ENGINE_SRC}/server/timesync TIME_SYNC_FILES ) set( SERVER_FILES ${SERVER_INIT_FILES} ${SERVER_SERVICE_FILES} @@ -33,6 +34,7 @@ set( SERVER_FILES ${SERVER_INIT_FILES} ${DELIVERY_REQUEST_FILES} ${DELIVERY_STRATEGY_FILES} ${DELIVERY_FILES} + ${TIME_SYNC_FILES} ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/grpc_impl GRPC_IMPL_FILES ) diff --git a/proxy/src/server/timesync/TimeSync.cpp b/proxy/src/server/timesync/TimeSync.cpp new file mode 100644 index 0000000000..b60e06674c --- /dev/null +++ b/proxy/src/server/timesync/TimeSync.cpp @@ -0,0 +1,68 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include "TimeSync.h" +#include "pulsar/message_client/Producer.h" + +namespace milvus { +namespace timesync { + +TimeSync::TimeSync(int64_t id, + std::function timestamp, + const int interval, + const std::string &pulsar_addr, + const std::string &time_sync_topic) : + timestamp_(timestamp), interval_(interval), pulsar_addr_(pulsar_addr), time_sync_topic_(time_sync_topic) { + sync_msg_.set_peer_id(id); + auto timer = [&]() { + std::shared_ptr + client = std::make_shared(this->pulsar_addr_); + milvus::message_client::MsgProducer producer(client, this->time_sync_topic_); + + for (;;) { + if (this->stop_) break; + this->sync_msg_.set_timestamp(this->timestamp_()); + //TODO, set msg type + //this->sync_msg_.set_msgtype(); + auto rst = producer.send(sync_msg_.SerializeAsString()); + if (rst != pulsar::ResultOk) { + //TODO, add log + } + std::this_thread::sleep_for(std::chrono::milliseconds(this->interval_)); + } + auto rst = producer.close(); + if (rst != pulsar::ResultOk) { + //TODO, add log or throw exception + } + rst = client->close(); + if (rst != pulsar::ResultOk) { + //TODO, add log or throw exception + } + }; + timer_ = std::thread(timer); +} + +TimeSync::~TimeSync() { + stop_ = true; + timer_.join(); +} + +void TimeSync::Stop() { + stop_ = true; +} + +bool TimeSync::IsStop() const { + return stop_; +} + +} // namespace timesync +} // namespace milvus \ No newline at end of file diff --git a/proxy/src/server/timesync/TimeSync.h b/proxy/src/server/timesync/TimeSync.h new file mode 100644 index 0000000000..44b58837b4 --- /dev/null +++ b/proxy/src/server/timesync/TimeSync.h @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include "suvlim.pb.h" + +namespace milvus { +namespace timesync { + +class TimeSync { + public: + TimeSync(int64_t id, + std::function timestamp, + const int interval, + const std::string &pulsar_addr, + const std::string &time_sync_topic); + virtual ~TimeSync(); + + void Stop(); + bool IsStop() const; + private: + std::function timestamp_; + const int interval_; + const std::string pulsar_addr_; + const std::string time_sync_topic_; + bool stop_ = false; + std::thread timer_; + milvus::grpc::TimeSyncMsg sync_msg_; +}; + +} // namespace timesync +} // namespace milvus \ No newline at end of file