From f53ab54c5d3a53b71676724dbd200ef24edde868 Mon Sep 17 00:00:00 2001 From: chyezh Date: Sun, 9 Jun 2024 22:55:53 +0800 Subject: [PATCH] enhance: async cgo utility (#33133) issue: #30926, #33132 - implement future-based cgo utility. --------- Signed-off-by: chyezh --- Makefile | 4 +- internal/core/CMakeLists.txt | 5 + internal/core/src/CMakeLists.txt | 1 + internal/core/src/common/EasyAssert.h | 11 +- internal/core/src/futures/CMakeLists.txt | 24 ++ internal/core/src/futures/Executor.cpp | 45 +++ internal/core/src/futures/Executor.h | 40 +++ internal/core/src/futures/Future.h | 226 ++++++++++++++ internal/core/src/futures/LeakyResult.h | 112 +++++++ internal/core/src/futures/Ready.h | 97 ++++++ internal/core/src/futures/future_c.cpp | 51 ++++ internal/core/src/futures/future_c.h | 44 +++ internal/core/src/futures/future_c_types.h | 26 ++ .../core/src/futures/future_test_case_c.cpp | 42 +++ .../core/src/futures/milvus_futures.pc.in | 9 + internal/core/unittest/CMakeLists.txt | 1 + internal/core/unittest/test_futures.cpp | 210 +++++++++++++ internal/util/cgo/errors.go | 27 ++ internal/util/cgo/futures.go | 190 ++++++++++++ internal/util/cgo/futures_test.go | 278 ++++++++++++++++++ internal/util/cgo/futures_test_case.go | 57 ++++ internal/util/cgo/manager_active.go | 124 ++++++++ internal/util/cgo/options.go | 32 ++ internal/util/cgo/pool.go | 42 +++ internal/util/cgo/state.go | 38 +++ pkg/metrics/cgo_metrics.go | 66 +++++ pkg/util/merr/errors.go | 8 +- scripts/run_go_unittest.sh | 1 + 28 files changed, 1802 insertions(+), 9 deletions(-) create mode 100644 internal/core/src/futures/CMakeLists.txt create mode 100644 internal/core/src/futures/Executor.cpp create mode 100644 internal/core/src/futures/Executor.h create mode 100644 internal/core/src/futures/Future.h create mode 100644 internal/core/src/futures/LeakyResult.h create mode 100644 internal/core/src/futures/Ready.h create mode 100644 internal/core/src/futures/future_c.cpp create mode 100644 
internal/core/src/futures/future_c.h create mode 100644 internal/core/src/futures/future_c_types.h create mode 100644 internal/core/src/futures/future_test_case_c.cpp create mode 100644 internal/core/src/futures/milvus_futures.pc.in create mode 100644 internal/core/unittest/test_futures.cpp create mode 100644 internal/util/cgo/errors.go create mode 100644 internal/util/cgo/futures.go create mode 100644 internal/util/cgo/futures_test.go create mode 100644 internal/util/cgo/futures_test_case.go create mode 100644 internal/util/cgo/manager_active.go create mode 100644 internal/util/cgo/options.go create mode 100644 internal/util/cgo/pool.go create mode 100644 internal/util/cgo/state.go create mode 100644 pkg/metrics/cgo_metrics.go diff --git a/Makefile b/Makefile index b02415b316..058eafe8c0 100644 --- a/Makefile +++ b/Makefile @@ -159,8 +159,8 @@ lint-fix: getdeps #TODO: Check code specifications by golangci-lint static-check: getdeps @echo "Running $@ check" - @source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml - @source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/.golangci.yml + @source $(PWD)/scripts/setenv.sh && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml + @source $(PWD)/scripts/setenv.sh && cd pkg && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --build-tags dynamic,test --timeout=30m --config $(PWD)/.golangci.yml @source $(PWD)/scripts/setenv.sh && cd client && GO111MODULE=on $(INSTALL_PATH)/golangci-lint run --timeout=30m --config $(PWD)/client/.golangci.yml verifiers: build-cpp getdeps cppcheck fmt static-check diff --git a/internal/core/CMakeLists.txt b/internal/core/CMakeLists.txt index 407220304a..170fce961a 100644 --- a/internal/core/CMakeLists.txt +++ b/internal/core/CMakeLists.txt @@ -305,6 +305,11 @@ install(DIRECTORY 
${CMAKE_CURRENT_SOURCE_DIR}/src/common/ FILES_MATCHING PATTERN "*_c.h" ) +install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/futures/ + DESTINATION include/futures + FILES_MATCHING PATTERN "*.h" +) + install(DIRECTORY ${CMAKE_BINARY_DIR}/lib/ DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR} ) diff --git a/internal/core/src/CMakeLists.txt b/internal/core/src/CMakeLists.txt index c6da67afb1..cb6bd68be8 100644 --- a/internal/core/src/CMakeLists.txt +++ b/internal/core/src/CMakeLists.txt @@ -35,3 +35,4 @@ add_subdirectory( indexbuilder ) add_subdirectory( clustering ) add_subdirectory( exec ) add_subdirectory( bitset ) +add_subdirectory( futures ) diff --git a/internal/core/src/common/EasyAssert.h b/internal/core/src/common/EasyAssert.h index c23fd0f1c2..164333c5cf 100644 --- a/internal/core/src/common/EasyAssert.h +++ b/internal/core/src/common/EasyAssert.h @@ -63,6 +63,9 @@ enum ErrorCode { ClusterSkip = 2033, KnowhereError = 2100, + // timeout or cancel related. + FollyOtherException = 2200, + FollyCancel = 2201 }; namespace impl { void @@ -87,7 +90,7 @@ class SegcoreError : public std::runtime_error { } ErrorCode - get_error_code() { + get_error_code() const { return error_code_; } @@ -111,9 +114,9 @@ FailureCStatus(int code, const std::string& msg) { } inline CStatus -FailureCStatus(std::exception* ex) { - if (dynamic_cast(ex) != nullptr) { - auto segcore_error = dynamic_cast(ex); +FailureCStatus(const std::exception* ex) { + if (dynamic_cast(ex) != nullptr) { + auto segcore_error = dynamic_cast(ex); return CStatus{static_cast(segcore_error->get_error_code()), strdup(ex->what())}; } diff --git a/internal/core/src/futures/CMakeLists.txt b/internal/core/src/futures/CMakeLists.txt new file mode 100644 index 0000000000..59d4bdd9f2 --- /dev/null +++ b/internal/core/src/futures/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License + +milvus_add_pkg_config("milvus_futures") + +set(FUTURES_SRC + Executor.cpp + future_c.cpp + future_test_case_c.cpp + ) + +add_library(milvus_futures SHARED ${FUTURES_SRC}) + +target_link_libraries(milvus_futures milvus_common) + +install(TARGETS milvus_futures DESTINATION "${CMAKE_INSTALL_LIBDIR}") diff --git a/internal/core/src/futures/Executor.cpp b/internal/core/src/futures/Executor.cpp new file mode 100644 index 0000000000..e202aad2dc --- /dev/null +++ b/internal/core/src/futures/Executor.cpp @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include +#include "Executor.h" +#include "common/Common.h" + +namespace milvus::futures { + +const int kNumPriority = 3; +const int kMaxQueueSizeFactor = 16; + +folly::Executor::KeepAlive<> +getGlobalCPUExecutor() { + static ExecutorSingleton singleton; + return singleton.GetCPUExecutor(); +} + +folly::Executor::KeepAlive<> +ExecutorSingleton::GetCPUExecutor() { + // TODO: fix the executor with a non-block way. + std::call_once(cpu_executor_once_, [this]() { + int num_threads = milvus::CPU_NUM; + auto num_priority = kNumPriority; + auto max_queue_size = num_threads * kMaxQueueSizeFactor; + cpu_executor_ = std::make_unique( + num_threads, + std::make_unique>(num_priority, + max_queue_size), + std::make_shared("MILVUS_CPU_")); + }); + return folly::getKeepAliveToken(cpu_executor_.get()); +} + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/Executor.h b/internal/core/src/futures/Executor.h new file mode 100644 index 0000000000..c1579009df --- /dev/null +++ b/internal/core/src/futures/Executor.h @@ -0,0 +1,40 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include + +namespace milvus::futures { + +folly::Executor::KeepAlive<> +getGlobalCPUExecutor(); + +class ExecutorSingleton { + public: + ExecutorSingleton() = default; + + ExecutorSingleton(const ExecutorSingleton&) = delete; + + ExecutorSingleton(ExecutorSingleton&&) noexcept = delete; + + folly::Executor::KeepAlive<> + GetCPUExecutor(); + + private: + std::unique_ptr cpu_executor_; + std::once_flag cpu_executor_once_; +}; + +}; // namespace milvus::futures diff --git a/internal/core/src/futures/Future.h b/internal/core/src/futures/Future.h new file mode 100644 index 0000000000..5a81af5eca --- /dev/null +++ b/internal/core/src/futures/Future.h @@ -0,0 +1,226 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include + +#include "future_c_types.h" +#include "LeakyResult.h" +#include "Ready.h" + +namespace milvus::futures { + +/// @brief a virtual class that represents a future can be polymorphic called by CGO code. +/// implemented by Future template. +class IFuture { + public: + /// @brief cancel the future with the given exception. + /// After cancelled is called, the underlying async function will receive cancellation. + /// It just a signal notification, the cancellation is handled by user-defined. 
+ /// If the underlying async function ignores the cancellation signal, the Future is still blocked. + virtual void + cancel() = 0; + + /// @brief check if the future is ready or canceled. + /// @return true if the future is ready or canceled, otherwise false. + virtual bool + isReady() = 0; + + /// @brief register a callback that is called when the future becomes ready, or immediately if the future is already ready. + virtual void + registerReadyCallback(CUnlockGoMutexFn unlockFn, CLockedGoMutex* mutex) = 0; + + /// @brief get the result of the future. It must only be called after the future is ready. + /// the first element of the pair is the result, + /// the second element of the pair is the status (failure information, if any). + /// !!! It can only be called once, + /// and the result needs to be manually released by the caller after this call. + virtual std::pair + leakyGet() = 0; + + /// @brief a leaked future object (e.g. one released from the pointer returned by `Future::async`) can be dropped by this method. + static void + releaseLeakedFuture(IFuture* future) { + delete future; + } +}; + +/// @brief a class that represents a cancellation token +class CancellationToken : public folly::CancellationToken { + public: + CancellationToken(folly::CancellationToken&& token) noexcept + : folly::CancellationToken(std::move(token)) { + } + + /// @brief check if the token is cancelled, throw a FutureCancellation exception if it is. + void + throwIfCancelled() const { + if (isCancellationRequested()) { + throw folly::FutureCancellation(); + } + } +}; + +/// @brief Future is a class that binds a future with a result for +/// use by cgo. +/// @tparam R is the return type of the producer function. +template +class Future : public IFuture { + public: + /// @brief do an async operation which will produce a result. + /// fn returns a pointer to R (leaked, default memory allocator) if it succeeds, otherwise it throws an exception. + /// the returned result or exception will be handled by the consumer side. 
+ template >> + static std::unique_ptr> + async(folly::Executor::KeepAlive<> executor, + int priority, + Fn&& fn) noexcept { + auto future = std::make_unique>(); + // setup the interrupt handler for the promise. + future->setInterruptHandler(); + // start async function. + future->asyncProduce(executor, priority, std::forward(fn)); + // register consume callback function. + future->registerConsumeCallback(executor, priority); + return future; + } + + /// use `async`. + Future() + : ready_(std::make_shared>>()), + promise_(std::make_shared>()), + cancellation_source_() { + } + + Future(const Future&) = delete; + + Future(Future&&) noexcept = default; + + Future& + operator=(const Future&) = delete; + + Future& + operator=(Future&&) noexcept = default; + + /// @brief see `IFuture::cancel` + void + cancel() noexcept override { + promise_->getSemiFuture().cancel(); + } + + /// @brief see `IFuture::registerReadyCallback` + void + registerReadyCallback(CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex) noexcept override { + ready_->callOrRegisterCallback( + [unlockFn = unlockFn, mutex = mutex]() { unlockFn(mutex); }); + } + + /// @brief see `IFuture::isReady` + bool + isReady() noexcept override { + return ready_->isReady(); + } + + /// @brief see `IFuture::leakyGet` + std::pair + leakyGet() noexcept override { + auto result = std::move(*ready_).getValue(); + return result.leakyGet(); + } + + private: + /// @brief set the interrupt handler for the promise used in async produce arm. + void + setInterruptHandler() { + promise_->setInterruptHandler([cancellation_source = + cancellation_source_, + ready = ready_]( + const folly::exception_wrapper& ew) { + // 1. set the result to perform a fast fail. + // 2. set the cancellation to the source to notify cancellation to the consumers. 
+ ew.handle( + [&](const folly::FutureCancellation& e) { + cancellation_source.requestCancellation(); + }, + [&](const folly::FutureTimeout& e) { + cancellation_source.requestCancellation(); + }); + }); + } + + /// @brief produce the R result asynchronously. + template >> + void + asyncProduce(folly::Executor::KeepAlive<> executor, int priority, Fn&& fn) { + // start the produce process asynchronously. + auto cancellation_token = + CancellationToken(cancellation_source_.getToken()); + auto runner = [fn = std::forward(fn), + cancellation_token = std::move(cancellation_token)]() { + return fn(cancellation_token); + }; + + // the runner may be executed in a different thread, + // so manage the promise with shared_ptr. + auto thenRunner = [promise = promise_, runner = std::move(runner)]( + auto&&) { promise->setWith(std::move(runner)); }; + folly::makeSemiFuture().via(executor, priority).then(thenRunner); + } + + /// @brief async consume the result of the future. + void + registerConsumeCallback(folly::Executor::KeepAlive<> executor, + int priority) noexcept { + // set up the result consume arm and exception consume arm. 
+ promise_->getSemiFuture() + .via(executor, priority) + .thenValue( + [ready = ready_](R* r) { ready->setValue(LeakyResult(r)); }) + .thenError(folly::tag_t{}, + [ready = ready_](const folly::FutureCancellation& e) { + ready->setValue( + LeakyResult(milvus::FollyCancel, e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const folly::FutureException& e) { + ready->setValue(LeakyResult( + milvus::FollyOtherException, e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const milvus::SegcoreError& e) { + ready->setValue(LeakyResult( + static_cast(e.get_error_code()), e.what())); + }) + .thenError(folly::tag_t{}, + [ready = ready_](const std::exception& e) { + ready->setValue(LeakyResult( + milvus::UnexpectedError, e.what())); + }); + } + + private: + std::shared_ptr>> ready_; + std::shared_ptr> promise_; + folly::CancellationSource cancellation_source_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/LeakyResult.h b/internal/core/src/futures/LeakyResult.h new file mode 100644 index 0000000000..7fbbb990b4 --- /dev/null +++ b/internal/core/src/futures/LeakyResult.h @@ -0,0 +1,112 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace milvus::futures { + +/// @brief LeakyResult is a class that holds the result that can be leaked. 
+/// @tparam R is the type of the real result, which can be leaked after the get operation. +template +class LeakyResult { + public: + /// @brief default-construct an empty result, which is just used for easy construction. + LeakyResult() { + } + + /// @brief create a LeakyResult with error code and error message which means failure. + /// @param error_code see CStatus definition. + /// @param error_msg see CStatus definition. + LeakyResult(int error_code, const std::string& error_msg) { + auto msg = strdup(error_msg.c_str()); + status_ = std::make_optional(CStatus{error_code, msg}); + } + + /// @brief create a LeakyResult with a result which means success. + /// @param r + LeakyResult(R* r) : result_(std::make_optional(r)) { + } + + LeakyResult(const LeakyResult&) = delete; + + LeakyResult(LeakyResult&& other) noexcept { + if (other.result_.has_value()) { + result_ = std::move(other.result_); + other.result_.reset(); + } + if (other.status_.has_value()) { + status_ = std::move(other.status_); + other.status_.reset(); + } + } + + LeakyResult& + operator=(const LeakyResult&) = delete; + + LeakyResult& + operator=(LeakyResult&& other) noexcept { + if (this != &other) { + if (other.result_.has_value()) { + result_ = std::move(other.result_); + other.result_.reset(); + } + if (other.status_.has_value()) { + status_ = std::move(other.status_); + other.status_.reset(); + } + } + return *this; + } + + /// @brief get the Result or CStatus from LeakyResult, performing manual memory management. + /// the caller has the responsibility to release it if void* is not nullptr or the CStatus error message is not nullptr. + /// @return a pair of void* and CStatus is returned, void* => R*. + /// condition (void* == nullptr and CStatus is failure) or (void* != nullptr and CStatus is success) is met. + /// for the release operation of CStatus, see common/type_c.h. 
+ std::pair + leakyGet() { + if (result_.has_value()) { + R* result_ptr = result_.value(); + result_.reset(); + return std::make_pair(result_ptr, + CStatus{0, nullptr}); + } + if (status_.has_value()) { + CStatus status = status_.value(); + status_.reset(); + return std::make_pair( + nullptr, CStatus{status.error_code, status.error_msg}); + } + throw std::logic_error("get on a not ready LeakyResult"); + } + + ~LeakyResult() { + if (result_.has_value()) { + delete result_.value(); + } + if (status_.has_value()) { + free((char*)(status_.value().error_msg)); + } + } + + private: + std::optional status_; + std::optional result_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/Ready.h b/internal/core/src/futures/Ready.h new file mode 100644 index 0000000000..566b2d7857 --- /dev/null +++ b/internal/core/src/futures/Ready.h @@ -0,0 +1,97 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include + +namespace milvus::futures { + +/// @brief Ready is a class that holds a value of type T. +/// value of Ready can be only set into ready by once, +/// and allows to register callbacks to be called when the value is ready. 
+template +class Ready { + public: + Ready() : is_ready_(false){}; + + Ready(const Ready&) = delete; + + Ready(Ready&&) noexcept = default; + + Ready& + operator=(const Ready&) = delete; + + Ready& + operator=(Ready&&) noexcept = default; + + /// @brief set the value into Ready. + void + setValue(T&& value) { + mutex_.lock(); + value_ = std::move(value); + is_ready_ = true; + std::vector> callbacks(std::move(callbacks_)); + mutex_.unlock(); + + // perform all callbacks which is registered before value is ready. + for (auto& callback : callbacks) { + callback(); + } + } + + /// @brief get the value from Ready. + /// @return ready value. + T + getValue() && { + std::lock_guard lock(mutex_); + if (!is_ready_) { + throw std::runtime_error("Value is not ready"); + } + auto v(std::move(value_.value())); + value_.reset(); + return std::move(v); + } + + /// @brief check if the value is ready. + bool + isReady() const { + const std::lock_guard lock(mutex_); + return is_ready_; + } + + /// @brief register a callback into Ready if value is not ready, otherwise call it directly. + template >> + void + callOrRegisterCallback(Fn&& fn) { + mutex_.lock(); + // call if value is ready, + // otherwise register as a callback to be called when value is ready. + if (is_ready_) { + mutex_.unlock(); + fn(); + return; + } + callbacks_.push_back(std::forward(fn)); + mutex_.unlock(); + } + + private: + std::optional value_; + mutable std::mutex mutex_; + std::vector> callbacks_; + bool is_ready_; +}; + +}; // namespace milvus::futures \ No newline at end of file diff --git a/internal/core/src/futures/future_c.cpp b/internal/core/src/futures/future_c.cpp new file mode 100644 index 0000000000..8d5159cff4 --- /dev/null +++ b/internal/core/src/futures/future_c.cpp @@ -0,0 +1,51 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include + +#include "future_c.h" +#include "folly/init/Init.h" +#include "Future.h" + +extern "C" void +future_cancel(CFuture* future) { + static_cast(static_cast(future)) + ->cancel(); +} + +extern "C" bool +future_is_ready(CFuture* future) { + return static_cast(static_cast(future)) + ->isReady(); +} + +extern "C" void +future_register_ready_callback(CFuture* future, + CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex) { + static_cast(static_cast(future)) + ->registerReadyCallback(unlockFn, mutex); +} + +extern "C" CStatus +future_leak_and_get(CFuture* future, void** result) { + auto [r, s] = + static_cast(static_cast(future)) + ->leakyGet(); + *result = r; + return s; +} + +extern "C" void +future_destroy(CFuture* future) { + milvus::futures::IFuture::releaseLeakedFuture( + static_cast(static_cast(future))); +} \ No newline at end of file diff --git a/internal/core/src/futures/future_c.h b/internal/core/src/futures/future_c.h new file mode 100644 index 0000000000..392528c98b --- /dev/null +++ b/internal/core/src/futures/future_c.h @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include "future_c_types.h" +#include "common/type_c.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void +future_cancel(CFuture* future); + +bool +future_is_ready(CFuture* future); + +void +future_register_ready_callback(CFuture* future, + CUnlockGoMutexFn unlockFn, + CLockedGoMutex* mutex); + +CStatus +future_leak_and_get(CFuture* future, void** result); + +// TODO: only for testing, add test macro for this function. +CFuture* +future_create_test_case(int interval, int loop_cnt, int caseNo); + +void +future_destroy(CFuture* future); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/futures/future_c_types.h b/internal/core/src/futures/future_c_types.h new file mode 100644 index 0000000000..036d71c008 --- /dev/null +++ b/internal/core/src/futures/future_c_types.h @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct CFuture CFuture; + +typedef struct CLockedGoMutex CLockedGoMutex; + +typedef void (*CUnlockGoMutexFn)(CLockedGoMutex* mutex); + +#ifdef __cplusplus +} +#endif diff --git a/internal/core/src/futures/future_test_case_c.cpp b/internal/core/src/futures/future_test_case_c.cpp new file mode 100644 index 0000000000..9c9b3359cc --- /dev/null +++ b/internal/core/src/futures/future_test_case_c.cpp @@ -0,0 +1,42 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include "Future.h" +#include "Executor.h" + +extern "C" CFuture* +future_create_test_case(int interval, int loop_cnt, int case_no) { + auto future = milvus::futures::Future::async( + milvus::futures::getGlobalCPUExecutor(), + 0, + [interval = interval, loop_cnt = loop_cnt, case_no = case_no]( + milvus::futures::CancellationToken token) { + for (int i = 0; i < loop_cnt; i++) { + if (case_no != 0) { + token.throwIfCancelled(); + } + std::this_thread::sleep_for( + std::chrono::milliseconds(interval)); + } + switch (case_no) { + case 1: + throw std::runtime_error("case 1"); + case 2: + throw folly::FutureNoExecutor(); + case 3: + throw milvus::SegcoreError(milvus::NotImplemented, + "case 3"); + } + return new int(case_no); + }); + return static_cast(static_cast( + static_cast(future.release()))); +} diff --git a/internal/core/src/futures/milvus_futures.pc.in b/internal/core/src/futures/milvus_futures.pc.in new file mode 100644 index 0000000000..dc75e325e8 --- /dev/null +++ b/internal/core/src/futures/milvus_futures.pc.in @@ -0,0 +1,9 @@ +libdir=@CMAKE_INSTALL_FULL_LIBDIR@ +includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@ + +Name: Milvus Futures +Description: Futures modules for Milvus +Version: @MILVUS_VERSION@ + +Libs: -L${libdir} -lmilvus_futures +Cflags: -I${includedir} diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 39e242cec4..1d90af4174 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -67,6 +67,7 @@ set(MILVUS_TEST_FILES test_group_by.cpp test_regex_query_util.cpp test_regex_query.cpp + test_futures.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_futures.cpp b/internal/core/unittest/test_futures.cpp new file mode 100644 index 0000000000..e5f7baa23a --- /dev/null +++ b/internal/core/unittest/test_futures.cpp @@ -0,0 +1,210 @@ +// 
Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "futures/Future.h" +#include +#include +#include +#include + +using namespace milvus::futures; + +TEST(Futures, LeakyResult) { + { + LeakyResult leaky_result; + ASSERT_ANY_THROW(leaky_result.leakyGet()); + } + + { + auto leaky_result = LeakyResult(1, "error"); + auto [r, s] = leaky_result.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } + { + auto leaky_result = LeakyResult(new int(1)); + auto [r, s] = leaky_result.leakyGet(); + ASSERT_NE(r, nullptr); + ASSERT_EQ(*(int*)(r), 1); + ASSERT_EQ(s.error_code, 0); + ASSERT_EQ(s.error_msg, nullptr); + delete (int*)(r); + } + { + LeakyResult leaky_result(1, "error"); + LeakyResult leaky_result_moved(std::move(leaky_result)); + auto [r, s] = leaky_result_moved.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } + { + LeakyResult leaky_result(1, "error"); + LeakyResult leaky_result_moved; + leaky_result_moved = std::move(leaky_result); + auto [r, s] = leaky_result_moved.leakyGet(); + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, 1); + ASSERT_STREQ(s.error_msg, "error"); + free((char*)(s.error_msg)); + } +} + +TEST(Futures, Ready) { + Ready ready; + int a = 0; + ready.callOrRegisterCallback([&a]() { a++; }); + ASSERT_EQ(a, 0); + 
ASSERT_FALSE(ready.isReady()); + ready.setValue(1); + ASSERT_EQ(a, 1); + ASSERT_TRUE(ready.isReady()); + ready.callOrRegisterCallback([&a]() { a++; }); + ASSERT_EQ(a, 2); + + ASSERT_EQ(std::move(ready).getValue(), 1); +} + +TEST(Futures, Future) { + folly::CPUThreadPoolExecutor executor(2); + + // success path. + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_NE(r, nullptr); + ASSERT_EQ(*(int*)(r), 1); + ASSERT_EQ(s.error_code, 0); + ASSERT_EQ(s.error_msg, nullptr); + delete (int*)(r); + } + + // error path. + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw milvus::SegcoreError(milvus::NotImplemented, + "unimplemented"); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::NotImplemented); + ASSERT_STREQ(s.error_msg, "unimplemented"); + free((char*)(s.error_msg)); + } + + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw std::runtime_error("unimplemented"); + return new int(1); + 
}); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::UnexpectedError); + free((char*)(s.error_msg)); + } + + { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + throw folly::FutureNotReady(); + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::FollyOtherException); + free((char*)(s.error_msg)); + } + + // cancellation path. 
+ { + // try a async function + auto future = milvus::futures::Future::async( + &executor, 0, [](milvus::futures::CancellationToken token) { + for (int i = 0; i < 10; i++) { + token.throwIfCancelled(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return new int(1); + }); + ASSERT_FALSE(future->isReady()); + future->cancel(); + + std::mutex mu; + mu.lock(); + future->registerReadyCallback( + [](CLockedGoMutex* mutex) { ((std::mutex*)(mutex))->unlock(); }, + (CLockedGoMutex*)(&mu)); + mu.lock(); + ASSERT_TRUE(future->isReady()); + auto [r, s] = future->leakyGet(); + + ASSERT_EQ(r, nullptr); + ASSERT_EQ(s.error_code, milvus::FollyCancel); + } +} \ No newline at end of file diff --git a/internal/util/cgo/errors.go b/internal/util/cgo/errors.go new file mode 100644 index 0000000000..c0bb6e482f --- /dev/null +++ b/internal/util/cgo/errors.go @@ -0,0 +1,27 @@ +package cgo + +/* +#cgo pkg-config: milvus_common + +#include "common/type_c.h" +#include +*/ +import "C" + +import ( + "unsafe" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func ConsumeCStatusIntoError(status *C.CStatus) error { + if status.error_code == 0 { + return nil + } + errorCode := status.error_code + errorMsg := C.GoString(status.error_msg) + getCGOCaller().call("free", func() { + C.free(unsafe.Pointer(status.error_msg)) + }) + return merr.SegcoreError(int32(errorCode), errorMsg) +} diff --git a/internal/util/cgo/futures.go b/internal/util/cgo/futures.go new file mode 100644 index 0000000000..aef27fe669 --- /dev/null +++ b/internal/util/cgo/futures.go @@ -0,0 +1,190 @@ +package cgo + +/* +#cgo pkg-config: milvus_futures + +#include "futures/future_c.h" +#include + +extern void unlockMutex(void*); + +static inline void unlockMutexOnC(CLockedGoMutex* m) { + unlockMutex((void*)(m)); +} + +static inline void future_go_register_ready_callback(CFuture* f, CLockedGoMutex* m) { + future_register_ready_callback(f, unlockMutexOnC, m); +} +*/ +import "C" + +import ( + "context" + 
"sync" + "unsafe" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +var ErrConsumed = errors.New("future is already consumed") + +// Would put this in futures.go but for the documented issue with +// exports and functions in preamble +// (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions) +// +//export unlockMutex +func unlockMutex(p unsafe.Pointer) { + m := (*sync.Mutex)(p) + m.Unlock() +} + +type basicFuture interface { + // Context return the context of the future. + Context() context.Context + + // BlockUntilReady block until the future is ready or canceled. + // caller can call this method multiple times in different concurrent unit. + BlockUntilReady() + + // cancel the future with error. + cancel(error) +} + +type Future interface { + basicFuture + + // BlockAndLeakyGet block until the future is ready or canceled, and return the leaky result. + // Caller should only call once for BlockAndLeakyGet, otherwise the ErrConsumed will returned. + // Caller will get the merr.ErrSegcoreCancel or merr.ErrSegcoreTimeout respectively if the future is canceled or timeout. + // Caller will get other error if the underlying cgo function throws, otherwise caller will get result. + // Caller should free the result after used (defined by caller), otherwise the memory of result is leaked. + BlockAndLeakyGet() (unsafe.Pointer, error) + + // Release the resource of the future. + // !!! Release is not concurrent safe with other methods. + // It should be called only once after all method of future is returned. + Release() +} + +type ( + CFuturePtr unsafe.Pointer + CGOAsyncFunction = func() CFuturePtr +) + +// Async is a helper function to call a C async function that returns a future. +func Async(ctx context.Context, f CGOAsyncFunction, opts ...Opt) Future { + options := getDefaultOpt() + // apply options. + for _, opt := range opts { + opt(options) + } + + // create a future for caller to use. 
+ var cFuturePtr *C.CFuture + getCGOCaller().call(options.name, func() { + cFuturePtr = (*C.CFuture)(f()) + }) + + ctx, cancel := context.WithCancel(ctx) + future := &futureImpl{ + closure: f, + ctx: ctx, + ctxCancel: cancel, + releaserOnce: sync.Once{}, + future: cFuturePtr, + opts: options, + state: newFutureState(), + } + + // register the future to do timeout notification. + futureManager.Register(future) + return future +} + +type futureImpl struct { + ctx context.Context + ctxCancel context.CancelFunc + future *C.CFuture + closure CGOAsyncFunction + opts *options + state futureState + releaserOnce sync.Once +} + +func (f *futureImpl) Context() context.Context { + return f.ctx +} + +func (f *futureImpl) BlockUntilReady() { + f.blockUntilReady() +} + +func (f *futureImpl) BlockAndLeakyGet() (unsafe.Pointer, error) { + f.blockUntilReady() + + if !f.state.intoConsumed() { + return nil, ErrConsumed + } + + var ptr unsafe.Pointer + var status C.CStatus + getCGOCaller().call("future_leak_and_get", func() { + status = C.future_leak_and_get(f.future, &ptr) + }) + err := ConsumeCStatusIntoError(&status) + + if errors.Is(err, merr.ErrSegcoreFollyCancel) { + // mark the error with context error. + return nil, errors.Mark(err, f.ctx.Err()) + } + return ptr, err +} + +func (f *futureImpl) Release() { + // block until ready to release the future. + f.blockUntilReady() + // release the future. + getCGOCaller().call("future_destroy", func() { + C.future_destroy(f.future) + }) +} + +func (f *futureImpl) cancel(err error) { + if !f.state.checkUnready() { + // only unready future can be canceled. + // a ready future' cancel make no sense. 
+ return + } + + if errors.IsAny(err, context.DeadlineExceeded, context.Canceled) { + getCGOCaller().call("future_cancel", func() { + C.future_cancel(f.future) + }) + return + } + panic("unreachable: invalid cancel error type") +} + +func (f *futureImpl) blockUntilReady() { + if !f.state.checkUnready() { + // only unready future should be block until ready. + return + } + + mu := &sync.Mutex{} + mu.Lock() + getCGOCaller().call("future_go_register_ready_callback", func() { + C.future_go_register_ready_callback(f.future, (*C.CLockedGoMutex)(unsafe.Pointer(mu))) + }) + mu.Lock() + + // mark the future as ready at go side to avoid more cgo calls. + f.state.intoReady() + // notify the future manager that the future is ready. + f.ctxCancel() + if f.opts.releaser != nil { + f.releaserOnce.Do(f.opts.releaser) + } +} diff --git a/internal/util/cgo/futures_test.go b/internal/util/cgo/futures_test.go new file mode 100644 index 0000000000..120ba922a5 --- /dev/null +++ b/internal/util/cgo/futures_test.go @@ -0,0 +1,278 @@ +package cgo + +import ( + "context" + "fmt" + "os" + "runtime" + "sync" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + InitCGO() + exitCode := m.Run() + if exitCode > 0 { + os.Exit(exitCode) + } +} + +func TestFutureWithSuccessCase(t *testing.T) { + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: 100, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 100, getCInt(result)) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. 
+ assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + freeCInt(result) + runtime.GC() + + _, err = future.BlockAndLeakyGet() + assert.ErrorIs(t, err, ErrConsumed) + + assert.Eventually(t, func() bool { + return unreleasedCnt.Load() == 0 + }, time.Second, time.Millisecond*100) +} + +func TestFutureWithCaseNoInterrupt(t *testing.T) { + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 0, getCInt(result)) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + freeCInt(result) + + // Test cancellation on no interrupt handling case. + start = time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + + result, err = future.BlockAndLeakyGet() + // the future is timeout by the context after 200ms, but the underlying task doesn't handle the cancel, the future will return after 2s. + assert.Greater(t, time.Since(start).Seconds(), 2.0) + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, 0, getCInt(result)) + freeCInt(result) +} + +// TestFutures test the future implementation. +func TestFutures(t *testing.T) { + // Test failed case, throw folly exception. 
+ future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowStdException, + }) + defer future.Release() + + start := time.Now() + future.BlockUntilReady() // test block until ready too. + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreUnsupported) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + + // Test failed case, throw std exception. + future = createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowFollyException, + }) + defer future.Release() + start = time.Now() + future.BlockUntilReady() // test block until ready too. + result, err = future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyOtherException) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + + // Test failed case, throw std exception. + future = createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowSegcoreException, + }) + defer future.Release() + start = time.Now() + future.BlockUntilReady() // test block until ready too. + result, err = future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcorePretendFinished) + assert.Nil(t, result) + // The inner function sleep 1 seconds, so the future cost must be greater than 0.5 seconds. + assert.Greater(t, time.Since(start).Seconds(), 0.5) + // free the result after used. + + // Test cancellation. 
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + // canceled before the future(2s) is ready. + go func() { + time.Sleep(200 * time.Millisecond) + cancel() + }() + start = time.Now() + result, err = future.BlockAndLeakyGet() + // the future is canceled by the context after 200ms, so the future should be done in 1s but not 2s. + assert.Less(t, time.Since(start).Seconds(), 1.0) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.Canceled)) + assert.Nil(t, result) + + // Test cancellation. + ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future = createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + start = time.Now() + result, err = future.BlockAndLeakyGet() + // the future is timeout by the context after 200ms, so the future should be done in 1s but not 2s. + assert.Less(t, time.Since(start).Seconds(), 1.0) + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + assert.Nil(t, result) + runtime.GC() + + assert.Eventually(t, func() bool { + return unreleasedCnt.Load() == 0 + }, time.Second, time.Millisecond*100) +} + +func TestConcurrent(t *testing.T) { + // Test is compatible with old implementation of fast fail future. + // So it's complicated and not easy to understand. + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(4) + // success case + go func() { + defer wg.Done() + // Test success case. 
+ future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: 100, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 100, getCInt(result)) + freeCInt(result) + }() + + // fail case + go func() { + defer wg.Done() + // Test success case. + future := createFutureWithTestCase(context.Background(), testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoThrowStdException, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.Nil(t, result) + }() + + // timeout case + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future := createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 20, + caseNo: 100, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.Error(t, err) + assert.ErrorIs(t, err, merr.ErrSegcoreFollyCancel) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + assert.Nil(t, result) + }() + + // no interrupt with timeout case + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + future := createFutureWithTestCase(ctx, testCase{ + interval: 100 * time.Millisecond, + loopCnt: 10, + caseNo: caseNoNoInterrupt, + }) + defer future.Release() + result, err := future.BlockAndLeakyGet() + assert.NoError(t, err) + assert.Equal(t, 0, getCInt(result)) + freeCInt(result) + }() + } + wg.Wait() + assert.Eventually(t, func() bool { + stat := futureManager.Stat() + fmt.Printf("active count: %d\n", stat.ActiveCount) + return stat.ActiveCount == 0 + }, 5*time.Second, 100*time.Millisecond) + runtime.GC() + + assert.Eventually(t, func() bool { + return unreleasedCnt.Load() == 0 + }, time.Second, time.Millisecond*100) +} diff --git 
a/internal/util/cgo/futures_test_case.go b/internal/util/cgo/futures_test_case.go new file mode 100644 index 0000000000..1e3afb832c --- /dev/null +++ b/internal/util/cgo/futures_test_case.go @@ -0,0 +1,57 @@ +//go:build test +// +build test + +package cgo + +/* +#cgo pkg-config: milvus_futures + +#include "futures/future_c.h" +#include + +*/ +import "C" + +import ( + "context" + "time" + "unsafe" + + "go.uber.org/atomic" +) + +const ( + caseNoNoInterrupt int = 0 + caseNoThrowStdException int = 1 + caseNoThrowFollyException int = 2 + caseNoThrowSegcoreException int = 3 +) + +var unreleasedCnt = atomic.NewInt32(0) + +type testCase struct { + interval time.Duration + loopCnt int + caseNo int +} + +func createFutureWithTestCase(ctx context.Context, testCase testCase) Future { + f := func() CFuturePtr { + return (CFuturePtr)(C.future_create_test_case(C.int(testCase.interval.Milliseconds()), C.int(testCase.loopCnt), C.int(testCase.caseNo))) + } + future := Async(ctx, f, + WithName("createFutureWithTestCase"), + WithReleaser(func() { + unreleasedCnt.Dec() + })) + unreleasedCnt.Inc() + return future +} + +func getCInt(p unsafe.Pointer) int { + return int(*(*C.int)(p)) +} + +func freeCInt(p unsafe.Pointer) { + C.free(p) +} diff --git a/internal/util/cgo/manager_active.go b/internal/util/cgo/manager_active.go new file mode 100644 index 0000000000..c003a2c15c --- /dev/null +++ b/internal/util/cgo/manager_active.go @@ -0,0 +1,124 @@ +package cgo + +import ( + "math" + "reflect" + "sync" + + "go.uber.org/atomic" + + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +const ( + registerIndex = 0 + maxSelectCase = 65535 + defaultRegisterBuf = 1 +) + +var ( + futureManager *activeFutureManager + initOnce sync.Once +) + +// InitCGO initializes the cgo caller and future manager. +// Please call this function before using any cgo utilities. 
+func InitCGO() { + initOnce.Do(func() { + nodeID := paramtable.GetStringNodeID() + chSize := int64(math.Ceil(float64(hardware.GetCPUNum()) * paramtable.Get().QueryNodeCfg.CGOPoolSizeRatio.GetAsFloat())) + if chSize <= 0 { + chSize = 1 + } + caller = &cgoCaller{ + // TODO: temporary solution, need to find a better way to set the pool size. + ch: make(chan struct{}, chSize), + nodeID: nodeID, + } + futureManager = newActiveFutureManager(nodeID) + futureManager.Run() + }) +} + +type futureManagerStat struct { + ActiveCount int64 +} + +func newActiveFutureManager(nodeID string) *activeFutureManager { + manager := &activeFutureManager{ + activeCount: atomic.NewInt64(0), + activeFutures: make([]basicFuture, 0), + cases: make([]reflect.SelectCase, 1), + register: make(chan basicFuture, defaultRegisterBuf), + nodeID: nodeID, + } + manager.cases[0] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(manager.register), + } + return manager +} + +// activeFutureManager manages the active futures. +// it will transfer the cancel signal into cgo. +type activeFutureManager struct { + activeCount *atomic.Int64 + activeFutures []basicFuture + cases []reflect.SelectCase + register chan basicFuture + nodeID string +} + +// Run starts the active future manager. +func (m *activeFutureManager) Run() { + go func() { + for { + m.doSelect() + } + }() +} + +// Register registers a future when it's created into the manager. +func (m *activeFutureManager) Register(c basicFuture) { + m.register <- c +} + +// Stat returns the stat of the manager, only for testing now. +func (m *activeFutureManager) Stat() futureManagerStat { + return futureManagerStat{ + ActiveCount: m.activeCount.Load(), + } +} + +// doSelect selects the active futures and cancel the finished ones. 
+func (m *activeFutureManager) doSelect() { + index, newCancelableObject, _ := reflect.Select(m.getSelectableCases()) + if index == registerIndex { + newCancelable := newCancelableObject.Interface().(basicFuture) + m.cases = append(m.cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(newCancelable.Context().Done()), + }) + m.activeFutures = append(m.activeFutures, newCancelable) + } else { + m.cases = append(m.cases[:index], m.cases[index+1:]...) + offset := index - 1 + // cancel the future and move it into gc manager. + m.activeFutures[offset].cancel(m.activeFutures[offset].Context().Err()) + m.activeFutures = append(m.activeFutures[:offset], m.activeFutures[offset+1:]...) + } + activeTotal := len(m.activeFutures) + m.activeCount.Store(int64(activeTotal)) + metrics.ActiveFutureTotal.WithLabelValues( + m.nodeID, + ).Set(float64(activeTotal)) +} + +func (m *activeFutureManager) getSelectableCases() []reflect.SelectCase { + if len(m.cases) <= maxSelectCase { + return m.cases + } + return m.cases[0:maxSelectCase] +} diff --git a/internal/util/cgo/options.go b/internal/util/cgo/options.go new file mode 100644 index 0000000000..96c25c4357 --- /dev/null +++ b/internal/util/cgo/options.go @@ -0,0 +1,32 @@ +package cgo + +func getDefaultOpt() *options { + return &options{ + name: "unknown", + releaser: nil, + } +} + +type options struct { + name string + releaser func() +} + +// Opt is the option type for future. +type Opt func(*options) + +// WithReleaser sets the releaser function. +// When a future is ready, the releaser function will be called once. +func WithReleaser(releaser func()) Opt { + return func(o *options) { + o.releaser = releaser + } +} + +// WithName sets the name of the future. +// Only used for metrics. 
+func WithName(name string) Opt { + return func(o *options) { + o.name = name + } +} diff --git a/internal/util/cgo/pool.go b/internal/util/cgo/pool.go new file mode 100644 index 0000000000..f75090b5f3 --- /dev/null +++ b/internal/util/cgo/pool.go @@ -0,0 +1,42 @@ +package cgo + +import ( + "runtime" + "time" + + "github.com/milvus-io/milvus/pkg/metrics" +) + +var caller *cgoCaller + +// getCGOCaller returns the cgoCaller instance. +func getCGOCaller() *cgoCaller { + return caller +} + +// cgoCaller is a limiter to restrict the number of concurrent cgo calls. +type cgoCaller struct { + ch chan struct{} + nodeID string +} + +// call calls the work function with a lock to restrict the number of concurrent cgo calls. +// it collect some metrics too. +func (c *cgoCaller) call(name string, work func()) { + start := time.Now() + c.ch <- struct{}{} + queueTime := time.Since(start) + metrics.CGOQueueDuration.WithLabelValues(c.nodeID).Observe(queueTime.Seconds()) + + runtime.LockOSThread() + defer func() { + runtime.UnlockOSThread() + <-c.ch + + metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Dec() + total := time.Since(start) - queueTime + metrics.CGODuration.WithLabelValues(c.nodeID, name).Observe(total.Seconds()) + }() + metrics.RunningCgoCallTotal.WithLabelValues(c.nodeID).Inc() + work() +} diff --git a/internal/util/cgo/state.go b/internal/util/cgo/state.go new file mode 100644 index 0000000000..db262c4b60 --- /dev/null +++ b/internal/util/cgo/state.go @@ -0,0 +1,38 @@ +package cgo + +import "go.uber.org/atomic" + +const ( + stateUnready int32 = iota + stateReady + stateConsumed +) + +// newFutureState creates a new futureState. +func newFutureState() futureState { + return futureState{ + inner: atomic.NewInt32(stateUnready), + } +} + +// futureState is a state machine for future. +// unready --BlockUntilReady--> ready --BlockAndLeakyGet--> consumed +type futureState struct { + inner *atomic.Int32 +} + +// intoReady sets the state to ready. 
+func (s *futureState) intoReady() { + s.inner.CompareAndSwap(stateUnready, stateReady) +} + +// intoConsumed sets the state to consumed. +// if the state is not ready, it does nothing and returns false. +func (s *futureState) intoConsumed() bool { + return s.inner.CompareAndSwap(stateReady, stateConsumed) +} + +// checkUnready checks if the state is unready. +func (s *futureState) checkUnready() bool { + return s.inner.Load() == stateUnready +} diff --git a/pkg/metrics/cgo_metrics.go b/pkg/metrics/cgo_metrics.go new file mode 100644 index 0000000000..77815e4952 --- /dev/null +++ b/pkg/metrics/cgo_metrics.go @@ -0,0 +1,66 @@ +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + subsystemCGO = "cgo" + cgoLabelName = "name" + once sync.Once + + ActiveFutureTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "active_future_total", + Help: "Total number of active futures.", + }, []string{ + nodeIDLabelName, + }, + ) + + RunningCgoCallTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "running_cgo_call_total", + Help: "Total number of running cgo calls.", + }, []string{ + nodeIDLabelName, + }) + + CGODuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "cgo_duration_seconds", + Help: "Histogram of cgo call duration in seconds.", + }, []string{ + nodeIDLabelName, + cgoLabelName, + }, + ) + + CGOQueueDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: subsystemCGO, + Name: "cgo_queue_duration_seconds", + Help: "Duration of cgo call in queue.", + }, []string{ + nodeIDLabelName, + }, + ) +) + +// RegisterCGOMetrics registers the cgo metrics. 
+func RegisterCGOMetrics(registry *prometheus.Registry) { + once.Do(func() { + prometheus.MustRegister(RunningCgoCallTotal) + prometheus.MustRegister(CGODuration) + prometheus.MustRegister(CGOQueueDuration) + }) +} diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 93a676cea8..a848e1e1b3 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -157,9 +157,11 @@ var ( ErrInvalidStreamObj = newMilvusError("invalid stream object", 1903, false) // Segcore related - ErrSegcore = newMilvusError("segcore error", 2000, false) - ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false) - ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false) + ErrSegcore = newMilvusError("segcore error", 2000, false) + ErrSegcoreUnsupported = newMilvusError("segcore unsupported error", 2001, false) + ErrSegcorePretendFinished = newMilvusError("segcore pretend finished", 2002, false) + ErrSegcoreFollyOtherException = newMilvusError("segcore folly other exception", 2200, false) // throw from segcore. + ErrSegcoreFollyCancel = newMilvusError("segcore Future was canceled", 2201, false) // throw from segcore. // Do NOT export this, // never allow programmer using this, keep only for converting unknown error to milvusError diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 919d000fe3..fb3666e7e4 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -107,6 +107,7 @@ go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/typeutil/..." -failf go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}" go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}" go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}" +go test -race -cover -tags dynamic,test "${MILVUS_DIR}/util/cgo/..." 
-failfast -count=1 -ldflags="-r ${RPATH}" } function test_pkg()