milvus/internal/core/src/storage/ThreadPool.cpp
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ThreadPool.h"
#include "log/Log.h"
namespace milvus {
int CPU_NUM = DEFAULT_CPU_NUM;
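
// Per-priority coefficients used to scale the CPU core count when sizing the
// corresponding thread pools; they can be adjusted at runtime through the
// setters below.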
std::atomic<float> HIGH_PRIORITY_THREAD_CORE_COEFFICIENT(
    DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
std::atomic<float> MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT(
    DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
std::atomic<float> LOW_PRIORITY_THREAD_CORE_COEFFICIENT(
    DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
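
// Runtime setters: atomically store the new coefficient and log the value
// that was applied.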
void
SetHighPriorityThreadCoreCoefficient(const float coefficient) {
    HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
    LOG_INFO("set high priority thread pool core coefficient: {}",
             HIGH_PRIORITY_THREAD_CORE_COEFFICIENT.load());
}

void
SetMiddlePriorityThreadCoreCoefficient(const float coefficient) {
    MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
    LOG_INFO("set middle priority thread pool core coefficient: {}",
             MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT.load());
}

void
SetLowPriorityThreadCoreCoefficient(const float coefficient) {
    LOW_PRIORITY_THREAD_CORE_COEFFICIENT.store(coefficient);
    LOG_INFO("set low priority thread pool core coefficient: {}",
             LOW_PRIORITY_THREAD_CORE_COEFFICIENT.load());
}
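
// Set the global CPU core count used together with the coefficients above.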
void
InitCpuNum(const int num) {
    CPU_NUM = num;
}
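
// Start the minimum number of worker threads under the pool mutex and track
// each one by its thread id.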
void
ThreadPool::Init() {
    std::lock_guard<std::mutex> lock(mutex_);
    for (int i = 0; i < min_threads_size_; i++) {
        std::thread t(&ThreadPool::Worker, this);
        assert(threads_.find(t.get_id()) == threads_.end());
        threads_[t.get_id()] = std::move(t);
        current_threads_size_++;
    }
}
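
// Mark the pool as shut down under the mutex, wake every waiting worker, and
// join all remaining threads before returning.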
void
ThreadPool::ShutDown() {
    LOG_INFO("Start shutting down {}", name_);
    {
        std::lock_guard<std::mutex> lock(mutex_);
        shutdown_ = true;
    }
    condition_lock_.notify_all();
    for (auto& thread : threads_) {
        if (thread.second.joinable()) {
            thread.second.join();
        }
    }
    LOG_INFO("Finish shutting down {}", name_);
}
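
// Join and erase workers that previously queued themselves for retirement in
// need_finish_threads_; called from Worker() while mutex_ is held.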
void
ThreadPool::FinishThreads() {
    while (!need_finish_threads_.empty()) {
        std::thread::id id;
        auto dequeue = need_finish_threads_.dequeue(id);
        if (dequeue) {
            auto iter = threads_.find(id);
            assert(iter != threads_.end());
            if (iter->second.joinable()) {
                iter->second.join();
            }
            threads_.erase(iter);
        }
    }
}
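
// Worker loop: wait up to WAIT_SECONDS for a task, run whatever can be
// dequeued, and retire this thread once it has been idle past the timeout and
// the pool is above min_threads_size_.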
void
ThreadPool::Worker() {
    std::function<void()> func;
    bool dequeue;
    SetThreadName(name_);
    while (!shutdown_) {
        std::unique_lock<std::mutex> lock(mutex_);
        idle_threads_size_++;
        auto is_timeout = !condition_lock_.wait_for(
            lock, std::chrono::seconds(WAIT_SECONDS), [this]() {
                return shutdown_ || !work_queue_.empty();
            });
        idle_threads_size_--;
        if (work_queue_.empty()) {
            // No pending work: exit on shutdown, or dynamically shrink the
            // pool after an idle timeout.
            if (shutdown_) {
                current_threads_size_--;
                return;
            }
            if (is_timeout) {
                // Reap workers that already retired, then retire this thread
                // too if the pool is still above its minimum size.
                FinishThreads();
                if (current_threads_size_ > min_threads_size_) {
                    need_finish_threads_.enqueue(std::this_thread::get_id());
                    current_threads_size_--;
                    return;
                }
                continue;
            }
        }
        dequeue = work_queue_.dequeue(func);
        lock.unlock();
        if (dequeue) {
            func();
        }
    }
}

}  // namespace milvus