From 923406da8b92deb961f68adb41914164c90ef244 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Sun, 13 Oct 2019 21:14:32 +0800 Subject: [PATCH] MS-639 SQ8H index created failed and server hang Former-commit-id: f57b0eb2423ee7d99fb58f06a68e828b3273d0f9 --- cpp/src/scheduler/BuildMgr.cpp | 25 ++++++++++ cpp/src/scheduler/BuildMgr.h | 63 +++++++++++++++++++++++++ cpp/src/scheduler/SchedInst.cpp | 3 ++ cpp/src/scheduler/SchedInst.h | 19 ++++++++ cpp/src/scheduler/TaskTable.cpp | 28 +++++++++-- cpp/src/scheduler/resource/Resource.cpp | 20 ++++++-- cpp/src/scheduler/task/Path.h | 11 ++++- 7 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 cpp/src/scheduler/BuildMgr.cpp create mode 100644 cpp/src/scheduler/BuildMgr.h diff --git a/cpp/src/scheduler/BuildMgr.cpp b/cpp/src/scheduler/BuildMgr.cpp new file mode 100644 index 0000000000..3e8b80087d --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include "BuildMgr.h" + +namespace milvus { +namespace scheduler { + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/BuildMgr.h b/cpp/src/scheduler/BuildMgr.h new file mode 100644 index 0000000000..ee7ab38e25 --- /dev/null +++ b/cpp/src/scheduler/BuildMgr.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace milvus { +namespace scheduler { + +class BuildMgr { + public: + explicit BuildMgr(int64_t numoftasks) : numoftasks_(numoftasks) { + } + + public: + void + Put() { + ++numoftasks_; + } + + void + take() { + --numoftasks_; + } + + int64_t + numoftasks() { + return (int64_t)numoftasks_; + } + + private: + std::atomic_long numoftasks_; +}; + +using BuildMgrPtr = std::shared_ptr; + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 194e0d0e00..0053332746 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -41,6 +41,9 @@ std::mutex JobMgrInst::mutex_; OptimizerPtr OptimizerInst::instance = nullptr; std::mutex OptimizerInst::mutex_; +BuildMgrPtr BuildMgrInst::instance = nullptr; +std::mutex BuildMgrInst::mutex_; + void load_simple_config() { server::Config& config = server::Config::GetInstance(); diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index 0d2a04b02c..2a1388345c 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -17,6 +17,7 @@ #pragma once +#include "BuildMgr.h" #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" @@ -105,6 +106,24 @@ class OptimizerInst { static std::mutex mutex_; }; +class BuildMgrInst { + public: + static BuildMgrPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + instance = std::make_shared(4); + } + } + return instance; + } + + private: + static BuildMgrPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 2f7576de34..60a3425d74 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -16,6 +16,7 @@ // under the License. #include "scheduler/TaskTable.h" +#include "scheduler/SchedInst.h" #include "Utils.h" #include "event/TaskTableUpdatedEvent.h" #include "utils/Log.h" @@ -164,6 +165,15 @@ TaskTable::PickToLoad(uint64_t limit) { if (not table_[j]) { SERVER_LOG_WARNING << "table[" << j << "] is nullptr"; } + + if (table_[j]->task->path().Current() == "cpu") { + if (table_[j]->task->Type() == TaskType::BuildIndexTask + && BuildMgrInst::GetInstance()->numoftasks() < 1) { + return std::vector(); + } + } + + if (table_[j]->state == TaskTableItemState::LOADED) { ++count; if (count > 2) @@ -177,9 +187,21 @@ TaskTable::PickToLoad(uint64_t limit) { if (not cross && table_[i]->IsFinish()) { last_finish_ = i; } else if (table_[i]->state == TaskTableItemState::START) { - cross = true; - indexes.push_back(i); - ++count; + auto task = table_[i]->task; + if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") { + if (BuildMgrInst::GetInstance()->numoftasks() == 0) { + break; + } else { + cross = true; + indexes.push_back(i); + ++count; + BuildMgrInst::GetInstance()->take(); + } + } else { + cross = true; + indexes.push_back(i); + ++count; + } } } return indexes; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 8fea475d70..0342edc61b 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -16,6 +16,7 @@ // under the License. #include "scheduler/resource/Resource.h" +#include "scheduler/SchedInst.h" #include "scheduler/Utils.h" #include @@ -111,11 +112,18 @@ Resource::pick_task_load() { TaskTableItemPtr Resource::pick_task_execute() { - auto indexes = task_table_.PickToExecute(3); +// auto indexes = task_table_.PickToExecute(3); + auto indexes = task_table_.PickToExecute(std::numeric_limits::max()); for (auto index : indexes) { // try to set one task executing, then return - if (task_table_.Execute(index)) - return task_table_.Get(index); +// if (task_table_.Execute(index)) +// return task_table_.Get(index); + if (task_table_.Get(index)->task->path().Current() + == task_table_.Get(index)->task->path().Last()) { + if (task_table_.Execute(index)) { + return task_table_.Get(index); + } + } // else try next } return nullptr; @@ -167,6 +175,12 @@ Resource::executor_function() { total_cost_ += finish - start; task_item->Executed(); + + if (task_item->task->Type() == TaskType::BuildIndexTask) { + BuildMgrInst::GetInstance()->Put(); + ResMgrInst::GetInstance()->GetResource("cpu")->WakeupLoader(); + } + if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h index c23db9bb09..30e77a17b8 100644 --- a/cpp/src/scheduler/task/Path.h +++ b/cpp/src/scheduler/task/Path.h @@ -40,13 +40,22 @@ class Path { return path_; } + std::string + Current() { + if (!path_.empty() && path_.size() > index_) { + return path_[index_]; + } else { + return ""; + } + } + std::string Next() { if (index_ > 0 && !path_.empty()) { --index_; return path_[index_]; } else { - return nullptr; + return ""; } }