mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 07:25:37 +08:00
Merge branch 'branch-0.5.0-yk' into 'branch-0.5.0'
MS-636 Add optimizer in scheduler for FAISS_IVFSQ8H See merge request megasearch/milvus!690 Former-commit-id: 5c239cfe28b286e8047a4386dc4a83825f3d5161
This commit is contained in:
commit
c8feb980fe
@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
## New Feature
|
||||
- MS-627 - Integrate new index: IVFSQHybrid
|
||||
- MS-631 - IVFSQ8H Index support
|
||||
- MS-636 - Add optimizer in scheduler for FAISS_IVFSQ8H
|
||||
|
||||
## Task
|
||||
- MS-554 - Change license to Apache 2.0
|
||||
|
||||
@ -900,20 +900,21 @@ DBImpl::BackgroundBuildIndex() {
|
||||
meta_ptr_->FilesToIndex(to_index_files);
|
||||
Status status;
|
||||
|
||||
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
|
||||
if (!to_index_files.empty()) {
|
||||
scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
|
||||
|
||||
// step 2: put build index task to scheduler
|
||||
for (auto& file : to_index_files) {
|
||||
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
|
||||
job->AddToIndexFiles(file_ptr);
|
||||
// step 2: put build index task to scheduler
|
||||
for (auto& file : to_index_files) {
|
||||
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
|
||||
job->AddToIndexFiles(file_ptr);
|
||||
}
|
||||
scheduler::JobMgrInst::GetInstance()->Put(job);
|
||||
job->WaitBuildIndexFinish();
|
||||
if (!job->GetStatus().ok()) {
|
||||
Status status = job->GetStatus();
|
||||
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
|
||||
}
|
||||
}
|
||||
scheduler::JobMgrInst::GetInstance()->Put(job);
|
||||
job->WaitBuildIndexFinish();
|
||||
if (!job->GetStatus().ok()) {
|
||||
Status status = job->GetStatus();
|
||||
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
|
||||
}
|
||||
|
||||
// for (auto &file : to_index_files) {
|
||||
// status = BuildIndex(file);
|
||||
// if (!status.ok()) {
|
||||
|
||||
@ -29,8 +29,6 @@ constexpr uint64_t MAXINT = std::numeric_limits<uint32_t>::max();
|
||||
uint64_t
|
||||
ShortestPath(const ResourcePtr& src, const ResourcePtr& dest, const ResourceMgrPtr& res_mgr,
|
||||
std::vector<std::string>& path) {
|
||||
std::vector<std::vector<std::string>> paths;
|
||||
|
||||
uint64_t num_of_resources = res_mgr->GetAllResources().size();
|
||||
std::unordered_map<uint64_t, std::string> id_name_map;
|
||||
std::unordered_map<std::string, uint64_t> name_id_map;
|
||||
|
||||
@ -16,7 +16,9 @@
|
||||
// under the License.
|
||||
|
||||
#include "scheduler/JobMgr.h"
|
||||
#include "SchedInst.h"
|
||||
#include "TaskCreator.h"
|
||||
#include "optimizer/Optimizer.h"
|
||||
#include "task/Task.h"
|
||||
|
||||
#include <src/scheduler/optimizer/Optimizer.h>
|
||||
@ -67,8 +69,9 @@ JobMgr::worker_function() {
|
||||
}
|
||||
|
||||
auto tasks = build_task(job);
|
||||
|
||||
// TODO: optimizer all task
|
||||
for (auto& task : tasks) {
|
||||
OptimizerInst::GetInstance()->Run(task);
|
||||
}
|
||||
|
||||
// disk resources NEVER be empty.
|
||||
if (auto disk = res_mgr_->GetDiskResources()[0].lock()) {
|
||||
|
||||
@ -38,6 +38,9 @@ std::mutex SchedInst::mutex_;
|
||||
scheduler::JobMgrPtr JobMgrInst::instance = nullptr;
|
||||
std::mutex JobMgrInst::mutex_;
|
||||
|
||||
OptimizerPtr OptimizerInst::instance = nullptr;
|
||||
std::mutex OptimizerInst::mutex_;
|
||||
|
||||
void
|
||||
load_simple_config() {
|
||||
server::Config& config = server::Config::GetInstance();
|
||||
|
||||
@ -20,9 +20,12 @@
|
||||
#include "JobMgr.h"
|
||||
#include "ResourceMgr.h"
|
||||
#include "Scheduler.h"
|
||||
#include "optimizer/HybridPass.h"
|
||||
#include "optimizer/Optimizer.h"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
@ -81,6 +84,27 @@ class JobMgrInst {
|
||||
static std::mutex mutex_;
|
||||
};
|
||||
|
||||
class OptimizerInst {
|
||||
public:
|
||||
static OptimizerPtr
|
||||
GetInstance() {
|
||||
if (instance == nullptr) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (instance == nullptr) {
|
||||
HybridPassPtr pass_ptr = std::make_shared<HybridPass>();
|
||||
std::vector<PassPtr> pass_list;
|
||||
pass_list.push_back(pass_ptr);
|
||||
instance = std::make_shared<Optimizer>(pass_list);
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private:
|
||||
static scheduler::OptimizerPtr instance;
|
||||
static std::mutex mutex_;
|
||||
};
|
||||
|
||||
void
|
||||
StartSchedulerService();
|
||||
|
||||
|
||||
@ -16,10 +16,10 @@
|
||||
// under the License.
|
||||
|
||||
#include "scheduler/TaskCreator.h"
|
||||
#include <src/scheduler/tasklabel/SpecResLabel.h>
|
||||
#include "SchedInst.h"
|
||||
#include "scheduler/tasklabel/BroadcastLabel.h"
|
||||
#include "tasklabel/BroadcastLabel.h"
|
||||
#include "tasklabel/DefaultLabel.h"
|
||||
#include "tasklabel/SpecResLabel.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
#include <random>
|
||||
#include "../Algorithm.h"
|
||||
#include "Action.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
#include "src/cache/GpuCacheMgr.h"
|
||||
#include "src/server/Config.h"
|
||||
|
||||
@ -145,25 +146,35 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
|
||||
paths.emplace_back(path);
|
||||
}
|
||||
if (task->job_.lock()->type() == JobType::SEARCH) {
|
||||
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t min_cost_idx = 0;
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->TotalTasks() == 0) {
|
||||
min_cost_idx = i;
|
||||
break;
|
||||
}
|
||||
uint64_t cost =
|
||||
compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
|
||||
if (min_cost > cost) {
|
||||
min_cost = cost;
|
||||
min_cost_idx = i;
|
||||
auto label = task->label();
|
||||
auto spec_label = std::static_pointer_cast<SpecResLabel>(label);
|
||||
if (spec_label->resource().lock()->type() == ResourceType::CPU) {
|
||||
std::vector<std::string> spec_path;
|
||||
spec_path.push_back(spec_label->resource().lock()->name());
|
||||
spec_path.push_back(resource->name());
|
||||
task->path() = Path(spec_path, spec_path.size() - 1);
|
||||
} else {
|
||||
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
|
||||
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t min_cost_idx = 0;
|
||||
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
|
||||
if (compute_resources[i]->TotalTasks() == 0) {
|
||||
min_cost_idx = i;
|
||||
break;
|
||||
}
|
||||
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() +
|
||||
transport_costs[i];
|
||||
if (min_cost > cost) {
|
||||
min_cost = cost;
|
||||
min_cost_idx = i;
|
||||
}
|
||||
}
|
||||
|
||||
// step 3: set path in task
|
||||
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
task->path() = task_path;
|
||||
}
|
||||
|
||||
// step 3: set path in task
|
||||
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
|
||||
task->path() = task_path;
|
||||
} else if (task->job_.lock()->type() == JobType::BUILD) {
|
||||
// step2: Read device id in config
|
||||
// get build index gpu resource
|
||||
|
||||
@ -16,7 +16,9 @@
|
||||
// under the License.
|
||||
|
||||
#include "scheduler/optimizer/HybridPass.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "scheduler/task/SearchTask.h"
|
||||
#include "scheduler/tasklabel/SpecResLabel.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace scheduler {
|
||||
@ -28,7 +30,10 @@ HybridPass::Run(const TaskPtr& task) {
|
||||
return false;
|
||||
auto search_task = std::static_pointer_cast<XSearchTask>(task);
|
||||
if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) {
|
||||
// TODO: make specified label
|
||||
// TODO: remove "cpu" hardcode
|
||||
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("cpu");
|
||||
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
|
||||
task->label() = label;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "Pass.h"
|
||||
@ -34,7 +35,8 @@ namespace scheduler {
|
||||
|
||||
class Optimizer {
|
||||
public:
|
||||
Optimizer() = default;
|
||||
explicit Optimizer(std::vector<PassPtr> pass_list) : pass_list_(std::move(pass_list)) {
|
||||
}
|
||||
|
||||
void
|
||||
Init();
|
||||
@ -46,5 +48,7 @@ class Optimizer {
|
||||
std::vector<PassPtr> pass_list_;
|
||||
};
|
||||
|
||||
using OptimizerPtr = std::shared_ptr<Optimizer>;
|
||||
|
||||
} // namespace scheduler
|
||||
} // namespace milvus
|
||||
|
||||
@ -124,6 +124,7 @@ XBuildIndexTask::Execute() {
|
||||
ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
|
||||
build_index_job->BuildIndexDone(to_index_id_);
|
||||
build_index_job->GetStatus() = status;
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -136,6 +137,7 @@ XBuildIndexTask::Execute() {
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_
|
||||
<< " to to_delete";
|
||||
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
} catch (std::exception& ex) {
|
||||
@ -150,6 +152,7 @@ XBuildIndexTask::Execute() {
|
||||
<< std::endl;
|
||||
|
||||
build_index_job->GetStatus() = Status(DB_ERROR, msg);
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -158,6 +161,7 @@ XBuildIndexTask::Execute() {
|
||||
meta_ptr->HasTable(file_->table_id_, has_table);
|
||||
if (!has_table) {
|
||||
meta_ptr->DeleteTableFiles(file_->table_id_);
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -177,6 +181,7 @@ XBuildIndexTask::Execute() {
|
||||
<< ", possible out of disk space" << std::endl;
|
||||
|
||||
build_index_job->GetStatus() = Status(DB_ERROR, msg);
|
||||
to_index_engine_ = nullptr;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -50,6 +50,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/optimizer scheduler_optimizer_files)
|
||||
set(scheduler_files
|
||||
${scheduler_main_files}
|
||||
${scheduler_action_files}
|
||||
@ -57,6 +58,7 @@ set(scheduler_files
|
||||
${scheduler_job_files}
|
||||
${scheduler_resource_files}
|
||||
${scheduler_task_files}
|
||||
${scheduler_optimizer_files}
|
||||
)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
|
||||
|
||||
@ -31,3 +31,12 @@ target_link_libraries(test_db
|
||||
|
||||
install(TARGETS test_db DESTINATION unittest)
|
||||
|
||||
configure_file(appendix/server_config.yaml
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/server_config.yaml"
|
||||
COPYONLY)
|
||||
|
||||
configure_file(appendix/log_config.conf
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/milvus/conf/log_config.conf"
|
||||
COPYONLY)
|
||||
|
||||
|
||||
|
||||
27
cpp/unittest/db/appendix/log_config.conf
Normal file
27
cpp/unittest/db/appendix/log_config.conf
Normal file
@ -0,0 +1,27 @@
|
||||
* GLOBAL:
|
||||
FORMAT = "%datetime | %level | %logger | %msg"
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-global.log"
|
||||
ENABLED = true
|
||||
TO_FILE = true
|
||||
TO_STANDARD_OUTPUT = false
|
||||
SUBSECOND_PRECISION = 3
|
||||
PERFORMANCE_TRACKING = false
|
||||
MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB
|
||||
* DEBUG:
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-debug.log"
|
||||
ENABLED = true
|
||||
* WARNING:
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-warning.log"
|
||||
* TRACE:
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-trace.log"
|
||||
* VERBOSE:
|
||||
FORMAT = "%datetime{%d/%M/%y} | %level-%vlevel | %msg"
|
||||
TO_FILE = false
|
||||
TO_STANDARD_OUTPUT = false
|
||||
## Error logs
|
||||
* ERROR:
|
||||
ENABLED = true
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-error.log"
|
||||
* FATAL:
|
||||
ENABLED = true
|
||||
FILENAME = "/tmp/milvus/logs/milvus-%datetime{%y-%M-%d-%H:%m}-fatal.log"
|
||||
37
cpp/unittest/db/appendix/server_config.yaml
Normal file
37
cpp/unittest/db/appendix/server_config.yaml
Normal file
@ -0,0 +1,37 @@
|
||||
# All the following configurations are default values.
|
||||
|
||||
server_config:
|
||||
address: 0.0.0.0 # milvus server ip address (IPv4)
|
||||
port: 19530 # port range: 1025 ~ 65534
|
||||
deploy_mode: single # deployment type: single, cluster_readonly, cluster_writable
|
||||
time_zone: UTC+8
|
||||
|
||||
db_config:
|
||||
primary_path: /tmp/milvus # path used to store data and meta
|
||||
secondary_path: # path used to store data only, split by semicolon
|
||||
|
||||
backend_url: sqlite://:@:/ # URI format: dialect://username:password@host:port/database
|
||||
# Keep 'dialect://:@:/', and replace other texts with real values.
|
||||
# Replace 'dialect' with 'mysql' or 'sqlite'
|
||||
|
||||
insert_buffer_size: 4 # GB, maximum insert buffer size allowed
|
||||
build_index_gpu: 0 # gpu id used for building index
|
||||
|
||||
metric_config:
|
||||
enable_monitor: false # enable monitoring or not
|
||||
collector: prometheus # prometheus
|
||||
prometheus_config:
|
||||
port: 8080 # port prometheus used to fetch metrics
|
||||
|
||||
cache_config:
|
||||
cpu_mem_capacity: 16 # GB, CPU memory used for cache
|
||||
cpu_mem_threshold: 0.85 # percentage of data kept when cache cleanup triggered
|
||||
cache_insert_data: false # whether load inserted data into cache
|
||||
|
||||
engine_config:
|
||||
blas_threshold: 20
|
||||
|
||||
resource_config:
|
||||
resource_pool:
|
||||
- cpu
|
||||
- gpu0
|
||||
@ -23,14 +23,18 @@
|
||||
#include "db/DBFactory.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "server/Config.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <thread>
|
||||
#include <random>
|
||||
|
||||
|
||||
namespace {
|
||||
|
||||
static const char *CONFIG_FILE_PATH = "./milvus/conf/server_config.yaml";
|
||||
|
||||
static const char *TABLE_NAME = "test_group";
|
||||
static constexpr int64_t TABLE_DIM = 256;
|
||||
static constexpr int64_t VECTOR_COUNT = 25000;
|
||||
@ -228,6 +232,9 @@ TEST_F(DBTest, DB_TEST) {
|
||||
}
|
||||
|
||||
TEST_F(DBTest, SEARCH_TEST) {
|
||||
milvus::server::Config &config = milvus::server::Config::GetInstance();
|
||||
milvus::Status s = config.LoadConfigFile(CONFIG_FILE_PATH);
|
||||
|
||||
milvus::engine::meta::TableSchema table_info = BuildTableSchema();
|
||||
auto stat = db_->CreateTable(table_info);
|
||||
|
||||
@ -290,6 +297,25 @@ TEST_F(DBTest, SEARCH_TEST) {
|
||||
ASSERT_TRUE(stat.ok());
|
||||
}
|
||||
|
||||
//test FAISS_IVFSQ8H optimizer
|
||||
index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8H;
|
||||
db_->CreateIndex(TABLE_NAME, index); // wait until build index finish
|
||||
|
||||
{
|
||||
milvus::engine::QueryResults results;
|
||||
stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results);
|
||||
ASSERT_TRUE(stat.ok());
|
||||
}
|
||||
|
||||
{//search by specify index file
|
||||
milvus::engine::meta::DatesT dates;
|
||||
std::vector<std::string> file_ids = {"1", "2", "3", "4", "5", "6"};
|
||||
milvus::engine::QueryResults results;
|
||||
stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results);
|
||||
ASSERT_TRUE(stat.ok());
|
||||
}
|
||||
|
||||
|
||||
// TODO(lxj): add groundTruth assert
|
||||
}
|
||||
|
||||
|
||||
@ -97,7 +97,7 @@ DBTest::SetUp() {
|
||||
auto res_mgr = milvus::scheduler::ResMgrInst::GetInstance();
|
||||
res_mgr->Clear();
|
||||
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, false));
|
||||
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, true));
|
||||
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
|
||||
|
||||
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user