diff --git a/proxy/src/pulsar/message_client/ClientV2.h b/proxy/src/pulsar/message_client/ClientV2.h index f8e5db541b..c7912ab794 100644 --- a/proxy/src/pulsar/message_client/ClientV2.h +++ b/proxy/src/pulsar/message_client/ClientV2.h @@ -1,6 +1,6 @@ #pragma once -#include "src/utils/Status.h" +#include "utils/Status.h" #include "Producer.h" #include "Consumer.h" #include "grpc/gen-milvus/suvlim.pb.h" diff --git a/proxy/src/pulsar/message_client/Consumer.cpp b/proxy/src/pulsar/message_client/Consumer.cpp index 8c9ea720a3..b208e431ef 100644 --- a/proxy/src/pulsar/message_client/Consumer.cpp +++ b/proxy/src/pulsar/message_client/Consumer.cpp @@ -1,6 +1,6 @@ #include "Consumer.h" -#include "src/grpc/gen-milvus/suvlim.pb.h" +#include "grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/pulsar/message_client/Producer.h b/proxy/src/pulsar/message_client/Producer.h index 903c3b54ca..3a357f3590 100644 --- a/proxy/src/pulsar/message_client/Producer.h +++ b/proxy/src/pulsar/message_client/Producer.h @@ -2,7 +2,7 @@ #include "pulsar/Producer.h" #include "Client.h" -#include "src/grpc/gen-milvus/suvlim.pb.h" +#include "grpc/gen-milvus/suvlim.pb.h" namespace milvus { namespace message_client { diff --git a/proxy/src/server/CMakeLists.txt b/proxy/src/server/CMakeLists.txt index d7065c76c6..c3133fbca7 100644 --- a/proxy/src/server/CMakeLists.txt +++ b/proxy/src/server/CMakeLists.txt @@ -26,6 +26,7 @@ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/init SERVER_INIT_ aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/request DELIVERY_REQUEST_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery/strategy DELIVERY_STRATEGY_FILES ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/delivery DELIVERY_FILES ) +aux_source_directory( ${MILVUS_ENGINE_SRC}/server/timesync TIME_SYNC_FILES ) set( SERVER_FILES ${SERVER_INIT_FILES} ${SERVER_SERVICE_FILES} @@ -33,6 +34,7 @@ set( SERVER_FILES ${SERVER_INIT_FILES} ${DELIVERY_REQUEST_FILES} ${DELIVERY_STRATEGY_FILES} ${DELIVERY_FILES} + ${TIME_SYNC_FILES} ) aux_source_directory( ${MILVUS_ENGINE_SRC}/server/grpc_impl GRPC_IMPL_FILES ) diff --git a/proxy/src/server/timesync/TimeSync.cpp b/proxy/src/server/timesync/TimeSync.cpp new file mode 100644 index 0000000000..b60e06674c --- /dev/null +++ b/proxy/src/server/timesync/TimeSync.cpp @@ -0,0 +1,68 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include "TimeSync.h" +#include "pulsar/message_client/Producer.h" + +namespace milvus { +namespace timesync { + +TimeSync::TimeSync(int64_t id, + std::function timestamp, + const int interval, + const std::string &pulsar_addr, + const std::string &time_sync_topic) : + timestamp_(timestamp), interval_(interval), pulsar_addr_(pulsar_addr), time_sync_topic_(time_sync_topic) { + sync_msg_.set_peer_id(id); + auto timer = [&]() { + std::shared_ptr + client = std::make_shared(this->pulsar_addr_); + milvus::message_client::MsgProducer producer(client, this->time_sync_topic_); + + for (;;) { + if (this->stop_) break; + this->sync_msg_.set_timestamp(this->timestamp_()); + //TODO, set msg type + //this->sync_msg_.set_msgtype(); + auto rst = producer.send(sync_msg_.SerializeAsString()); + if (rst != pulsar::ResultOk) { + //TODO, add log + } + std::this_thread::sleep_for(std::chrono::milliseconds(this->interval_)); + } + auto rst = producer.close(); + if (rst != pulsar::ResultOk) { + //TODO, add log or throw exception + } + rst = client->close(); + if (rst != pulsar::ResultOk) { + //TODO, add log or throw exception + } + }; + timer_ = std::thread(timer); +} + +TimeSync::~TimeSync() { + stop_ = true; + timer_.join(); +} + +void TimeSync::Stop() { + stop_ = true; +} + +bool TimeSync::IsStop() const { + return stop_; +} + +} // namespace timesync +} // namespace milvus \ No newline at end of file diff --git a/proxy/src/server/timesync/TimeSync.h b/proxy/src/server/timesync/TimeSync.h new file mode 100644 index 0000000000..44b58837b4 --- /dev/null +++ b/proxy/src/server/timesync/TimeSync.h @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include "suvlim.pb.h" + +namespace milvus { +namespace timesync { + +class TimeSync { + public: + TimeSync(int64_t id, + std::function timestamp, + const int interval, + const std::string &pulsar_addr, + const std::string &time_sync_topic); + virtual ~TimeSync(); + + void Stop(); + bool IsStop() const; + private: + std::function timestamp_; + const int interval_; + const std::string pulsar_addr_; + const std::string time_sync_topic_; + bool stop_ = false; + std::thread timer_; + milvus::grpc::TimeSyncMsg sync_msg_; +}; + +} // namespace timesync +} // namespace milvus \ No newline at end of file