Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-409 Using new scheduler

See merge request megasearch/milvus!417

Former-commit-id: 82972459227e5f45990a4f59a56cb99f15721704
This commit is contained in:
peng.xu 2019-08-23 21:15:24 +08:00
commit 1766a316c9
22 changed files with 221 additions and 14 deletions

View File

@ -46,6 +46,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-405 - Add delete task support
- MS-407 - Reconstruct MetricsCollector
- MS-408 - Add device_id in resource construct function
- MS-409 - Using new scheduler
## New Feature
- MS-343 - Implement ResourceMgr

View File

@ -37,5 +37,40 @@ cache_config:
engine_config:
use_blas_threshold: 20
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
omp_thread_num: 0 # how many cpu cores be used by engine, 0 means use all cpu cores used
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
omp_thread_num: 0 # how many compute threads be used by engine, 0 means use all cpu core to compute
resource_config:
resources:
ssda:
type: DISK
memory: 2048
device_id: 0
enable_loader: true
enable_executor: false
cpu:
type: CPU
memory: 64
device_id: 0
enable_loader: true
enable_executor: false
gtx1060:
type: GPU
memory: 6
device_id: 0
enable_loader: true
enable_executor: true
gtx1660:
type: GPU
memory: 6
device_id: 1
enable_loader: true
enable_executor: true
connections:
- ssda===cpu
- cpu===gtx1060
- cpu===gtx1660

View File

@ -17,6 +17,19 @@ aux_source_directory(db/meta db_meta_files)
aux_source_directory(metrics metrics_files)
aux_source_directory(wrapper/knowhere knowhere_files)
aux_source_directory(scheduler/action scheduler_action_files)
aux_source_directory(scheduler/event scheduler_event_files)
aux_source_directory(scheduler/resource scheduler_resource_files)
aux_source_directory(scheduler/task scheduler_task_files)
aux_source_directory(scheduler scheduler_root_files)
set(scheduler_srcs
${scheduler_action_files}
${scheduler_event_files}
${scheduler_resource_files}
${scheduler_task_files}
${scheduler_root_files}
)
aux_source_directory(db/scheduler scheduler_files)
aux_source_directory(db/scheduler/context scheduler_context_files)
aux_source_directory(db/scheduler/task scheduler_task_files)
@ -210,6 +223,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON")
${utils_files}
${thrift_service_files}
${metrics_files}
${scheduler_srcs}
)
else()
add_executable(milvus_server
@ -219,6 +233,7 @@ else()
${utils_files}
${grpc_service_files}
${metrics_files}
${scheduler_srcs}
)
endif()

View File

@ -23,6 +23,7 @@
#include <cstring>
#include <cache/CpuCacheMgr.h>
#include <boost/filesystem.hpp>
#include "scheduler/SchedInst.h"
#include <src/cache/GpuCacheMgr.h>
namespace zilliz {
@ -67,13 +68,15 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id,
meta_ptr_,
ResMgrInst::GetInstance()->GetNumOfComputeResource());
scheduler.Schedule(context);
context->WaitAndDelete();
} else {
meta_ptr_->DropPartitionsByDates(table_id, dates);
}
return Status::OK();
}

View File

@ -51,6 +51,8 @@ public:
virtual Status CopyToCpu() = 0;
virtual std::shared_ptr<ExecutionEngine> Clone() = 0;
virtual Status Merge(const std::string& location) = 0;
virtual Status Search(long n,

View File

@ -181,6 +181,13 @@ Status ExecutionEngineImpl::CopyToCpu() {
return Status::OK();
}
ExecutionEnginePtr ExecutionEngineImpl::Clone() {
auto ret = std::make_shared<ExecutionEngineImpl>(dim_, location_, index_type_, metric_type_, nlist_);
ret->Init();
ret->index_ = index_->Clone();
return ret;
}
Status ExecutionEngineImpl::Merge(const std::string &location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");

View File

@ -50,6 +50,8 @@ public:
Status CopyToCpu() override;
ExecutionEnginePtr Clone() override;
Status Merge(const std::string &location) override;
Status Search(long n,

View File

@ -4,12 +4,15 @@
* Proprietary and confidential.
******************************************************************************/
#include "server/ServerConfig.h"
#include "TaskScheduler.h"
#include "TaskDispatchQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "db/engine/EngineFactory.h"
#include "scheduler/task/TaskConvert.h"
#include "scheduler/SchedInst.h"
#include "scheduler/ResourceFactory.h"
namespace zilliz {
namespace milvus {
@ -86,14 +89,22 @@ TaskScheduler::TaskDispatchWorker() {
return true;
}
#if 1
// TODO: Put task into Disk-TaskTable
// auto task = TaskConvert(task_ptr);
// DiskResourcePtr->task_table().Put(task)
auto task = TaskConvert(task_ptr);
auto disk_list = ResMgrInst::GetInstance()->GetDiskResources();
if (!disk_list.empty()) {
if (auto disk = disk_list[0].lock()) {
disk->task_table().Put(task);
}
}
#else
//execute task
ScheduleTaskPtr next_task = task_ptr->Execute();
if(next_task != nullptr) {
task_queue_.Put(next_task);
}
#endif
}
return true;

View File

@ -6,17 +6,33 @@
#include "DeleteContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
DeleteContext::DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr)
DeleteContext::DeleteContext(const std::string &table_id, meta::Meta::Ptr &meta_ptr, uint64_t num_resource)
: IScheduleContext(ScheduleContextType::kDelete),
table_id_(table_id),
meta_ptr_(meta_ptr) {
meta_ptr_(meta_ptr),
num_resource_(num_resource) {
}
void DeleteContext::WaitAndDelete() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
meta_ptr_->DeleteTableFiles(table_id_);
}
void DeleteContext::ResourceDone() {
{
std::lock_guard<std::mutex> lock(mutex_);
++done_resource;
}
cv_.notify_one();
}
}
}
}

View File

@ -7,6 +7,8 @@
#include "IScheduleContext.h"
#include "db/meta/Meta.h"
#include <mutex>
#include <condition_variable>
namespace zilliz {
namespace milvus {
@ -14,14 +16,21 @@ namespace engine {
class DeleteContext : public IScheduleContext {
public:
DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr);
DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr, uint64_t num_resource);
std::string table_id() const { return table_id_; }
meta::Meta::Ptr meta() const { return meta_ptr_; }
void WaitAndDelete();
void ResourceDone();
private:
std::string table_id_;
meta::Meta::Ptr meta_ptr_;
uint64_t num_resource_;
uint64_t done_resource = 0;
std::mutex mutex_;
std::condition_variable cv_;
};
using DeleteContextPtr = std::shared_ptr<DeleteContext>;

View File

@ -18,7 +18,7 @@ public:
virtual std::shared_ptr<IScheduleTask> Execute() override;
private:
public:
DeleteContextPtr context_;
};

View File

@ -95,6 +95,13 @@ ResourceMgr::Stop() {
}
}
void
ResourceMgr::Clear() {
std::lock_guard<std::mutex> lck(resources_mutex_);
disk_resources_.clear();
resources_.clear();
}
void
ResourceMgr::PostEvent(const EventPtr &event) {
std::lock_guard<std::mutex> lock(event_mutex_);

View File

@ -68,6 +68,9 @@ public:
void
Stop();
void
Clear();
void
PostEvent(const EventPtr &event);

View File

@ -5,7 +5,8 @@
******************************************************************************/
#include "SchedInst.h"
#include "server/ServerConfig.h"
#include "ResourceFactory.h"
namespace zilliz {
namespace milvus {
@ -17,6 +18,40 @@ std::mutex ResMgrInst::mutex_;
SchedulerPtr SchedInst::instance = nullptr;
std::mutex SchedInst::mutex_;
void
SchedServInit() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
for (auto &resource : resources) {
auto &resname = resource.first;
auto &resconf = resource.second;
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
type,
device_id,
enable_loader,
enable_executor));
}
auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS);
for (auto &conn : connections) {
std::string delimiter = "===";
std::string left = conn.substr(0, conn.find(delimiter));
std::string right = conn.substr(conn.find(delimiter) + 3, conn.length());
ResMgrInst::GetInstance()->Connect(left, right, default_connection);
}
ResMgrInst::GetInstance()->Start();
SchedInst::GetInstance()->Start();
}
}
}
}

View File

@ -52,6 +52,9 @@ private:
static std::mutex mutex_;
};
void
SchedServInit();
}
}
}

View File

@ -84,7 +84,9 @@ CollectFileMetrics(int file_type, size_t file_size) {
XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType) file_->engine_type_);
(EngineType) file_->engine_type_,
(MetricType)file_->metric_type_,
file_->nlist_);
}
void

View File

@ -24,6 +24,7 @@
//#include <numaif.h>
#include <unistd.h>
#include <string.h>
#include <src/scheduler/SchedInst.h>
#include "metrics/Metrics.h"
@ -163,6 +164,7 @@ Server::Start() {
signal(SIGTERM, SignalUtil::HandleSignal);
server::Metrics::GetInstance().Init();
server::SystemInfo::GetInstance().Init();
engine::SchedServInit();
std::cout << "Milvus server start successfully." << std::endl;
StartService();

View File

@ -49,6 +49,16 @@ static const std::string CONFIG_ENGINE = "engine_config";
static const std::string CONFIG_DCBT = "use_blas_threshold";
static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num";
static const char* CONFIG_RESOURCE = "resource_config";
static const char* CONFIG_RESOURCES = "resources";
static const char* CONFIG_RESOURCE_TYPE = "type";
static const char* CONFIG_RESOURCE_MEMORY = "memory";
static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id";
static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader";
static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor";
static const char* CONFIG_RESOURCE_CONNECTIONS = "connections";
class ServerConfig {
public:
static ServerConfig &GetInstance();

View File

@ -10,6 +10,12 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
aux_source_directory(./ test_srcs)
set(util_files
@ -40,6 +46,11 @@ set(db_test_src
${db_meta_files}
${db_scheduler_srcs}
${wrapper_src}
${scheduler_action_srcs}
${scheduler_event_srcs}
${scheduler_resource_srcs}
${scheduler_task_srcs}
${scheduler_srcs}
${knowhere_src}
${util_files}
${require_files}

View File

@ -100,7 +100,7 @@ TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) {
}
engine::meta::Meta::Ptr meta_ptr;
engine::DeleteContextPtr context_ptr = std::make_shared<engine::DeleteContext>(table_id, meta_ptr);
engine::DeleteContextPtr context_ptr = std::make_shared<engine::DeleteContext>(table_id, meta_ptr, 0);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 21);
@ -115,7 +115,7 @@ TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) {
}
}
context_ptr = std::make_shared<engine::DeleteContext>("no_task_table", meta_ptr);
context_ptr = std::make_shared<engine::DeleteContext>("no_task_table", meta_ptr, 0);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 22);

View File

@ -59,12 +59,27 @@ engine::Options DBTest::GetOptions() {
void DBTest::SetUp() {
InitLog();
auto res_mgr = engine::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true));
auto default_conn = engine::Connection("IO", 500.0);
res_mgr->Connect("disk", "cpu", default_conn);
res_mgr->Start();
engine::SchedInst::GetInstance()->Start();
auto options = GetOptions();
db_ = engine::DBFactory::Build(options);
}
void DBTest::TearDown() {
delete db_;
engine::ResMgrInst::GetInstance()->Stop();
engine::SchedInst::GetInstance()->Stop();
boost::filesystem::remove_all("/tmp/milvus_test");
}
@ -117,6 +132,21 @@ void NewMemManagerTest::InitLog() {
void NewMemManagerTest::SetUp() {
InitLog();
auto res_mgr = engine::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true));
auto default_conn = engine::Connection("IO", 500.0);
res_mgr->Connect("disk", "cpu", default_conn);
res_mgr->Start();
engine::SchedInst::GetInstance()->Start();
}
void NewMemManagerTest::TearDown() {
engine::ResMgrInst::GetInstance()->Stop();
engine::SchedInst::GetInstance()->Stop();
}
int main(int argc, char **argv) {

View File

@ -13,6 +13,8 @@
#include "db/DB.h"
#include "db/meta/SqliteMetaImpl.h"
#include "db/meta/MySQLMetaImpl.h"
#include "scheduler/SchedInst.h"
#include "scheduler/ResourceFactory.h"
#define TIMING
@ -91,4 +93,5 @@ class DISABLED_MySQLDBTest : public ::testing::Test {
class NewMemManagerTest : public ::testing::Test {
void InitLog();
void SetUp() override;
void TearDown() override;
};