diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md
index 3c81868e32..dab772d02f 100644
--- a/cpp/CHANGELOG.md
+++ b/cpp/CHANGELOG.md
@@ -104,6 +104,9 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-519 - Add event_test in scheduler
 - MS-520 - Update resource_test in scheduler
 - MS-524 - Add some unittest in event_test and resource_test
+- MS-525 - Disable parallel reduce in SearchTask
+- MS-527 - Update scheduler_test and enable it
+- MS-528 - Hide some config used future
 
 ## New Feature
 - MS-343 - Implement ResourceMgr
diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template
index 8d439b08ef..aa65ef5cb5 100644
--- a/cpp/conf/server_config.template
+++ b/cpp/conf/server_config.template
@@ -48,51 +48,38 @@ resource_config:
   # example:
   # resource_name:               # resource name, just using in connections below
   #   type: DISK                 # resource type, optional: DISK/CPU/GPU
-  #   memory: 256                # memory size, unit: GB
   #   device_id: 0
-  #   enable_loader: true        # if is enable loader, optional: true, false
   #   enable_executor: false     # if is enable executor, optional: true, false
 
   resources:
     ssda:
       type: DISK
-      memory: 2048
       device_id: 0
-      enable_loader: true
       enable_executor: false
 
     cpu:
       type: CPU
-      memory: 64
       device_id: 0
-      enable_loader: true
       enable_executor: false
 
    gpu0:
       type: GPU
-      memory: 6
       device_id: 0
-      enable_loader: true
       enable_executor: true
       gpu_resource_num: 2
       pinned_memory: 300
       temp_memory: 300
 
-#    gtx1660:
-#      type: GPU
-#      memory: 6
-#      device_id: 1
-#      enable_loader: true
-#      enable_executor: true
-
   # connection list, length: 0~N
-  # format: -${resource_name}===${resource_name}
+  # example:
+  # connection_name:
+  #   speed: 100                 # unit: MS/s
+  #   endpoint: ${resource_name}===${resource_name}
   connections:
     io:
       speed: 500
       endpoint: ssda===cpu
-    pcie:
+    pcie0:
       speed: 11000
       endpoint: cpu===gpu0
-#      - cpu===gtx1660
diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp
index 12be4d2eb8..f9392f3425 100644
--- a/cpp/src/scheduler/SchedInst.cpp
+++ b/cpp/src/scheduler/SchedInst.cpp
@@ -36,7 +36,8 @@ StartSchedulerService() {
         auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
 //        auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
         auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
-        auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
+//        auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
+        auto enable_loader = true;
         auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
         auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
         auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp
index aa526517fa..47d5532921 100644
--- a/cpp/src/scheduler/task/SearchTask.cpp
+++ b/cpp/src/scheduler/task/SearchTask.cpp
@@ -20,47 +20,47 @@ namespace engine {
 static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
 static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
 
-bool
-NeedParallelReduce(uint64_t nq, uint64_t topk) {
-    server::ServerConfig &config = server::ServerConfig::GetInstance();
-    server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
-    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
-    if (!need_parallel) {
-        return false;
-    }
-
-    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
-}
-
-void
-ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
-    size_t reduce_batch = PARALLEL_REDUCE_BATCH;
-
-    auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
-    if (thread_count > 0) {
-        reduce_batch = max_index / thread_count + 1;
-    }
-    ENGINE_LOG_DEBUG << "use " << thread_count <<
-                     " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
-
-    std::vector<std::shared_ptr<std::thread> > thread_array;
-    size_t from_index = 0;
-    while (from_index < max_index) {
-        size_t to_index = from_index + reduce_batch;
-        if (to_index > max_index) {
-            to_index = max_index;
-        }
-
-        auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
-        thread_array.push_back(reduce_thread);
-
-        from_index = to_index;
-    }
-
-    for (auto &thread_ptr : thread_array) {
-        thread_ptr->join();
-    }
-}
+//bool
+//NeedParallelReduce(uint64_t nq, uint64_t topk) {
+//    server::ServerConfig &config = server::ServerConfig::GetInstance();
+//    server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
+//    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
+//    if (!need_parallel) {
+//        return false;
+//    }
+//
+//    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
+//}
+//
+//void
+//ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
+//    size_t reduce_batch = PARALLEL_REDUCE_BATCH;
+//
+//    auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
+//    if (thread_count > 0) {
+//        reduce_batch = max_index / thread_count + 1;
+//    }
+//    ENGINE_LOG_DEBUG << "use " << thread_count <<
+//                     " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
+//
+//    std::vector<std::shared_ptr<std::thread> > thread_array;
+//    size_t from_index = 0;
+//    while (from_index < max_index) {
+//        size_t to_index = from_index + reduce_batch;
+//        if (to_index > max_index) {
+//            to_index = max_index;
+//        }
+//
+//        auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
+//        thread_array.push_back(reduce_thread);
+//
+//        from_index = to_index;
+//    }
+//
+//    for (auto &thread_ptr : thread_array) {
+//        thread_ptr->join();
+//    }
+//}
 
 void
 CollectFileMetrics(int file_type, size_t file_size) {
@@ -238,11 +238,11 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids,
         }
     };
 
-    if (NeedParallelReduce(nq, topk)) {
-        ParallelReduce(reduce_worker, nq);
-    } else {
+//    if (NeedParallelReduce(nq, topk)) {
+//        ParallelReduce(reduce_worker, nq);
+//    } else {
         reduce_worker(0, nq);
-    }
+//    }
 
     return Status::OK();
 }
@@ -343,11 +343,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
         }
     };
 
-    if (NeedParallelReduce(result_src.size(), topk)) {
-        ParallelReduce(ReduceWorker, result_src.size());
-    } else {
+//    if (NeedParallelReduce(result_src.size(), topk)) {
+//        ParallelReduce(ReduceWorker, result_src.size());
+//    } else {
         ReduceWorker(0, result_src.size());
-    }
+//    }
 
     return Status::OK();
 }
diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp
index b8eb9ce9f8..f176311eb1 100644
--- a/cpp/unittest/scheduler/scheduler_test.cpp
+++ b/cpp/unittest/scheduler/scheduler_test.cpp
@@ -6,6 +6,7 @@
 #include "scheduler/Scheduler.h"
 #include <gtest/gtest.h>
 #include
+#include
 #include "cache/DataObj.h"
 #include "cache/GpuCacheMgr.h"
 #include "scheduler/task/TestTask.h"
@@ -15,233 +16,238 @@
 #include "wrapper/knowhere/vec_index.h"
 #include "scheduler/tasklabel/SpecResLabel.h"
+
 
 namespace zilliz {
 namespace milvus {
 namespace engine {
 
-//class MockVecIndex : public engine::VecIndex {
-//public:
-//    virtual server::KnowhereError BuildAll(const long &nb,
-//                                           const float *xb,
-//                                           const long *ids,
-//                                           const engine::Config &cfg,
-//                                           const long &nt = 0,
-//                                           const float *xt = nullptr) {
-//
-//    }
-//
-//    engine::VecIndexPtr Clone() override {
-//        return zilliz::milvus::engine::VecIndexPtr();
-//    }
-//
-//    int64_t GetDeviceId() override {
-//        return 0;
-//    }
-//
-//    engine::IndexType GetType() override {
-//        return engine::IndexType::INVALID;
-//    }
-//
-//    virtual server::KnowhereError Add(const long &nb,
-//                                      const float *xb,
-//                                      const long *ids,
-//                                      const engine::Config &cfg = engine::Config()) {
-//
-//    }
-//
-//    virtual server::KnowhereError Search(const long &nq,
-//                                         const float *xq,
-//                                         float *dist,
-//                                         long *ids,
-//                                         const engine::Config &cfg = engine::Config()) {
-//
-//    }
-//
-//    engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
-//
-//    }
-//
-//    engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
-//
-//    }
-//
-//    virtual int64_t Dimension() {
-//        return dimension_;
-//    }
-//
-//    virtual int64_t Count() {
-//        return ntotal_;
-//    }
-//
-//    virtual zilliz::knowhere::BinarySet Serialize() {
-//        zilliz::knowhere::BinarySet binset;
-//        return binset;
-//    }
-//
-//    virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
-//
-//    }
-//
-//public:
-//    int64_t dimension_ = 512;
-//    int64_t ntotal_ = 0;
-//};
-//
-//
-//class SchedulerTest : public testing::Test {
-//protected:
-//    void
-//    SetUp() override {
-//        ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
-//        ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
-//        ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
-//
-//        res_mgr_ = std::make_shared<ResourceMgr>();
-//        cpu_resource_ = res_mgr_->Add(std::move(cpu));
-//        gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
-//        gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
-//
-//        auto PCIE = Connection("IO", 11000.0);
-//        res_mgr_->Connect("cpu", "gpu0", PCIE);
-//        res_mgr_->Connect("cpu", "gpu1", PCIE);
-//
-//        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
-//
-//        res_mgr_->Start();
-//        scheduler_->Start();
-//    }
-//
-//    void
-//    TearDown() override {
-//        scheduler_->Stop();
-//        res_mgr_->Stop();
-//    }
-//
-//    ResourceWPtr cpu_resource_;
-//    ResourceWPtr gpu_resource_0_;
-//    ResourceWPtr gpu_resource_1_;
-//
-//    ResourceMgrPtr res_mgr_;
-//    std::shared_ptr<Scheduler> scheduler_;
-//};
-//
-//void
-//insert_dummy_index_into_gpu_cache(uint64_t device_id) {
-//    MockVecIndex* mock_index = new MockVecIndex();
-//    mock_index->ntotal_ = 1000;
-//    engine::VecIndexPtr index(mock_index);
-//
-//    cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
-//
-//    cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
-//}
-//
-//TEST_F(SchedulerTest, OnCopyCompleted) {
-//    const uint64_t NUM = 10;
-//    std::vector<std::shared_ptr<TestTask>> tasks;
-//    TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
-//    dummy->location_ = "location";
-//
-//    insert_dummy_index_into_gpu_cache(1);
-//
-//    for (uint64_t i = 0; i < NUM; ++i) {
-//        auto task = std::make_shared<TestTask>(dummy);
-//        task->label() = std::make_shared<DefaultLabel>();
-//        tasks.push_back(task);
-//        cpu_resource_.lock()->task_table().Put(task);
-//    }
-//
-//    sleep(3);
+class MockVecIndex : public engine::VecIndex {
+public:
+    virtual ErrorCode BuildAll(const long &nb,
+                               const float *xb,
+                               const long *ids,
+                               const engine::Config &cfg,
+                               const long &nt = 0,
+                               const float *xt = nullptr) {
+
+    }
+
+    engine::VecIndexPtr Clone() override {
+        return zilliz::milvus::engine::VecIndexPtr();
+    }
+
+    int64_t GetDeviceId() override {
+        return 0;
+    }
+
+    engine::IndexType GetType() override {
+        return engine::IndexType::INVALID;
+    }
+
+    virtual ErrorCode Add(const long &nb,
+                          const float *xb,
+                          const long *ids,
+                          const engine::Config &cfg = engine::Config()) {
+
+    }
+
+    virtual ErrorCode Search(const long &nq,
+                             const float *xq,
+                             float *dist,
+                             long *ids,
+                             const engine::Config &cfg = engine::Config()) {
+
+    }
+
+    engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
+
+    }
+
+    engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
+
+    }
+
+    virtual int64_t Dimension() {
+        return dimension_;
+    }
+
+    virtual int64_t Count() {
+        return ntotal_;
+    }
+
+    virtual zilliz::knowhere::BinarySet Serialize() {
+        zilliz::knowhere::BinarySet binset;
+        return binset;
+    }
+
+    virtual ErrorCode Load(const zilliz::knowhere::BinarySet &index_binary) {
+
+    }
+
+public:
+    int64_t dimension_ = 512;
+    int64_t ntotal_ = 0;
+};
+
+
+class SchedulerTest : public testing::Test {
+protected:
+    void
+    SetUp() override {
+        server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
+        config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
+        config.AddSequenceItem(server::CONFIG_GPU_IDS, "1");
+
+        ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
+        ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
+        ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
+
+        res_mgr_ = std::make_shared<ResourceMgr>();
+        cpu_resource_ = res_mgr_->Add(std::move(cpu));
+        gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
+        gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
+
+        auto PCIE = Connection("IO", 11000.0);
+        res_mgr_->Connect("cpu", "gpu0", PCIE);
+        res_mgr_->Connect("cpu", "gpu1", PCIE);
+
+        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
+
+        res_mgr_->Start();
+        scheduler_->Start();
+    }
+
+    void
+    TearDown() override {
+        scheduler_->Stop();
+        res_mgr_->Stop();
+    }
+
+    ResourceWPtr cpu_resource_;
+    ResourceWPtr gpu_resource_0_;
+    ResourceWPtr gpu_resource_1_;
+
+    ResourceMgrPtr res_mgr_;
+    std::shared_ptr<Scheduler> scheduler_;
+};
+
+void
+insert_dummy_index_into_gpu_cache(uint64_t device_id) {
+    MockVecIndex *mock_index = new MockVecIndex();
+    mock_index->ntotal_ = 1000;
+    engine::VecIndexPtr index(mock_index);
+
+    cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
+
+    cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
+}
+
+TEST_F(SchedulerTest, OnLoadCompleted) {
+    const uint64_t NUM = 10;
+    std::vector<std::shared_ptr<TestTask>> tasks;
+    TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
+    dummy->location_ = "location";
+
+    insert_dummy_index_into_gpu_cache(1);
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        auto task = std::make_shared<TestTask>(dummy);
+        task->label() = std::make_shared<DefaultLabel>();
+        tasks.push_back(task);
+        cpu_resource_.lock()->task_table().Put(task);
+    }
+
+    sleep(3);
+    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
+
+}
+
+TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
+    const uint64_t NUM = 10;
+    std::vector<std::shared_ptr<TestTask>> tasks;
+    TableFileSchemaPtr dummy1 = std::make_shared<TableFileSchema>();
+    dummy1->location_ = "location";
+
+    tasks.clear();
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        auto task = std::make_shared<TestTask>(dummy1);
+        task->label() = std::make_shared<DefaultLabel>();
+        tasks.push_back(task);
+        cpu_resource_.lock()->task_table().Put(task);
+    }
+
+    sleep(3);
 //    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
-//
-//}
-//
-//TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
-//    const uint64_t NUM = 10;
-//    std::vector<std::shared_ptr<TestTask>> tasks;
-//    TableFileSchemaPtr dummy1 = std::make_shared<TableFileSchema>();
-//    dummy1->location_ = "location";
-//
-//    tasks.clear();
-//
-//    for (uint64_t i = 0; i < NUM; ++i) {
-//        auto task = std::make_shared<TestTask>(dummy1);
-//        task->label() = std::make_shared<DefaultLabel>();
-//        tasks.push_back(task);
-//        cpu_resource_.lock()->task_table().Put(task);
-//    }
-//
-//    sleep(3);
-////    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
-//}
-//
-//class SchedulerTest2 : public testing::Test {
-// protected:
-//    void
-//    SetUp() override {
-//        ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
-//        ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
-//        ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
-//        ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
-//        ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
-//        ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
-//
-//        res_mgr_ = std::make_shared<ResourceMgr>();
-//        disk_ = res_mgr_->Add(std::move(disk));
-//        cpu_0_ = res_mgr_->Add(std::move(cpu0));
-//        cpu_1_ = res_mgr_->Add(std::move(cpu1));
-//        cpu_2_ = res_mgr_->Add(std::move(cpu2));
-//        gpu_0_ = res_mgr_->Add(std::move(gpu0));
-//        gpu_1_ = res_mgr_->Add(std::move(gpu1));
-//        auto IO = Connection("IO", 5.0);
-//        auto PCIE1 = Connection("PCIE", 11.0);
-//        auto PCIE2 = Connection("PCIE", 20.0);
-//        res_mgr_->Connect("disk", "cpu0", IO);
-//        res_mgr_->Connect("cpu0", "cpu1", IO);
-//        res_mgr_->Connect("cpu1", "cpu2", IO);
-//        res_mgr_->Connect("cpu0", "cpu2", IO);
-//        res_mgr_->Connect("cpu1", "gpu0", PCIE1);
-//        res_mgr_->Connect("cpu2", "gpu1", PCIE2);
-//
-//        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
-//
-//        res_mgr_->Start();
-//        scheduler_->Start();
-//    }
-//
-//    void
-//    TearDown() override {
-//        scheduler_->Stop();
-//        res_mgr_->Stop();
-//    }
-//
-//    ResourceWPtr disk_;
-//    ResourceWPtr cpu_0_;
-//    ResourceWPtr cpu_1_;
-//    ResourceWPtr cpu_2_;
-//    ResourceWPtr gpu_0_;
-//    ResourceWPtr gpu_1_;
-//    ResourceMgrPtr res_mgr_;
-//
-//    std::shared_ptr<Scheduler> scheduler_;
-//};
-//
-//
-//TEST_F(SchedulerTest2, SpecifiedResourceTest) {
-//    const uint64_t NUM = 10;
-//    std::vector<std::shared_ptr<TestTask>> tasks;
-//    TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
-//    dummy->location_ = "location";
-//
-//    for (uint64_t i = 0; i < NUM; ++i) {
-//        std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
-//        task->label() = std::make_shared<SpecResLabel>(disk_);
-//        tasks.push_back(task);
-//        disk_.lock()->task_table().Put(task);
-//    }
-//
-////    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
-//}
+}
+
+class SchedulerTest2 : public testing::Test {
+protected:
+    void
+    SetUp() override {
+        ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
+        ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
+        ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
+        ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
+        ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
+        ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
+
+        res_mgr_ = std::make_shared<ResourceMgr>();
+        disk_ = res_mgr_->Add(std::move(disk));
+        cpu_0_ = res_mgr_->Add(std::move(cpu0));
+        cpu_1_ = res_mgr_->Add(std::move(cpu1));
+        cpu_2_ = res_mgr_->Add(std::move(cpu2));
+        gpu_0_ = res_mgr_->Add(std::move(gpu0));
+        gpu_1_ = res_mgr_->Add(std::move(gpu1));
+        auto IO = Connection("IO", 5.0);
+        auto PCIE1 = Connection("PCIE", 11.0);
+        auto PCIE2 = Connection("PCIE", 20.0);
+        res_mgr_->Connect("disk", "cpu0", IO);
+        res_mgr_->Connect("cpu0", "cpu1", IO);
+        res_mgr_->Connect("cpu1", "cpu2", IO);
+        res_mgr_->Connect("cpu0", "cpu2", IO);
+        res_mgr_->Connect("cpu1", "gpu0", PCIE1);
+        res_mgr_->Connect("cpu2", "gpu1", PCIE2);
+
+        scheduler_ = std::make_shared<Scheduler>(res_mgr_);
+
+        res_mgr_->Start();
+        scheduler_->Start();
+    }
+
+    void
+    TearDown() override {
+        scheduler_->Stop();
+        res_mgr_->Stop();
+    }
+
+    ResourceWPtr disk_;
+    ResourceWPtr cpu_0_;
+    ResourceWPtr cpu_1_;
+    ResourceWPtr cpu_2_;
+    ResourceWPtr gpu_0_;
+    ResourceWPtr gpu_1_;
+    ResourceMgrPtr res_mgr_;
+
+    std::shared_ptr<Scheduler> scheduler_;
+};
+
+
+TEST_F(SchedulerTest2, SpecifiedResourceTest) {
+    const uint64_t NUM = 10;
+    std::vector<std::shared_ptr<TestTask>> tasks;
+    TableFileSchemaPtr dummy = std::make_shared<TableFileSchema>();
+    dummy->location_ = "location";
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
+        task->label() = std::make_shared<SpecResLabel>(disk_);
+        tasks.push_back(task);
+        disk_.lock()->task_table().Put(task);
+    }
+
+//    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
+}
 
 }
 }
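For reference, the trimmed resource section that the updated template describes ends up looking like the sketch below. It is a minimal sketch assembled only from the example values already present in this patch (the resource names ssda/cpu/gpu0 and connection names io/pcie0 are just the template's placeholders, not required names): per-resource memory and enable_loader are no longer part of the template, and each connection is now a named entry carrying a speed and an endpoint.

    resource_config:
      resources:
        ssda:
          type: DISK
          device_id: 0
          enable_executor: false
        cpu:
          type: CPU
          device_id: 0
          enable_executor: false
        gpu0:
          type: GPU
          device_id: 0
          enable_executor: true
          gpu_resource_num: 2
          pinned_memory: 300
          temp_memory: 300
      connections:
        io:
          speed: 500
          endpoint: ssda===cpu
        pcie0:
          speed: 11000
          endpoint: cpu===gpu0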