mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 23:45:28 +08:00
MS-637 - out of memory when load too many tasks
Former-commit-id: 9c40f373286086f57b02da119212b665d1cb8e8c
This commit is contained in:
parent
8ab67f6965
commit
762140abc7
@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-601 - Docker logs error caused by get CPUTemperature error
|
||||
- MS-622 - Delete vectors should be failed if date range is invalid
|
||||
- MS-620 - Get table row counts display wrong error code
|
||||
- MS-637 - out of memory when load too many tasks
|
||||
|
||||
## Improvement
|
||||
- MS-552 - Add and change the easylogging library
|
||||
|
||||
@ -49,40 +49,27 @@ load_simple_config() {
|
||||
std::vector<std::string> pool;
|
||||
config.GetResourceConfigPool(pool);
|
||||
|
||||
bool cpu = false;
|
||||
std::set<uint64_t> gpu_ids;
|
||||
// get resources
|
||||
bool use_cpu_to_compute = false;
|
||||
for (auto& resource : pool) {
|
||||
if (resource == "cpu") {
|
||||
cpu = true;
|
||||
use_cpu_to_compute = true;
|
||||
break;
|
||||
} else {
|
||||
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
|
||||
// error
|
||||
exit(-1);
|
||||
}
|
||||
auto gpu_id = std::stoi(resource.substr(3));
|
||||
if (gpu_id >= get_num_gpu()) {
|
||||
// error
|
||||
exit(-1);
|
||||
}
|
||||
gpu_ids.insert(gpu_id);
|
||||
}
|
||||
}
|
||||
auto gpu_ids = get_gpu_pool();
|
||||
|
||||
// create and connect
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
auto io = Connection("io", 500);
|
||||
if (cpu) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
|
||||
} else {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, false));
|
||||
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
|
||||
|
||||
auto pcie = Connection("pcie", 12000);
|
||||
for (auto& gpu_id : gpu_ids) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), io);
|
||||
}
|
||||
auto io = Connection("io", 500);
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, use_cpu_to_compute));
|
||||
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
|
||||
|
||||
auto pcie = Connection("pcie", 12000);
|
||||
for (auto& gpu_id : gpu_ids) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), pcie);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -110,11 +110,15 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
|
||||
break;
|
||||
}
|
||||
case TaskLabelType::BROADCAST: {
|
||||
if (resource->HasExecutor() == false) {
|
||||
load_completed_event->task_table_item_->Move();
|
||||
}
|
||||
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
|
||||
break;
|
||||
}
|
||||
default: { break; }
|
||||
}
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,6 +131,9 @@ Scheduler::OnStartUp(const EventPtr& event) {
|
||||
|
||||
void
|
||||
Scheduler::OnFinishTask(const EventPtr& event) {
|
||||
if (auto resource = event->resource_.lock()) {
|
||||
resource->WakeupLoader();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
#include "scheduler/TaskTable.h"
|
||||
#include "Utils.h"
|
||||
#include "utils/Log.h"
|
||||
#include "event/TaskTableUpdatedEvent.h"
|
||||
|
||||
#include <ctime>
|
||||
@ -157,6 +158,17 @@ TaskTableItem::Dump() {
|
||||
|
||||
std::vector<uint64_t>
|
||||
TaskTable::PickToLoad(uint64_t limit) {
|
||||
size_t count = 0;
|
||||
for (int j = last_finish_ + 1; j < table_.size(); ++j) {
|
||||
if (not table_[j]) {
|
||||
SERVER_LOG_WARNING << "table[" << j << "] is nullptr";
|
||||
}
|
||||
if (table_[j]->state == TaskTableItemState::LOADED) {
|
||||
++count;
|
||||
if (count > 2) return std::vector<uint64_t >();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<uint64_t> indexes;
|
||||
bool cross = false;
|
||||
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
|
||||
|
||||
@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) {
|
||||
res_mgr->Start();
|
||||
scheduler->Start();
|
||||
|
||||
const uint64_t NUM_TASK = 1000;
|
||||
const uint64_t NUM_TASK = 2;
|
||||
std::vector<std::shared_ptr<ms::TestTask>> tasks;
|
||||
ms::TableFileSchemaPtr dummy = nullptr;
|
||||
|
||||
|
||||
@ -31,6 +31,8 @@
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
constexpr uint64_t max_once_load = 2;
|
||||
|
||||
/************ ResourceBaseTest ************/
|
||||
class ResourceBaseTest : public testing::Test {
|
||||
protected:
|
||||
@ -182,7 +184,7 @@ class ResourceAdvanceTest : public testing::Test {
|
||||
};
|
||||
|
||||
TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
|
||||
const uint64_t NUM = 100;
|
||||
const uint64_t NUM = 10;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
@ -208,7 +210,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
|
||||
}
|
||||
|
||||
TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
|
||||
const uint64_t NUM = 100;
|
||||
const uint64_t NUM = max_once_load;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
@ -234,7 +236,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
|
||||
}
|
||||
|
||||
TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
|
||||
const uint64_t NUM = 100;
|
||||
const uint64_t NUM = max_once_load;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
@ -260,7 +262,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
|
||||
}
|
||||
|
||||
TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
|
||||
const uint64_t NUM = 100;
|
||||
const uint64_t NUM = max_once_load;
|
||||
std::vector<std::shared_ptr<TestTask>> tasks;
|
||||
TableFileSchemaPtr dummy = nullptr;
|
||||
for (uint64_t i = 0; i < NUM; ++i) {
|
||||
|
||||
@ -1,88 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "server/Config.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
class SchedInstTest : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
boost::filesystem::create_directory(TMP_DIR);
|
||||
std::stringstream ss;
|
||||
ss << "resource_config: " << std::endl;
|
||||
ss << " resources: " << std::endl;
|
||||
ss << " ssda: " << std::endl;
|
||||
ss << " type: DISK" << std::endl;
|
||||
ss << " device_id: 0" << std::endl;
|
||||
ss << " enable_loader: true" << std::endl;
|
||||
ss << " enable_executor: false" << std::endl;
|
||||
ss << " " << std::endl;
|
||||
ss << " cpu: " << std::endl;
|
||||
ss << " type: CPU" << std::endl;
|
||||
ss << " device_id: 0" << std::endl;
|
||||
ss << " enable_loader: true" << std::endl;
|
||||
ss << " enable_executor: false" << std::endl;
|
||||
ss << " " << std::endl;
|
||||
ss << " gpu0: " << std::endl;
|
||||
ss << " type: GPU" << std::endl;
|
||||
ss << " device_id: 0" << std::endl;
|
||||
ss << " enable_loader: true" << std::endl;
|
||||
ss << " enable_executor: true" << std::endl;
|
||||
ss << " " << std::endl;
|
||||
ss << " connections: " << std::endl;
|
||||
ss << " io: " << std::endl;
|
||||
ss << " speed: 500" << std::endl;
|
||||
ss << " endpoint: ssda===cpu" << std::endl;
|
||||
ss << " pcie: " << std::endl;
|
||||
ss << " speed: 11000" << std::endl;
|
||||
ss << " endpoint: cpu===gpu0" << std::endl;
|
||||
|
||||
boost::filesystem::path fpath(CONFIG_FILE);
|
||||
boost::filesystem::fstream fstream(fpath, std::ios_base::out);
|
||||
fstream << ss.str();
|
||||
fstream.close();
|
||||
|
||||
server::Config::GetInstance().LoadConfigFile(CONFIG_FILE);
|
||||
}
|
||||
|
||||
void
|
||||
TearDown() override {
|
||||
StopSchedulerService();
|
||||
boost::filesystem::remove_all(TMP_DIR);
|
||||
}
|
||||
|
||||
const std::string TMP_DIR = "/tmp/milvus_sched_test";
|
||||
const std::string CONFIG_FILE = "/tmp/milvus_sched_test/config.yaml";
|
||||
};
|
||||
|
||||
TEST_F(SchedInstTest, SIMPLE_GPU) {
|
||||
StartSchedulerService();
|
||||
}
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user