From e72b0348f76d6525a764d14f25ec54629f449ecf Mon Sep 17 00:00:00 2001 From: wxyu Date: Wed, 18 Sep 2019 19:44:38 +0800 Subject: [PATCH] MS-555 Remove old scheduler Former-commit-id: 8eb7f773a15df78f46bf61a59ec0ea9d8923cc28 --- cpp/CHANGELOG.md | 1 + cpp/src/db/DBImpl.cpp | 73 +++++---- cpp/src/db/DBImpl.h | 1 - cpp/src/db/scheduler/TaskDispatchQueue.cpp | 102 ------------- cpp/src/db/scheduler/TaskDispatchQueue.h | 68 --------- cpp/src/db/scheduler/TaskDispatchStrategy.cpp | 143 ------------------ cpp/src/db/scheduler/TaskDispatchStrategy.h | 36 ----- cpp/src/db/scheduler/TaskScheduler.cpp | 135 ----------------- cpp/src/db/scheduler/TaskScheduler.h | 63 -------- .../db/scheduler/context/DeleteContext.cpp | 52 ------- cpp/src/db/scheduler/context/DeleteContext.h | 52 ------- .../db/scheduler/context/IScheduleContext.h | 50 ------ .../db/scheduler/context/SearchContext.cpp | 70 --------- cpp/src/db/scheduler/context/SearchContext.h | 94 ------------ cpp/src/db/scheduler/task/DeleteTask.cpp | 37 ----- cpp/src/db/scheduler/task/DeleteTask.h | 41 ----- cpp/src/db/scheduler/task/IScheduleTask.h | 53 ------- cpp/src/db/scheduler/task/IndexLoadTask.cpp | 41 ----- cpp/src/db/scheduler/task/IndexLoadTask.h | 42 ----- cpp/src/db/scheduler/task/SearchTask.cpp | 39 ----- cpp/src/db/scheduler/task/SearchTask.h | 46 ------ cpp/src/scheduler/JobMgr.h | 2 + cpp/src/scheduler/SchedInst.cpp | 7 +- cpp/src/scheduler/SchedInst.h | 19 +++ cpp/src/scheduler/TaskCreator.cpp | 10 +- cpp/src/scheduler/job/Job.h | 1 + cpp/src/scheduler/job/SearchJob.cpp | 12 +- cpp/src/scheduler/job/SearchJob.h | 4 + cpp/src/scheduler/task/DeleteTask.cpp | 6 +- cpp/src/scheduler/task/DeleteTask.h | 6 +- cpp/src/scheduler/task/SearchTask.cpp | 69 +++++---- cpp/src/scheduler/task/SearchTask.h | 16 +- cpp/src/scheduler/task/Task.h | 7 +- cpp/src/scheduler/task/TaskConvert.cpp | 53 ------- cpp/src/scheduler/task/TaskConvert.h | 35 ----- cpp/src/scheduler/task/TestTask.cpp | 2 +- cpp/src/scheduler/task/TestTask.h | 3 +- cpp/unittest/db/CMakeLists.txt | 10 +- cpp/unittest/db/scheduler_test.cpp | 137 ----------------- cpp/unittest/db/search_test.cpp | 39 +++-- cpp/unittest/db/utils.cpp | 7 +- cpp/unittest/scheduler/CMakeLists.txt | 2 + cpp/unittest/scheduler/normal_test.cpp | 2 +- cpp/unittest/scheduler/resource_mgr_test.cpp | 2 +- cpp/unittest/scheduler/resource_test.cpp | 8 +- cpp/unittest/scheduler/scheduler_test.cpp | 6 +- cpp/unittest/scheduler/tasktable_test.cpp | 4 +- 47 files changed, 184 insertions(+), 1524 deletions(-) delete mode 100644 cpp/src/db/scheduler/TaskDispatchQueue.cpp delete mode 100644 cpp/src/db/scheduler/TaskDispatchQueue.h delete mode 100644 cpp/src/db/scheduler/TaskDispatchStrategy.cpp delete mode 100644 cpp/src/db/scheduler/TaskDispatchStrategy.h delete mode 100644 cpp/src/db/scheduler/TaskScheduler.cpp delete mode 100644 cpp/src/db/scheduler/TaskScheduler.h delete mode 100644 cpp/src/db/scheduler/context/DeleteContext.cpp delete mode 100644 cpp/src/db/scheduler/context/DeleteContext.h delete mode 100644 cpp/src/db/scheduler/context/IScheduleContext.h delete mode 100644 cpp/src/db/scheduler/context/SearchContext.cpp delete mode 100644 cpp/src/db/scheduler/context/SearchContext.h delete mode 100644 cpp/src/db/scheduler/task/DeleteTask.cpp delete mode 100644 cpp/src/db/scheduler/task/DeleteTask.h delete mode 100644 cpp/src/db/scheduler/task/IScheduleTask.h delete mode 100644 cpp/src/db/scheduler/task/IndexLoadTask.cpp delete mode 100644 cpp/src/db/scheduler/task/IndexLoadTask.h delete mode 100644 cpp/src/db/scheduler/task/SearchTask.cpp delete mode 100644 cpp/src/db/scheduler/task/SearchTask.h delete mode 100644 cpp/src/scheduler/task/TaskConvert.cpp delete mode 100644 cpp/src/scheduler/task/TaskConvert.h delete mode 100644 cpp/unittest/db/scheduler_test.cpp diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index c5d27e3461..76ef4a98a7 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -13,6 +13,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-556 - Add Job Definition in Scheduler - MS-558 - Refine status code - MS-562 - Add JobMgr and TaskCreator in Scheduler +- MS-555 - Remove old scheduler ## New Feature diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 648038de25..51bad594b2 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -24,8 +24,8 @@ #include "meta/MetaFactory.h" #include "meta/MetaConsts.h" #include "metrics/Metrics.h" -#include "scheduler/TaskScheduler.h" -#include "scheduler/context/DeleteContext.h" +#include "scheduler/job/SearchJob.h" +#include "scheduler/job/DeleteJob.h" #include "scheduler/SchedInst.h" #include "utils/TimeRecorder.h" #include "utils/Log.h" @@ -133,12 +133,10 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date meta_ptr_->DeleteTable(table_id); //soft delete table //scheduler will determine when to delete table files - TaskScheduler& scheduler = TaskScheduler::GetInstance(); - DeleteContextPtr context = std::make_shared(table_id, - meta_ptr_, - ResMgrInst::GetInstance()->GetNumOfComputeResource()); - scheduler.Schedule(context); - context->WaitAndDelete(); + auto nres = ResMgrInst::GetInstance()->GetNumOfComputeResource(); + scheduler::DeleteJobPtr job = std::make_shared(0, table_id, meta_ptr_, nres); + JobMgrInst::GetInstance()->Put(job); + job->WaitAndDelete(); } else { meta_ptr_->DropPartitionsByDates(table_id, dates); } @@ -418,51 +416,50 @@ Status DBImpl::Size(uint64_t& result) { Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { + using namespace scheduler; server::CollectQueryMetrics metrics(nq); TimeRecorder rc(""); //step 1: get files to search ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size(); - SearchContextPtr context = std::make_shared(k, nq, nprobe, vectors); - for (auto &file : files) { + SearchJobPtr job = std::make_shared(0, k, nq, nprobe, vectors); + for (auto &file : files) { TableFileSchemaPtr file_ptr = std::make_shared(file); - context->AddIndexFile(file_ptr); + job->AddIndexFile(file_ptr); } //step 2: put search task to scheduler - TaskScheduler& scheduler = TaskScheduler::GetInstance(); - scheduler.Schedule(context); - - context->WaitResult(); - if (!context->GetStatus().ok()) { - return context->GetStatus(); + JobMgrInst::GetInstance()->Put(job); + job->WaitResult(); + if (!job->GetStatus().ok()) { + return job->GetStatus(); } //step 3: print time cost information - double load_cost = context->LoadCost(); - double search_cost = context->SearchCost(); - double reduce_cost = context->ReduceCost(); - std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost); - std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost); - std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost); - if(search_cost > 0.0 || reduce_cost > 0.0) { - double total_cost = load_cost + search_cost + reduce_cost; - double load_percent = load_cost/total_cost; - double search_percent = search_cost/total_cost; - double reduce_percent = reduce_cost/total_cost; - - ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%"; - ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%"; - ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%"; - } else { - ENGINE_LOG_DEBUG << "Engine load cost: " << load_info - << " search cost: " << search_info - << " reduce cost: " << reduce_info; - } +// double load_cost = context->LoadCost(); +// double search_cost = context->SearchCost(); +// double reduce_cost = context->ReduceCost(); +// std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost); +// std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost); +// std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost); +// if(search_cost > 0.0 || reduce_cost > 0.0) { +// double total_cost = load_cost + search_cost + reduce_cost; +// double load_percent = load_cost/total_cost; +// double search_percent = search_cost/total_cost; +// double reduce_percent = reduce_cost/total_cost; +// +// ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%"; +// ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%"; +// ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%"; +// } else { +// ENGINE_LOG_DEBUG << "Engine load cost: " << load_info +// << " search cost: " << search_info +// << " reduce cost: " << reduce_info; +// } //step 4: construct results - results = context->GetResult(); + results = job->GetResult(); rc.ElapseFromBegin("Engine query totally cost"); return Status::OK(); diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 85803b763f..6062c5e2db 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -29,7 +29,6 @@ #include #include #include -#include "scheduler/context/SearchContext.h" namespace zilliz { diff --git a/cpp/src/db/scheduler/TaskDispatchQueue.cpp b/cpp/src/db/scheduler/TaskDispatchQueue.cpp deleted file mode 100644 index 41a95060a8..0000000000 --- a/cpp/src/db/scheduler/TaskDispatchQueue.cpp +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "TaskDispatchQueue.h" -#include "TaskDispatchStrategy.h" -#include "utils/Error.h" -#include "utils/Log.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -void -TaskDispatchQueue::Put(const ScheduleContextPtr &context) { - std::unique_lock lock(mtx); - full_.wait(lock, [this] { return (queue_.size() < capacity_); }); - - if(context == nullptr) { - queue_.push_front(nullptr); - empty_.notify_all(); - return; - } - - TaskDispatchStrategy::Schedule(context, queue_); - - empty_.notify_all(); -} - -ScheduleTaskPtr -TaskDispatchQueue::Take() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); - - ScheduleTaskPtr front(queue_.front()); - queue_.pop_front(); - full_.notify_all(); - return front; -} - -size_t -TaskDispatchQueue::Size() { - std::lock_guard lock(mtx); - return queue_.size(); -} - -ScheduleTaskPtr -TaskDispatchQueue::Front() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); - if (queue_.empty()) { - std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - ScheduleTaskPtr front(queue_.front()); - return front; -} - -ScheduleTaskPtr -TaskDispatchQueue::Back() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); - - if (queue_.empty()) { - std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; - throw server::ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); - } - - ScheduleTaskPtr back(queue_.back()); - return back; -} - -bool -TaskDispatchQueue::Empty() { - std::unique_lock lock(mtx); - return queue_.empty(); -} - -void -TaskDispatchQueue::SetCapacity(const size_t capacity) { - capacity_ = (capacity > 0 ? capacity : capacity_); -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/TaskDispatchQueue.h b/cpp/src/db/scheduler/TaskDispatchQueue.h deleted file mode 100644 index b8a0dd611e..0000000000 --- a/cpp/src/db/scheduler/TaskDispatchQueue.h +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "context/IScheduleContext.h" -#include "task/IScheduleTask.h" - -#include -#include -#include -#include - - -namespace zilliz { -namespace milvus { -namespace engine { - -class TaskDispatchQueue { -public: - TaskDispatchQueue() : mtx(), full_(), empty_() {} - - TaskDispatchQueue(const TaskDispatchQueue &rhs) = delete; - - TaskDispatchQueue &operator=(const TaskDispatchQueue &rhs) = delete; - - using TaskList = std::list; - - void Put(const ScheduleContextPtr &context); - - ScheduleTaskPtr Take(); - - ScheduleTaskPtr Front(); - - ScheduleTaskPtr Back(); - - size_t Size(); - - bool Empty(); - - void SetCapacity(const size_t capacity); - -private: - mutable std::mutex mtx; - std::condition_variable full_; - std::condition_variable empty_; - - TaskList queue_; - size_t capacity_ = 1000000; -}; - -} -} -} diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp b/cpp/src/db/scheduler/TaskDispatchStrategy.cpp deleted file mode 100644 index 6ce9e582de..0000000000 --- a/cpp/src/db/scheduler/TaskDispatchStrategy.cpp +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "TaskDispatchStrategy.h" -#include "context/SearchContext.h" -#include "context/DeleteContext.h" -#include "task/IndexLoadTask.h" -#include "task/DeleteTask.h" -#include "cache/CpuCacheMgr.h" -#include "utils/Error.h" -#include "utils/Log.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class ReuseCacheIndexStrategy { -public: - bool Schedule(const SearchContextPtr &context, std::list& task_list) { - if(context == nullptr) { - ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist"; - return false; - } - - SearchContext::Id2IndexMap index_files = context->GetIndexMap(); - //some index loader alread exists - for(auto& task : task_list) { - if(task->type() != ScheduleTaskType::kIndexLoad) { - continue; - } - - IndexLoadTaskPtr loader = std::static_pointer_cast(task); - if(index_files.find(loader->file_->id_) != index_files.end()){ - ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext"; - index_files.erase(loader->file_->id_); - loader->search_contexts_.push_back(context); - } - } - - //index_files still contains some index files, create new loader - for(auto& pair : index_files) { - ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_; - IndexLoadTaskPtr new_loader = std::make_shared(); - new_loader->search_contexts_.push_back(context); - new_loader->file_ = pair.second; - - auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_); - if(index != nullptr) { - //if the index file has been in memory, increase its priority - task_list.push_front(new_loader); - } else { - //index file not in memory, put it to tail - task_list.push_back(new_loader); - } - } - - return true; - } -}; - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class DeleteTableStrategy { -public: - bool Schedule(const DeleteContextPtr &context, std::list &task_list) { - if (context == nullptr) { - ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist"; - return false; - } - - DeleteTaskPtr delete_task = std::make_shared(context); - if(task_list.empty()) { - task_list.push_back(delete_task); - return true; - } - - std::string table_id = context->table_id(); - - //put delete task to proper position - //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1 - //if user want to delete table1, the DeleteTask will be insert into No.6 position - for(std::list::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) { - if((*iter)->type() != ScheduleTaskType::kIndexLoad) { - continue; - } - - IndexLoadTaskPtr loader = std::static_pointer_cast(*iter); - if(loader->file_->table_id_ != table_id) { - continue; - } - - task_list.insert(iter.base(), delete_task); - return true; - } - - //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap - task_list.push_front(delete_task); - return true; - } -}; - - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr, - std::list &task_list) { - if(context_ptr == nullptr) { - ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist"; - return false; - } - - switch(context_ptr->type()) { - case ScheduleContextType::kSearch: { - SearchContextPtr search_context = std::static_pointer_cast(context_ptr); - ReuseCacheIndexStrategy strategy; - return strategy.Schedule(search_context, task_list); - } - case ScheduleContextType::kDelete: { - DeleteContextPtr delete_context = std::static_pointer_cast(context_ptr); - DeleteTableStrategy strategy; - return strategy.Schedule(delete_context, task_list); - } - default: - ENGINE_LOG_ERROR << "Invalid schedule task type"; - return false; - } -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/TaskDispatchStrategy.h b/cpp/src/db/scheduler/TaskDispatchStrategy.h deleted file mode 100644 index 8d1b51605e..0000000000 --- a/cpp/src/db/scheduler/TaskDispatchStrategy.h +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "context/IScheduleContext.h" -#include "task/IScheduleTask.h" - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -class TaskDispatchStrategy { -public: - static bool Schedule(const ScheduleContextPtr &context_ptr, std::list& task_list); -}; - -} -} -} diff --git a/cpp/src/db/scheduler/TaskScheduler.cpp b/cpp/src/db/scheduler/TaskScheduler.cpp deleted file mode 100644 index a546dbabf1..0000000000 --- a/cpp/src/db/scheduler/TaskScheduler.cpp +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "server/ServerConfig.h" -#include "TaskScheduler.h" -#include "TaskDispatchQueue.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" -#include "db/engine/EngineFactory.h" -#include "scheduler/task/TaskConvert.h" -#include "scheduler/SchedInst.h" -#include "scheduler/ResourceFactory.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -TaskScheduler::TaskScheduler() - : stopped_(true) { - Start(); -} - -TaskScheduler::~TaskScheduler() { - Stop(); -} - -TaskScheduler& TaskScheduler::GetInstance() { - static TaskScheduler s_instance; - return s_instance; -} - -bool -TaskScheduler::Start() { - if(!stopped_) { - SERVER_LOG_INFO << "Task Scheduler isn't started"; - return true; - } - - stopped_ = false; - - task_queue_.SetCapacity(2); - - task_dispatch_thread_ = std::make_shared(&TaskScheduler::TaskDispatchWorker, this); - task_thread_ = std::make_shared(&TaskScheduler::TaskWorker, this); - - return true; -} - -bool -TaskScheduler::Stop() { - if(stopped_) { - SERVER_LOG_INFO << "Task Scheduler already stopped"; - return true; - } - - if(task_dispatch_thread_) { - task_dispatch_queue_.Put(nullptr); - task_dispatch_thread_->join(); - task_dispatch_thread_ = nullptr; - } - - if(task_thread_) { - task_queue_.Put(nullptr); - task_thread_->join(); - task_thread_ = nullptr; - } - - stopped_ = true; - - return true; -} - -bool -TaskScheduler::Schedule(ScheduleContextPtr context) { - task_dispatch_queue_.Put(context); - - return true; -} - -bool -TaskScheduler::TaskDispatchWorker() { - while(true) { - ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take(); - if(task_ptr == nullptr) { - SERVER_LOG_INFO << "Stop db task dispatch thread"; - return true; - } - - // TODO: Put task into Disk-TaskTable - auto task = TaskConvert(task_ptr); - auto disk_list = ResMgrInst::GetInstance()->GetDiskResources(); - if (!disk_list.empty()) { - if (auto disk = disk_list[0].lock()) { - disk->task_table().Put(task); - } - } - } -} - -bool -TaskScheduler::TaskWorker() { - while(true) { - // TODO: expected blocking forever - ScheduleTaskPtr task_ptr = task_queue_.Take(); - if(task_ptr == nullptr) { - SERVER_LOG_INFO << "Stop db task worker thread"; - return true; - } - - //execute task - ScheduleTaskPtr next_task = task_ptr->Execute(); - if(next_task != nullptr) { - task_queue_.Put(next_task); - } - } -} - -} -} -} diff --git a/cpp/src/db/scheduler/TaskScheduler.h b/cpp/src/db/scheduler/TaskScheduler.h deleted file mode 100644 index 86feb37f8f..0000000000 --- a/cpp/src/db/scheduler/TaskScheduler.h +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "context/IScheduleContext.h" -#include "task/IScheduleTask.h" -#include "TaskDispatchQueue.h" -#include "utils/BlockingQueue.h" - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -class TaskScheduler { -private: - TaskScheduler(); - virtual ~TaskScheduler(); - -public: - static TaskScheduler& GetInstance(); - - bool Schedule(ScheduleContextPtr context); - -private: - bool Start(); - bool Stop(); - - bool TaskDispatchWorker(); - bool TaskWorker(); - -private: - std::shared_ptr task_dispatch_thread_; - std::shared_ptr task_thread_; - - TaskDispatchQueue task_dispatch_queue_; - - using TaskQueue = server::BlockingQueue; - TaskQueue task_queue_; - - bool stopped_ = true; -}; - - -} -} -} diff --git a/cpp/src/db/scheduler/context/DeleteContext.cpp b/cpp/src/db/scheduler/context/DeleteContext.cpp deleted file mode 100644 index 32ec2f8f09..0000000000 --- a/cpp/src/db/scheduler/context/DeleteContext.cpp +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "DeleteContext.h" - - -namespace zilliz { -namespace milvus { -namespace engine { - -DeleteContext::DeleteContext(const std::string &table_id, meta::MetaPtr &meta_ptr, uint64_t num_resource) - : IScheduleContext(ScheduleContextType::kDelete), - table_id_(table_id), - meta_ptr_(meta_ptr), - num_resource_(num_resource) { - -} - -void DeleteContext::WaitAndDelete() { -#ifdef NEW_SCHEDULER - std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return done_resource == num_resource_; }); - meta_ptr_->DeleteTableFiles(table_id_); -#endif -} - -void DeleteContext::ResourceDone() { - { - std::lock_guard lock(mutex_); - ++done_resource; - } - cv_.notify_one(); -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/context/DeleteContext.h b/cpp/src/db/scheduler/context/DeleteContext.h deleted file mode 100644 index 7132acc827..0000000000 --- a/cpp/src/db/scheduler/context/DeleteContext.h +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "IScheduleContext.h" -#include "db/meta/Meta.h" -#include -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -class DeleteContext : public IScheduleContext { -public: - DeleteContext(const std::string& table_id, meta::MetaPtr& meta_ptr, uint64_t num_resource); - - std::string table_id() const { return table_id_; } - meta::MetaPtr meta() const { return meta_ptr_; } - void WaitAndDelete(); - void ResourceDone(); - -private: - std::string table_id_; - meta::MetaPtr meta_ptr_; - - uint64_t num_resource_; - uint64_t done_resource = 0; - std::mutex mutex_; - std::condition_variable cv_; -}; - -using DeleteContextPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/db/scheduler/context/IScheduleContext.h b/cpp/src/db/scheduler/context/IScheduleContext.h deleted file mode 100644 index 1ac347526c..0000000000 --- a/cpp/src/db/scheduler/context/IScheduleContext.h +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -enum class ScheduleContextType { - kUnknown = 0, - kSearch, - kDelete, -}; - -class IScheduleContext { -public: - IScheduleContext(ScheduleContextType type) - : type_(type) { - } - - virtual ~IScheduleContext() = default; - - ScheduleContextType type() const { return type_; } - -protected: - ScheduleContextType type_; -}; - -using ScheduleContextPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/db/scheduler/context/SearchContext.cpp b/cpp/src/db/scheduler/context/SearchContext.cpp deleted file mode 100644 index ae9ced5938..0000000000 --- a/cpp/src/db/scheduler/context/SearchContext.cpp +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "SearchContext.h" -#include "utils/Log.h" - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -SearchContext::SearchContext(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors) - : IScheduleContext(ScheduleContextType::kSearch), - topk_(topk), - nq_(nq), - nprobe_(nprobe), - vectors_(vectors) { - //use current time to identify this context - std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); - long id = tp.time_since_epoch().count(); - identity_ = std::to_string(id); -} - -bool -SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) { - std::unique_lock lock(mtx_); - if(index_file == nullptr || map_index_files_.find(index_file->id_) != map_index_files_.end()) { - return false; - } - - SERVER_LOG_DEBUG << "SearchContext " << identity_ << " add index file: " << index_file->id_; - - map_index_files_[index_file->id_] = index_file; - return true; -} - -void -SearchContext::IndexSearchDone(size_t index_id) { - std::unique_lock lock(mtx_); - map_index_files_.erase(index_id); - done_cond_.notify_all(); - SERVER_LOG_DEBUG << "SearchContext " << identity_ << " finish index file: " << index_id; -} - -void -SearchContext::WaitResult() { - std::unique_lock lock(mtx_); - done_cond_.wait(lock, [this] { return map_index_files_.empty(); }); - SERVER_LOG_DEBUG << "SearchContext " << identity_ << " all done"; -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/context/SearchContext.h b/cpp/src/db/scheduler/context/SearchContext.h deleted file mode 100644 index 3aee2c436e..0000000000 --- a/cpp/src/db/scheduler/context/SearchContext.h +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "IScheduleContext.h" -#include "db/meta/MetaTypes.h" - -#include -#include -#include -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -using TableFileSchemaPtr = std::shared_ptr; - -class SearchContext : public IScheduleContext { -public: - SearchContext(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors); - - bool AddIndexFile(TableFileSchemaPtr& index_file); - - uint64_t topk() const { return topk_; } - uint64_t nq() const { return nq_; } - uint64_t nprobe() const { return nprobe_; } - const float* vectors() const { return vectors_; } - - using Id2IndexMap = std::unordered_map; - const Id2IndexMap& GetIndexMap() const { return map_index_files_; } - - using Id2DistanceMap = std::vector>; - using ResultSet = std::vector; - const ResultSet& GetResult() const { return result_; } - ResultSet& GetResult() { return result_; } - - const std::string& Identity() const { return identity_; } - const Status& GetStatus() const { return status_; } - Status& GetStatus() { return status_; } - - void IndexSearchDone(size_t index_id); - void WaitResult(); - - void AccumLoadCost(double span) { time_cost_load_ += span; } - void AccumSearchCost(double span) { time_cost_search_ += span; } - void AccumReduceCost(double span) { time_cost_reduce_ += span; } - - double LoadCost() const { return time_cost_load_; } - double SearchCost() const { return time_cost_search_; } - double ReduceCost() const { return time_cost_reduce_; } - -private: - uint64_t topk_ = 0; - uint64_t nq_ = 0; - uint64_t nprobe_ = 10; - const float* vectors_ = nullptr; - - Id2IndexMap map_index_files_; - ResultSet result_; - - std::mutex mtx_; - std::condition_variable done_cond_; - - std::string identity_; //for debug - Status status_; - - double time_cost_load_ = 0.0; //time cost for load all index files, unit: us - double time_cost_search_ = 0.0; //time cost for entire search, unit: us - double time_cost_reduce_ = 0.0; //time cost for entire reduce, unit: us -}; - -using SearchContextPtr = std::shared_ptr; - - - -} -} -} diff --git a/cpp/src/db/scheduler/task/DeleteTask.cpp b/cpp/src/db/scheduler/task/DeleteTask.cpp deleted file mode 100644 index 310b844ff9..0000000000 --- a/cpp/src/db/scheduler/task/DeleteTask.cpp +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "DeleteTask.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -DeleteTask::DeleteTask(const DeleteContextPtr& context) - : IScheduleTask(ScheduleTaskType::kDelete), - context_(context) { - -} - -std::shared_ptr DeleteTask::Execute() { - return nullptr; -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/task/DeleteTask.h b/cpp/src/db/scheduler/task/DeleteTask.h deleted file mode 100644 index 362b125c92..0000000000 --- a/cpp/src/db/scheduler/task/DeleteTask.h +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "IScheduleTask.h" -#include "db/scheduler/context/DeleteContext.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class DeleteTask : public IScheduleTask { -public: - DeleteTask(const DeleteContextPtr& context); - - virtual std::shared_ptr Execute() override; - -public: - DeleteContextPtr context_; -}; - -using DeleteTaskPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/db/scheduler/task/IScheduleTask.h b/cpp/src/db/scheduler/task/IScheduleTask.h deleted file mode 100644 index 07025a7d72..0000000000 --- a/cpp/src/db/scheduler/task/IScheduleTask.h +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -enum class ScheduleTaskType { - kUnknown = 0, - kIndexLoad, - kSearch, - kDelete, -}; - -class IScheduleTask { -public: - IScheduleTask(ScheduleTaskType type) - : type_(type) { - } - - virtual ~IScheduleTask() = default; - - ScheduleTaskType type() const { return type_; } - - virtual std::shared_ptr Execute() = 0; - -protected: - ScheduleTaskType type_; -}; - -using ScheduleTaskPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/db/scheduler/task/IndexLoadTask.cpp b/cpp/src/db/scheduler/task/IndexLoadTask.cpp deleted file mode 100644 index be83b86683..0000000000 --- a/cpp/src/db/scheduler/task/IndexLoadTask.cpp +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "IndexLoadTask.h" -#include "SearchTask.h" -#include "db/engine/EngineFactory.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" -#include "metrics/Metrics.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -IndexLoadTask::IndexLoadTask() - : IScheduleTask(ScheduleTaskType::kIndexLoad) { - -} - -std::shared_ptr IndexLoadTask::Execute() { - return nullptr; -} - -} -} -} \ No newline at end of file diff --git a/cpp/src/db/scheduler/task/IndexLoadTask.h b/cpp/src/db/scheduler/task/IndexLoadTask.h deleted file mode 100644 index 29356e5f58..0000000000 --- a/cpp/src/db/scheduler/task/IndexLoadTask.h +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "IScheduleTask.h" -#include "db/scheduler/context/SearchContext.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class IndexLoadTask : public IScheduleTask { -public: - IndexLoadTask(); - - virtual std::shared_ptr Execute() override; - -public: - TableFileSchemaPtr file_; - std::vector search_contexts_; -}; - -using IndexLoadTaskPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/db/scheduler/task/SearchTask.cpp b/cpp/src/db/scheduler/task/SearchTask.cpp deleted file mode 100644 index d0962ec101..0000000000 --- a/cpp/src/db/scheduler/task/SearchTask.cpp +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "SearchTask.h" -#include "metrics/Metrics.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -SearchTask::SearchTask() -: IScheduleTask(ScheduleTaskType::kSearch) { -} - -std::shared_ptr SearchTask::Execute() { - return nullptr; -} - -} -} -} diff --git a/cpp/src/db/scheduler/task/SearchTask.h b/cpp/src/db/scheduler/task/SearchTask.h deleted file mode 100644 index 47c4813fd2..0000000000 --- a/cpp/src/db/scheduler/task/SearchTask.h +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "IScheduleTask.h" -#include "db/scheduler/context/SearchContext.h" -#include "db/engine/ExecutionEngine.h" - -namespace zilliz { -namespace milvus { -namespace engine { - -class SearchTask : public IScheduleTask { -public: - SearchTask(); - - virtual std::shared_ptr Execute() override; - -public: - size_t index_id_ = 0; - int file_type_ = 0; //for metrics - ExecutionEnginePtr index_engine_; - std::vector search_contexts_; -}; - -using SearchTaskPtr = std::shared_ptr; - - -} -} -} \ No newline at end of file diff --git a/cpp/src/scheduler/JobMgr.h b/cpp/src/scheduler/JobMgr.h index f096ab6121..1d8720c4b4 100644 --- a/cpp/src/scheduler/JobMgr.h +++ b/cpp/src/scheduler/JobMgr.h @@ -73,6 +73,8 @@ private: ResourceMgrPtr res_mgr_ = nullptr; }; +using JobMgrPtr = std::shared_ptr; + } } } diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 5432227523..424bc9f778 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -33,6 +33,9 @@ std::mutex ResMgrInst::mutex_; SchedulerPtr SchedInst::instance = nullptr; std::mutex SchedInst::mutex_; +scheduler::JobMgrPtr JobMgrInst::instance = nullptr; +std::mutex JobMgrInst::mutex_; + void load_simple_config() { server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); @@ -151,12 +154,14 @@ StartSchedulerService() { // load_advance_config(); ResMgrInst::GetInstance()->Start(); SchedInst::GetInstance()->Start(); + JobMgrInst::GetInstance()->Start(); } void StopSchedulerService() { - ResMgrInst::GetInstance()->Stop(); + JobMgrInst::GetInstance()->Stop(); SchedInst::GetInstance()->Stop(); + ResMgrInst::GetInstance()->Stop(); } } } diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index fe73aae362..010a7e84f5 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -19,6 +19,7 @@ #include "ResourceMgr.h" #include "Scheduler.h" +#include "JobMgr.h" #include #include @@ -64,6 +65,24 @@ private: static std::mutex mutex_; }; +class JobMgrInst { +public: + static scheduler::JobMgrPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + instance = std::make_shared(ResMgrInst::GetInstance()); + } + } + return instance; + } + +private: + static scheduler::JobMgrPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService(); diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index afe0d9d868..1d471135d6 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +#include #include "TaskCreator.h" +#include "tasklabel/DefaultLabel.h" namespace zilliz { @@ -43,6 +45,8 @@ TaskCreator::Create(const SearchJobPtr &job) { std::vector tasks; for (auto &index_file : job->index_files()) { auto task = std::make_shared(index_file.second); + task->label() = std::make_shared(); + task->job_ = job; tasks.emplace_back(task); } @@ -52,8 +56,10 @@ TaskCreator::Create(const SearchJobPtr &job) { std::vector TaskCreator::Create(const DeleteJobPtr &job) { std::vector tasks; -// auto task = std::make_shared(job); -// tasks.emplace_back(task); + auto task = std::make_shared(job); + task->label() = std::make_shared(); + task->job_ = job; + tasks.emplace_back(task); return tasks; } diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h index 845a5dc165..242c33a4e5 100644 --- a/cpp/src/scheduler/job/Job.h +++ b/cpp/src/scheduler/job/Job.h @@ -62,6 +62,7 @@ private: }; using JobPtr = std::shared_ptr; +using JobWPtr = std::weak_ptr; } } diff --git a/cpp/src/scheduler/job/SearchJob.cpp b/cpp/src/scheduler/job/SearchJob.cpp index 786c1fb7b5..65b6701b5f 100644 --- a/cpp/src/scheduler/job/SearchJob.cpp +++ b/cpp/src/scheduler/job/SearchJob.cpp @@ -28,7 +28,12 @@ SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, - const float *vectors) : Job(id, JobType::SEARCH) {} + const float *vectors) + : Job(id, JobType::SEARCH), + topk_(topk), + nq_(nq), + nprobe_(nprobe), + vectors_(vectors) {} bool SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) { @@ -64,6 +69,11 @@ SearchJob::GetResult() { return result_; } +Status& +SearchJob::GetStatus() { + return status_; +} + } } diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h index ed6531767c..12edeab199 100644 --- a/cpp/src/scheduler/job/SearchJob.h +++ b/cpp/src/scheduler/job/SearchJob.h @@ -59,6 +59,9 @@ public: ResultSet & GetResult(); + Status & + GetStatus(); + public: uint64_t topk() const { @@ -94,6 +97,7 @@ private: Id2IndexMap index_files_; // TODO: column-base better ? ResultSet result_; + Status status_; std::mutex mutex_; std::condition_variable cv_; diff --git a/cpp/src/scheduler/task/DeleteTask.cpp b/cpp/src/scheduler/task/DeleteTask.cpp index ded1ad0462..b1b1697f0e 100644 --- a/cpp/src/scheduler/task/DeleteTask.cpp +++ b/cpp/src/scheduler/task/DeleteTask.cpp @@ -23,8 +23,8 @@ namespace zilliz { namespace milvus { namespace engine { -XDeleteTask::XDeleteTask(DeleteContextPtr &delete_context) - : Task(TaskType::DeleteTask), delete_context_ptr_(delete_context) {} +XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr &delete_job) + : Task(TaskType::DeleteTask), delete_job_(delete_job) {} void XDeleteTask::Load(LoadType type, uint8_t device_id) { @@ -33,7 +33,7 @@ XDeleteTask::Load(LoadType type, uint8_t device_id) { void XDeleteTask::Execute() { - delete_context_ptr_->ResourceDone(); + delete_job_->ResourceDone(); } } diff --git a/cpp/src/scheduler/task/DeleteTask.h b/cpp/src/scheduler/task/DeleteTask.h index b49107cc2a..f21a1a96e4 100644 --- a/cpp/src/scheduler/task/DeleteTask.h +++ b/cpp/src/scheduler/task/DeleteTask.h @@ -17,7 +17,7 @@ #pragma once -#include +#include "scheduler/job/DeleteJob.h" #include "Task.h" @@ -28,7 +28,7 @@ namespace engine { class XDeleteTask : public Task { public: explicit - XDeleteTask(DeleteContextPtr &delete_context); + XDeleteTask(const scheduler::DeleteJobPtr &job); void Load(LoadType type, uint8_t device_id) override; @@ -37,7 +37,7 @@ public: Execute() override; public: - DeleteContextPtr delete_context_ptr_; + scheduler::DeleteJobPtr delete_job_; }; } diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index d8222da39f..e39cd00d2e 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -22,6 +22,7 @@ #include "utils/Log.h" #include +#include "scheduler/job/SearchJob.h" namespace zilliz { @@ -94,7 +95,7 @@ CollectFileMetrics(int file_type, size_t file_size) { } } -XSearchTask::XSearchTask(TableFileSchemaPtr file) +XSearchTask::XSearchTask(meta::TableFileSchemaPtr file) : Task(TaskType::SearchTask), file_(file) { if (file_) { index_engine_ = EngineFactory::Build(file_->dimension_, @@ -143,9 +144,10 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { s = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - for (auto &context : search_contexts_) { - context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed - context->GetStatus() = s; + if (auto job = job_.lock()){ + auto search_job = std::static_pointer_cast(job); + search_job->SearchDone(file_->id_); + search_job->GetStatus() = s; } return; @@ -156,16 +158,16 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost"; double span = rc.ElapseFromBegin(info); - for (auto &context : search_contexts_) { - context->AccumLoadCost(span); - } +// for (auto &context : search_contexts_) { +// context->AccumLoadCost(span); +// } CollectFileMetrics(file_->file_type_, file_size); //step 2: return search task for later execution index_id_ = file_->id_; index_type_ = file_->file_type_; - search_contexts_.swap(search_contexts_); +// search_contexts_.swap(search_contexts_); } void @@ -174,8 +176,8 @@ XSearchTask::Execute() { return; } - ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with " - << search_contexts_.size() << " tasks"; +// ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with " +// << search_contexts_.size() << " tasks"; TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_)); @@ -183,16 +185,18 @@ XSearchTask::Execute() { std::vector output_ids; std::vector output_distance; - for (auto &context : search_contexts_) { + + if (auto job = job_.lock()) { + auto search_job = std::static_pointer_cast(job); //step 1: allocate memory - uint64_t nq = context->nq(); - uint64_t topk = context->topk(); - uint64_t nprobe = context->nprobe(); - const float* vectors = context->vectors(); + uint64_t nq = search_job->nq(); + uint64_t topk = search_job->topk(); + uint64_t nprobe = search_job->nprobe(); + const float* vectors = search_job->vectors(); output_ids.resize(topk * nq); output_distance.resize(topk * nq); - std::string hdr = "context " + context->Identity() + + std::string hdr = "job " + std::to_string(search_job->id()) + " nq " + std::to_string(nq) + " topk " + std::to_string(topk); @@ -201,30 +205,29 @@ XSearchTask::Execute() { index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data()); double span = rc.RecordSection(hdr + ", do search"); - context->AccumSearchCost(span); +// search_job->AccumSearchCost(span); //step 3: cluster result - SearchContext::ResultSet result_set; + scheduler::ResultSet result_set; auto spec_k = index_engine_->Count() < topk ? index_engine_->Count() : topk; XSearchTask::ClusterResult(output_ids, output_distance, nq, spec_k, result_set); span = rc.RecordSection(hdr + ", cluster result"); - context->AccumReduceCost(span); +// search_job->AccumReduceCost(span); // step 4: pick up topk result - XSearchTask::TopkResult(result_set, topk, metric_l2, context->GetResult()); + XSearchTask::TopkResult(result_set, topk, metric_l2, search_job->GetResult()); span = rc.RecordSection(hdr + ", reduce topk"); - context->AccumReduceCost(span); +// search_job->AccumReduceCost(span); } catch (std::exception &ex) { ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); - context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed - continue; +// search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed } //step 5: notify to send result to client - context->IndexSearchDone(index_id_); + search_job->SearchDone(index_id_); } rc.ElapseFromBegin("totally cost"); @@ -237,7 +240,7 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids, const std::vector &output_distance, uint64_t nq, uint64_t topk, - SearchContext::ResultSet &result_set) { + scheduler::ResultSet &result_set) { if (output_ids.size() < nq * topk || output_distance.size() < nq * topk) { std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) + " distance array size: " + std::to_string(output_distance.size()); @@ -250,7 +253,7 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids, std::function reduce_worker = [&](size_t from_index, size_t to_index) { for (auto i = from_index; i < to_index; i++) { - SearchContext::Id2DistanceMap id_distance; + scheduler::Id2DistanceMap id_distance; id_distance.reserve(topk); for (auto k = 0; k < topk; k++) { uint64_t index = i * topk + k; @@ -272,8 +275,8 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids, return Status::OK(); } -Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src, - SearchContext::Id2DistanceMap &distance_target, +Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src, + scheduler::Id2DistanceMap &distance_target, uint64_t topk, bool ascending) { //Note: the score_src and score_target are already arranged by score in ascending order @@ -290,7 +293,7 @@ Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src, size_t src_count = distance_src.size(); size_t target_count = distance_target.size(); - SearchContext::Id2DistanceMap distance_merged; + scheduler::Id2DistanceMap distance_merged; distance_merged.reserve(topk); size_t src_index = 0, target_index = 0; while (true) { @@ -346,10 +349,10 @@ Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src, return Status::OK(); } -Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src, +Status XSearchTask::TopkResult(scheduler::ResultSet &result_src, uint64_t topk, bool ascending, - SearchContext::ResultSet &result_target) { + scheduler::ResultSet &result_target) { if (result_target.empty()) { result_target.swap(result_src); return Status::OK(); @@ -363,8 +366,8 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src, std::function ReduceWorker = [&](size_t from_index, size_t to_index) { for (size_t i = from_index; i < to_index; i++) { - SearchContext::Id2DistanceMap &score_src = result_src[i]; - SearchContext::Id2DistanceMap &score_target = result_target[i]; + scheduler::Id2DistanceMap &score_src = result_src[i]; + scheduler::Id2DistanceMap &score_target = result_target[i]; XSearchTask::MergeResult(score_src, score_target, topk, ascending); } }; diff --git a/cpp/src/scheduler/task/SearchTask.h b/cpp/src/scheduler/task/SearchTask.h index b92e5bd851..e54e742e24 100644 --- a/cpp/src/scheduler/task/SearchTask.h +++ b/cpp/src/scheduler/task/SearchTask.h @@ -18,6 +18,8 @@ #pragma once #include "Task.h" +#include "db/meta/MetaTypes.h" +#include "scheduler/job/SearchJob.h" namespace zilliz { @@ -28,7 +30,7 @@ namespace engine { class XSearchTask : public Task { public: explicit - XSearchTask(TableFileSchemaPtr file); + XSearchTask(meta::TableFileSchemaPtr file); void Load(LoadType type, uint8_t device_id) override; @@ -41,20 +43,20 @@ public: const std::vector &output_distence, uint64_t nq, uint64_t topk, - SearchContext::ResultSet &result_set); + scheduler::ResultSet &result_set); - static Status MergeResult(SearchContext::Id2DistanceMap &distance_src, - SearchContext::Id2DistanceMap &distance_target, + static Status MergeResult(scheduler::Id2DistanceMap &distance_src, + scheduler::Id2DistanceMap &distance_target, uint64_t topk, bool ascending); - static Status TopkResult(SearchContext::ResultSet &result_src, + static Status TopkResult(scheduler::ResultSet &result_src, uint64_t topk, bool ascending, - SearchContext::ResultSet &result_target); + scheduler::ResultSet &result_target); public: - TableFileSchemaPtr file_; + meta::TableFileSchemaPtr file_; size_t index_id_ = 0; int index_type_ = 0; diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index bfaa1fe68d..c4a2c1cb4c 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -17,9 +17,9 @@ #pragma once -#include "db/scheduler/context/SearchContext.h" -#include "db/scheduler/task/IScheduleTask.h" #include "scheduler/tasklabel/TaskLabel.h" +#include "scheduler/job/Job.h" +#include "utils/Status.h" #include "Path.h" #include @@ -84,7 +84,8 @@ public: public: Path task_path_; - std::vector search_contexts_; +// std::vector search_contexts_; + scheduler::JobWPtr job_; TaskType type_; TaskLabelPtr label_ = nullptr; }; diff --git a/cpp/src/scheduler/task/TaskConvert.cpp b/cpp/src/scheduler/task/TaskConvert.cpp deleted file mode 100644 index 6be6309093..0000000000 --- a/cpp/src/scheduler/task/TaskConvert.cpp +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "TaskConvert.h" -#include "scheduler/tasklabel/DefaultLabel.h" -#include "scheduler/tasklabel/BroadcastLabel.h" - - -namespace zilliz { -namespace milvus { -namespace engine { - -TaskPtr -TaskConvert(const ScheduleTaskPtr &schedule_task) { - switch (schedule_task->type()) { - case ScheduleTaskType::kIndexLoad: { - auto load_task = std::static_pointer_cast(schedule_task); - auto task = std::make_shared(load_task->file_); - task->label() = std::make_shared(); - task->search_contexts_ = load_task->search_contexts_; - return task; - } - case ScheduleTaskType::kDelete: { - auto delete_task = std::static_pointer_cast(schedule_task); - auto task = std::make_shared(delete_task->context_); - task->label() = std::make_shared(); - return task; - } - default: { - // TODO: unexpected !!! - return nullptr; - } - } -} - -} -} -} diff --git a/cpp/src/scheduler/task/TaskConvert.h b/cpp/src/scheduler/task/TaskConvert.h deleted file mode 100644 index 2420c48b23..0000000000 --- a/cpp/src/scheduler/task/TaskConvert.h +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include "db/scheduler/task/DeleteTask.h" -#include "db/scheduler/task/IndexLoadTask.h" -#include "Task.h" -#include "SearchTask.h" -#include "DeleteTask.h" - -namespace zilliz { -namespace milvus { -namespace engine { - - -TaskPtr -TaskConvert(const ScheduleTaskPtr &schedule_task); - -} -} -} diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp index f1f9c29477..5d5ac976b5 100644 --- a/cpp/src/scheduler/task/TestTask.cpp +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -25,7 +25,7 @@ namespace milvus { namespace engine { -TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {} +TestTask::TestTask(meta::TableFileSchemaPtr &file) : XSearchTask(file) {} void TestTask::Load(LoadType type, uint8_t device_id) { diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h index 66fd864e1d..e489fc537f 100644 --- a/cpp/src/scheduler/task/TestTask.h +++ b/cpp/src/scheduler/task/TestTask.h @@ -26,7 +26,8 @@ namespace engine { class TestTask : public XSearchTask { public: - TestTask(TableFileSchemaPtr& file); + explicit + TestTask(meta::TableFileSchemaPtr& file); public: void diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 29440dcf1a..b64696fe50 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -26,6 +26,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) @@ -37,14 +38,6 @@ set(util_files ${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp ${MILVUS_ENGINE_SRC}/utils/easylogging++.cc) -aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files) -set(db_scheduler_srcs - ${scheduler_files} - ${scheduler_context_files} - ${scheduler_task_files} - ) include_directories(/usr/local/cuda/include) link_directories("/usr/local/cuda/lib64") @@ -62,6 +55,7 @@ set(db_test_src ${wrapper_src} ${scheduler_action_srcs} ${scheduler_event_srcs} + ${scheduler_job_srcs} ${scheduler_resource_srcs} ${scheduler_task_srcs} ${scheduler_srcs} diff --git a/cpp/unittest/db/scheduler_test.cpp b/cpp/unittest/db/scheduler_test.cpp deleted file mode 100644 index e61741a8fb..0000000000 --- a/cpp/unittest/db/scheduler_test.cpp +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include "utils/easylogging++.h" -#include - -#include "db/scheduler/TaskScheduler.h" -#include "db/scheduler/TaskDispatchStrategy.h" -#include "db/scheduler/TaskDispatchQueue.h" -#include "db/scheduler/task/SearchTask.h" -#include "db/scheduler/task/DeleteTask.h" -#include "db/scheduler/task/IndexLoadTask.h" - -using namespace zilliz::milvus; - -namespace { - -engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) { - auto file = std::make_shared(); - file->id_ = id; - file->table_id_ = table_id; - return file; -} - -} - -TEST(DBSchedulerTest, TASK_QUEUE_TEST) { - engine::TaskDispatchQueue queue; - queue.SetCapacity(1000); - queue.Put(nullptr); - ASSERT_EQ(queue.Size(), 1UL); - - auto ptr = queue.Take(); - ASSERT_EQ(ptr, nullptr); - ASSERT_TRUE(queue.Empty()); - - engine::SearchContextPtr context_ptr = std::make_shared(1, 1, 10, nullptr); - for(size_t i = 0; i < 10; i++) { - auto file = CreateTabileFileStruct(i, "tbl"); - context_ptr->AddIndexFile(file); - } - - queue.Put(context_ptr); - ASSERT_EQ(queue.Size(), 10); - - auto index_files = context_ptr->GetIndexMap(); - - ptr = queue.Front(); - ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); - engine::IndexLoadTaskPtr load_task = std::static_pointer_cast(ptr); - ASSERT_EQ(load_task->file_->id_, index_files.begin()->first); - - ptr = queue.Back(); - ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad); - load_task->Execute(); -} - -TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) { - std::list task_list; - bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); - ASSERT_FALSE(ret); - - for(size_t i = 10; i < 30; i++) { - engine::IndexLoadTaskPtr task_ptr = std::make_shared(); - task_ptr->file_ = CreateTabileFileStruct(i, "tbl"); - task_list.push_back(task_ptr); - } - - engine::SearchContextPtr context_ptr = std::make_shared(1, 1, 10, nullptr); - for(size_t i = 0; i < 20; i++) { - auto file = CreateTabileFileStruct(i, "tbl"); - context_ptr->AddIndexFile(file); - } - - ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); - ASSERT_TRUE(ret); - ASSERT_EQ(task_list.size(), 30); -} - -TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { - std::list task_list; - bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list); - ASSERT_FALSE(ret); - - const std::string table_id = "to_delete_table"; - for(size_t i = 0; i < 10; i++) { - engine::IndexLoadTaskPtr task_ptr = std::make_shared(); - task_ptr->file_ = CreateTabileFileStruct(i, table_id); - task_list.push_back(task_ptr); - } - - for(size_t i = 0; i < 10; i++) { - engine::IndexLoadTaskPtr task_ptr = std::make_shared(); - task_ptr->file_ = CreateTabileFileStruct(i, "other_table"); - task_list.push_back(task_ptr); - } - - engine::meta::MetaPtr meta_ptr; - engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr, 0); - ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); - ASSERT_TRUE(ret); - ASSERT_EQ(task_list.size(), 21); - - auto temp_list = task_list; - for(size_t i = 0; ; i++) { - engine::ScheduleTaskPtr task_ptr = temp_list.front(); - temp_list.pop_front(); - if(task_ptr->type() == engine::ScheduleTaskType::kDelete) { - ASSERT_EQ(i, 10); - break; - } - } - - context_ptr = std::make_shared("no_task_table", meta_ptr, 0); - ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); - ASSERT_TRUE(ret); - ASSERT_EQ(task_list.size(), 22); - - engine::ScheduleTaskPtr task_ptr = task_list.front(); - ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete); -} diff --git a/cpp/unittest/db/search_test.cpp b/cpp/unittest/db/search_test.cpp index 93b71ab406..295650f2d2 100644 --- a/cpp/unittest/db/search_test.cpp +++ b/cpp/unittest/db/search_test.cpp @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include "db/scheduler/task/SearchTask.h" #include "server/ServerConfig.h" #include "utils/TimeRecorder.h" @@ -50,9 +49,9 @@ void BuildResult(uint64_t nq, } } -void CheckResult(const engine::SearchContext::Id2DistanceMap& src_1, - const engine::SearchContext::Id2DistanceMap& src_2, - const engine::SearchContext::Id2DistanceMap& target, +void CheckResult(const scheduler::Id2DistanceMap& src_1, + const scheduler::Id2DistanceMap& src_2, + const scheduler::Id2DistanceMap& target, bool ascending) { for(uint64_t i = 0; i < target.size() - 1; i++) { if(ascending) { @@ -81,7 +80,7 @@ void CheckResult(const engine::SearchContext::Id2DistanceMap& src_1, void CheckCluster(const std::vector& target_ids, const std::vector& target_distence, - const engine::SearchContext::ResultSet& src_result, + const scheduler::ResultSet& src_result, int64_t nq, int64_t topk) { ASSERT_EQ(src_result.size(), nq); @@ -98,7 +97,7 @@ void CheckCluster(const std::vector& target_ids, } } -void CheckTopkResult(const engine::SearchContext::ResultSet& src_result, +void CheckTopkResult(const scheduler::ResultSet& src_result, bool ascending, int64_t nq, int64_t topk) { @@ -127,7 +126,7 @@ TEST(DBSearchTest, TOPK_TEST) { bool ascending = true; std::vector target_ids; std::vector target_distence; - engine::SearchContext::ResultSet src_result; + scheduler::ResultSet src_result; auto status = engine::XSearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result); ASSERT_FALSE(status.ok()); ASSERT_TRUE(src_result.empty()); @@ -137,7 +136,7 @@ TEST(DBSearchTest, TOPK_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(src_result.size(), NQ); - engine::SearchContext::ResultSet target_result; + scheduler::ResultSet target_result; status = engine::XSearchTask::TopkResult(target_result, TOP_K, ascending, target_result); ASSERT_TRUE(status.ok()); @@ -179,7 +178,7 @@ TEST(DBSearchTest, MERGE_TEST) { std::vector target_distence; std::vector src_ids; std::vector src_distence; - engine::SearchContext::ResultSet src_result, target_result; + scheduler::ResultSet src_result, target_result; uint64_t src_count = 5, target_count = 8; BuildResult(1, src_count, ascending, src_ids, src_distence); @@ -190,8 +189,8 @@ TEST(DBSearchTest, MERGE_TEST) { ASSERT_TRUE(status.ok()); { - engine::SearchContext::Id2DistanceMap src = src_result[0]; - engine::SearchContext::Id2DistanceMap target = target_result[0]; + scheduler::Id2DistanceMap src = src_result[0]; + scheduler::Id2DistanceMap target = target_result[0]; status = engine::XSearchTask::MergeResult(src, target, 10, ascending); ASSERT_TRUE(status.ok()); ASSERT_EQ(target.size(), 10); @@ -199,8 +198,8 @@ TEST(DBSearchTest, MERGE_TEST) { } { - engine::SearchContext::Id2DistanceMap src = src_result[0]; - engine::SearchContext::Id2DistanceMap target; + scheduler::Id2DistanceMap src = src_result[0]; + scheduler::Id2DistanceMap target; status = engine::XSearchTask::MergeResult(src, target, 10, ascending); ASSERT_TRUE(status.ok()); ASSERT_EQ(target.size(), src_count); @@ -209,8 +208,8 @@ TEST(DBSearchTest, MERGE_TEST) { } { - engine::SearchContext::Id2DistanceMap src = src_result[0]; - engine::SearchContext::Id2DistanceMap target = target_result[0]; + scheduler::Id2DistanceMap src = src_result[0]; + scheduler::Id2DistanceMap target = target_result[0]; status = engine::XSearchTask::MergeResult(src, target, 30, ascending); ASSERT_TRUE(status.ok()); ASSERT_EQ(target.size(), src_count + target_count); @@ -218,8 +217,8 @@ TEST(DBSearchTest, MERGE_TEST) { } { - engine::SearchContext::Id2DistanceMap target = src_result[0]; - engine::SearchContext::Id2DistanceMap src = target_result[0]; + scheduler::Id2DistanceMap target = src_result[0]; + scheduler::Id2DistanceMap src = target_result[0]; status = engine::XSearchTask::MergeResult(src, target, 30, ascending); ASSERT_TRUE(status.ok()); ASSERT_EQ(target.size(), src_count + target_count); @@ -235,7 +234,7 @@ TEST(DBSearchTest, PARALLEL_CLUSTER_TEST) { bool ascending = true; std::vector target_ids; std::vector target_distence; - engine::SearchContext::ResultSet src_result; + scheduler::ResultSet src_result; auto DoCluster = [&](int64_t nq, int64_t topk) { TimeRecorder rc("DoCluster"); @@ -270,11 +269,11 @@ TEST(DBSearchTest, PARALLEL_TOPK_TEST) { std::vector target_ids; std::vector target_distence; - engine::SearchContext::ResultSet src_result; + scheduler::ResultSet src_result; std::vector insufficient_ids; std::vector insufficient_distence; - engine::SearchContext::ResultSet insufficient_result; + scheduler::ResultSet insufficient_result; auto DoTopk = [&](int64_t nq, int64_t topk,int64_t insufficient_topk, bool ascending) { src_result.clear(); diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 4bc9699e05..a956215e49 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -80,7 +80,7 @@ void DBTest::SetUp() { auto res_mgr = engine::ResMgrInst::GetInstance(); res_mgr->Clear(); res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false)); - res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true)); + res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, false)); res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true)); auto default_conn = engine::Connection("IO", 500.0); @@ -90,6 +90,8 @@ void DBTest::SetUp() { res_mgr->Start(); engine::SchedInst::GetInstance()->Start(); + engine::JobMgrInst::GetInstance()->Start(); + auto options = GetOptions(); db_ = engine::DBFactory::Build(options); } @@ -100,8 +102,9 @@ void DBTest::TearDown() { BaseTest::TearDown(); - engine::ResMgrInst::GetInstance()->Stop(); + engine::JobMgrInst::GetInstance()->Stop(); engine::SchedInst::GetInstance()->Stop(); + engine::ResMgrInst::GetInstance()->Stop(); auto options = GetOptions(); boost::filesystem::remove_all(options.meta.path); diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt index 7359420e16..106b1a8b44 100644 --- a/cpp/unittest/scheduler/CMakeLists.txt +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -30,6 +30,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/job scheduler_job_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) @@ -55,6 +56,7 @@ set(scheduler_test_src ${test_srcs} ${scheduler_action_srcs} ${scheduler_event_srcs} + ${scheduler_job_srcs} ${scheduler_resource_srcs} ${scheduler_task_srcs} ${scheduler_srcs} diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 5ddfd49b55..02ed3dae69 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) { const uint64_t NUM_TASK = 1000; std::vector> tasks; - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; auto disks = res_mgr->GetDiskResources(); ASSERT_FALSE(disks.empty()); diff --git a/cpp/unittest/scheduler/resource_mgr_test.cpp b/cpp/unittest/scheduler/resource_mgr_test.cpp index 56db020058..8aaef5bdf5 100644 --- a/cpp/unittest/scheduler/resource_mgr_test.cpp +++ b/cpp/unittest/scheduler/resource_mgr_test.cpp @@ -187,7 +187,7 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) { flag = true; }; mgr1_->RegisterSubscriber(callback); - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; disk_res->task_table().Put(std::make_shared(dummy)); sleep(1); ASSERT_TRUE(flag); diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index b5a83ac776..1fdeb13062 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -180,7 +180,7 @@ protected: TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { const uint64_t NUM = 100; std::vector> tasks; - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { auto task = std::make_shared(dummy); tasks.push_back(task); @@ -205,7 +205,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { const uint64_t NUM = 100; std::vector> tasks; - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { auto task = std::make_shared(dummy); tasks.push_back(task); @@ -230,7 +230,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { const uint64_t NUM = 100; std::vector> tasks; - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { auto task = std::make_shared(dummy); tasks.push_back(task); @@ -255,7 +255,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) { TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) { const uint64_t NUM = 100; std::vector> tasks; - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { auto task = std::make_shared(dummy); tasks.push_back(task); diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp index 6f680f4a60..440a66bcbd 100644 --- a/cpp/unittest/scheduler/scheduler_test.cpp +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -157,7 +157,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) { TEST_F(SchedulerTest, ON_LOAD_COMPLETED) { const uint64_t NUM = 10; std::vector> tasks; - TableFileSchemaPtr dummy = std::make_shared(); + meta::TableFileSchemaPtr dummy = std::make_shared(); dummy->location_ = "location"; insert_dummy_index_into_gpu_cache(1); @@ -177,7 +177,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) { TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) { const uint64_t NUM = 10; std::vector> tasks; - TableFileSchemaPtr dummy1 = std::make_shared(); + meta::TableFileSchemaPtr dummy1 = std::make_shared(); dummy1->location_ = "location"; tasks.clear(); @@ -248,7 +248,7 @@ protected: TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) { const uint64_t NUM = 10; std::vector> tasks; - TableFileSchemaPtr dummy = std::make_shared(); + meta::TableFileSchemaPtr dummy = std::make_shared(); dummy->location_ = "location"; for (uint64_t i = 0; i < NUM; ++i) { diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp index 9e1f9b2c3a..0c3f65e146 100644 --- a/cpp/unittest/scheduler/tasktable_test.cpp +++ b/cpp/unittest/scheduler/tasktable_test.cpp @@ -169,7 +169,7 @@ class TaskTableBaseTest : public ::testing::Test { protected: void SetUp() override { - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; invalid_task_ = nullptr; task1_ = std::make_shared(dummy); task2_ = std::make_shared(dummy); @@ -339,7 +339,7 @@ class TaskTableAdvanceTest : public ::testing::Test { protected: void SetUp() override { - TableFileSchemaPtr dummy = nullptr; + meta::TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < 8; ++i) { auto task = std::make_shared(dummy); table1_.Put(task);