complete BuildIndexJob and BuildIndexTask
Former-commit-id: 3b5d96f4800860b7b3fb09fa3d64b9b3f41441b8
parent 35cfea9210
commit b815cc36fe
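
This commit moves index building from a synchronous loop in DBImpl onto the scheduler: DBImpl::BackgroundBuildIndex() now creates a BuildIndexJob, submits it via JobMgrInst, registers every to-index file on the job, and blocks in WaitBuildIndexFinish() until each XBuildIndexTask reports back through BuildIndexDone(). A minimal, self-contained sketch of that wait/notify handshake follows; the toy names (ToyBuildJob, AddPending, Done, WaitFinish) are simplified stand-ins, not the Milvus API.

#include <condition_variable>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

class ToyBuildJob {
 public:
    void AddPending(size_t id) {              // producer registers a pending file id
        std::unique_lock<std::mutex> lock(mutex_);
        pending_.insert(id);
    }
    void Done(size_t id) {                    // called by each build task when finished
        std::unique_lock<std::mutex> lock(mutex_);
        pending_.erase(id);
        cv_.notify_all();
    }
    void WaitFinish() {                       // called by the background thread
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return pending_.empty(); });
    }

 private:
    std::set<size_t> pending_;
    std::mutex mutex_;
    std::condition_variable cv_;
};

int main() {
    ToyBuildJob job;
    std::vector<std::thread> workers;
    for (size_t id = 0; id < 4; ++id) {
        job.AddPending(id);
        workers.emplace_back([&job, id] { job.Done(id); });
    }
    job.WaitFinish();                         // returns once all 4 ids are done
    for (auto &w : workers) {
        w.join();
    }
}

The real job keys pending work by table-file id, which is why BuildIndexDone(size_t) takes an id.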
cpp/src/db/DBImpl.cpp
@@ -28,6 +28,7 @@
 #include "scheduler/SchedInst.h"
 #include "scheduler/job/DeleteJob.h"
 #include "scheduler/job/SearchJob.h"
+#include "scheduler/job/BuildIndexJob.h"
 #include "utils/Log.h"
 #include "utils/TimeRecorder.h"
@@ -39,6 +40,7 @@
+#include <iostream>
 #include <thread>

 namespace zilliz {
 namespace milvus {
 namespace engine {
@@ -51,7 +53,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;

 }  // namespace

-DBImpl::DBImpl(const DBOptions& options)
+DBImpl::DBImpl(const DBOptions &options)
     : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
     meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
     mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
@@ -111,7 +113,7 @@ DBImpl::DropAll() {
 }

 Status
-DBImpl::CreateTable(meta::TableSchema& table_schema) {
+DBImpl::CreateTable(meta::TableSchema &table_schema) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -122,7 +124,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) {
 }

 Status
-DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
+DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -147,7 +149,7 @@ DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
 }

 Status
-DBImpl::DescribeTable(meta::TableSchema& table_schema) {
+DBImpl::DescribeTable(meta::TableSchema &table_schema) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -158,7 +160,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) {
 }

 Status
-DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
+DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -167,7 +169,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
 }

 Status
-DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
+DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -176,7 +178,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
 }

 Status
-DBImpl::PreloadTable(const std::string& table_id) {
+DBImpl::PreloadTable(const std::string &table_id) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -195,11 +197,11 @@ DBImpl::PreloadTable(const std::string& table_id) {
     int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
     int64_t available_size = cache_total - cache_usage;

-    for (auto& day_files : files) {
-        for (auto& file : day_files.second) {
+    for (auto &day_files : files) {
+        for (auto &file : day_files.second) {
             ExecutionEnginePtr engine =
-                EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
-                                     (MetricType)file.metric_type_, file.nlist_);
+                EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
+                                     (MetricType) file.metric_type_, file.nlist_);
             if (engine == nullptr) {
                 ENGINE_LOG_ERROR << "Invalid engine type";
                 return Status(DB_ERROR, "Invalid engine type");
@@ -212,7 +214,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
             try {
                 // step 1: load index
                 engine->Load(true);
-            } catch (std::exception& ex) {
+            } catch (std::exception &ex) {
                 std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
                 ENGINE_LOG_ERROR << msg;
                 return Status(DB_ERROR, msg);
@@ -224,7 +226,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
 }

 Status
-DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
+DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -233,7 +235,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
 }

 Status
-DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
+DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -242,7 +244,7 @@ DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
 }

 Status
-DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
+DBImpl::InsertVectors(const std::string &table_id_, uint64_t n, const float *vectors, IDNumbers &vector_ids_) {
     //    ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
@@ -261,7 +263,7 @@ DBImpl::InsertVectors(const std::string& table_id_, uint64_t n, const float* vec
 }

 Status
-DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
+DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
     {
         std::unique_lock<std::mutex> lock(build_index_mutex_);

@@ -295,15 +297,15 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
     // for IDMAP type, only wait all NEW file converted to RAW file
     // for other type, wait NEW/RAW/NEW_MERGE/NEW_INDEX/TO_INDEX files converted to INDEX files
     std::vector<int> file_types;
-    if (index.engine_type_ == (int)EngineType::FAISS_IDMAP) {
+    if (index.engine_type_ == (int) EngineType::FAISS_IDMAP) {
         file_types = {
-            (int)meta::TableFileSchema::NEW, (int)meta::TableFileSchema::NEW_MERGE,
+            (int) meta::TableFileSchema::NEW, (int) meta::TableFileSchema::NEW_MERGE,
         };
     } else {
         file_types = {
-            (int)meta::TableFileSchema::RAW, (int)meta::TableFileSchema::NEW,
-            (int)meta::TableFileSchema::NEW_MERGE, (int)meta::TableFileSchema::NEW_INDEX,
-            (int)meta::TableFileSchema::TO_INDEX,
+            (int) meta::TableFileSchema::RAW, (int) meta::TableFileSchema::NEW,
+            (int) meta::TableFileSchema::NEW_MERGE, (int) meta::TableFileSchema::NEW_INDEX,
+            (int) meta::TableFileSchema::TO_INDEX,
         };
     }

@@ -313,7 +315,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {

     while (!file_ids.empty()) {
         ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
-        if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
+        if (index.engine_type_ != (int) EngineType::FAISS_IDMAP) {
             status = meta_ptr_->UpdateTableFilesToIndex(table_id);
         }

@@ -326,19 +328,19 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
 }

 Status
-DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
+DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) {
     return meta_ptr_->DescribeTableIndex(table_id, index);
 }

 Status
-DBImpl::DropIndex(const std::string& table_id) {
+DBImpl::DropIndex(const std::string &table_id) {
     ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
     return meta_ptr_->DropTableIndex(table_id);
 }

 Status
-DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
-              QueryResults& results) {
+DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
+              QueryResults &results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -350,8 +352,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
 }

 Status
-DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
-              const meta::DatesT& dates, QueryResults& results) {
+DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors,
+              const meta::DatesT &dates, QueryResults &results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -367,8 +369,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
     }

     meta::TableFilesSchema file_id_array;
-    for (auto& day_files : files) {
-        for (auto& file : day_files.second) {
+    for (auto &day_files : files) {
+        for (auto &file : day_files.second) {
             file_id_array.push_back(file);
         }
     }
@@ -380,8 +382,8 @@ DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t npr
 }

 Status
-DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
-              uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
+DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids, uint64_t k, uint64_t nq,
+              uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -390,7 +392,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_

     // get specified files
     std::vector<size_t> ids;
-    for (auto& id : file_ids) {
+    for (auto &id : file_ids) {
         meta::TableFileSchema table_file;
         table_file.table_id_ = table_id;
         std::string::size_type sz;
@@ -404,8 +406,8 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
     }

     meta::TableFilesSchema file_id_array;
-    for (auto& day_files : files_array) {
-        for (auto& file : day_files.second) {
+    for (auto &day_files : files_array) {
+        for (auto &file : day_files.second) {
             file_id_array.push_back(file);
         }
     }
@@ -421,7 +423,7 @@ DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_
 }

 Status
-DBImpl::Size(uint64_t& result) {
+DBImpl::Size(uint64_t &result) {
     if (shutting_down_.load(std::memory_order_acquire)) {
         return Status(DB_ERROR, "Milsvus server is shutdown!");
     }
@@ -433,8 +435,8 @@ DBImpl::Size(uint64_t& result) {
 // internal methods
 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 Status
-DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
-                   uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
+DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, uint64_t k, uint64_t nq,
+                   uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) {
     server::CollectQueryMetrics metrics(nq);

     TimeRecorder rc("");
@@ -443,7 +445,7 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
     ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
                      << " date range count: " << dates.size();
     scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
-    for (auto& file : files) {
+    for (auto &file : files) {
         scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
         job->AddIndexFile(file_ptr);
     }
@@ -511,7 +513,7 @@ DBImpl::BackgroundTimerTask() {
 void
 DBImpl::WaitMergeFileFinish() {
     std::lock_guard<std::mutex> lck(compact_result_mutex_);
-    for (auto& iter : compact_thread_results_) {
+    for (auto &iter : compact_thread_results_) {
         iter.wait();
     }
 }
@@ -519,7 +521,7 @@ DBImpl::WaitMergeFileFinish() {
 void
 DBImpl::WaitBuildIndexFinish() {
     std::lock_guard<std::mutex> lck(index_result_mutex_);
-    for (auto& iter : index_thread_results_) {
+    for (auto &iter : index_thread_results_) {
         iter.wait();
     }
 }
@@ -560,7 +562,7 @@ DBImpl::MemSerialize() {
     std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
     std::set<std::string> temp_table_ids;
     mem_mgr_->Serialize(temp_table_ids);
-    for (auto& id : temp_table_ids) {
+    for (auto &id : temp_table_ids) {
         compact_table_ids_.insert(id);
     }

@@ -605,7 +607,7 @@ DBImpl::StartCompactionTask() {
 }

 Status
-DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
+DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const meta::TableFilesSchema &files) {
     ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;

     // step 1: create table file
@@ -622,13 +624,13 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m

     // step 2: merge files
     ExecutionEnginePtr index =
-        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
-                             (MetricType)table_file.metric_type_, table_file.nlist_);
+        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_,
+                             (MetricType) table_file.metric_type_, table_file.nlist_);

     meta::TableFilesSchema updated;
     int64_t index_size = 0;

-    for (auto& file : files) {
+    for (auto &file : files) {
         server::CollectMergeFilesMetrics metrics;

         index->Merge(file.location_);
@@ -644,7 +646,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
     // step 3: serialize to disk
     try {
         index->Serialize();
-    } catch (std::exception& ex) {
+    } catch (std::exception &ex) {
         // typical error: out of disk space or permition denied
         std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
@@ -662,7 +664,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
     // step 4: update table files state
     // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
     // else set file type to RAW, no need to build index
-    if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
+    if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) {
         table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
                                                                                        : meta::TableFileSchema::RAW;
     } else {
@@ -682,7 +684,7 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m
 }

 Status
-DBImpl::BackgroundMergeFiles(const std::string& table_id) {
+DBImpl::BackgroundMergeFiles(const std::string &table_id) {
     meta::DatePartionedTableFilesSchema raw_files;
     auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
     if (!status.ok()) {
@@ -691,7 +693,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
     }

     bool has_merge = false;
-    for (auto& kv : raw_files) {
+    for (auto &kv : raw_files) {
         auto files = kv.second;
         if (files.size() < options_.merge_trigger_number_) {
             ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
@@ -714,7 +716,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
     ENGINE_LOG_TRACE << " Background compaction thread start";

     Status status;
-    for (auto& table_id : table_ids) {
+    for (auto &table_id : table_ids) {
         status = BackgroundMergeFiles(table_id);
         if (!status.ok()) {
             ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
@@ -766,9 +768,9 @@ DBImpl::StartBuildIndexTask(bool force) {
 }

 Status
-DBImpl::BuildIndex(const meta::TableFileSchema& file) {
-    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
-                                                       (MetricType)file.metric_type_, file.nlist_);
+DBImpl::BuildIndex(const meta::TableFileSchema &file) {
+    ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_,
+                                                       (MetricType) file.metric_type_, file.nlist_);
     if (to_index == nullptr) {
         ENGINE_LOG_ERROR << "Invalid engine type";
         return Status(DB_ERROR, "Invalid engine type");
@@ -799,7 +801,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {

     try {
         server::CollectBuildIndexMetrics metrics;
-        index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
+        index = to_index->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_);
         if (index == nullptr) {
             table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
             status = meta_ptr_->UpdateTableFile(table_file);
@@ -808,7 +810,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {

             return status;
         }
-    } catch (std::exception& ex) {
+    } catch (std::exception &ex) {
         // typical error: out of gpu memory
         std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
@@ -834,7 +836,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
     // step 5: save index file
     try {
         index->Serialize();
-    } catch (std::exception& ex) {
+    } catch (std::exception &ex) {
         // typical error: out of disk space or permition denied
         std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
@@ -877,7 +879,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema& file) {
             status = meta_ptr_->UpdateTableFile(table_file);
             ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
         }
-    } catch (std::exception& ex) {
+    } catch (std::exception &ex) {
         std::string msg = "Build index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
         return Status(DB_ERROR, msg);
@@ -894,17 +896,40 @@ DBImpl::BackgroundBuildIndex() {
     meta::TableFilesSchema to_index_files;
     meta_ptr_->FilesToIndex(to_index_files);
     Status status;
-    for (auto& file : to_index_files) {
-        status = BuildIndex(file);
+
+    scheduler::BuildIndexJobPtr
+        job = std::make_shared<scheduler::BuildIndexJob>(0);
+
+    // step 2: put build index task to scheduler
+    scheduler::JobMgrInst::GetInstance()->Put(job);
+    for (auto &file : to_index_files) {
+        std::cout << "get to index file" << std::endl;
+        meta::TableFileSchema table_file;
+        table_file.table_id_ = file.table_id_;
+        table_file.date_ = file.date_;
+        table_file.file_type_ =
+            meta::TableFileSchema::NEW_INDEX;  // for multi-db-path, distribute index file averagely to each path
+        status = meta_ptr_->CreateTableFile(table_file);
         if (!status.ok()) {
-            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
+            ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString();
         }

         if (shutting_down_.load(std::memory_order_acquire)) {
             ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
             break;
         }
+        scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
+        job->AddToIndexFiles(file_ptr, table_file);
     }
+    job->WaitBuildIndexFinish();
+
+    //    for (auto &file : to_index_files) {
+    //        status = BuildIndex(file);
+    //        if (!status.ok()) {
+    //            ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
+    //        }
+    //
+    //        if (shutting_down_.load(std::memory_order_acquire)) {
+    //            ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
+    //            break;
+    //        }
+    //    }

     ENGINE_LOG_TRACE << "Background build index thread exit";
 }
cpp/src/scheduler/TaskCreator.cpp
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.

+#include <src/scheduler/tasklabel/SpecResLabel.h>
 #include "scheduler/TaskCreator.h"
 #include "scheduler/tasklabel/BroadcastLabel.h"
 #include "tasklabel/DefaultLabel.h"
+#include "SchedInst.h"
+

 namespace zilliz {
 namespace milvus {
 namespace scheduler {

 std::vector<TaskPtr>
-TaskCreator::Create(const JobPtr& job) {
+TaskCreator::Create(const JobPtr &job) {
     switch (job->type()) {
         case JobType::SEARCH: {
             return Create(std::static_pointer_cast<SearchJob>(job));
@@ -32,6 +35,9 @@ TaskCreator::Create(const JobPtr& job) {
         case JobType::DELETE: {
             return Create(std::static_pointer_cast<DeleteJob>(job));
         }
+        case JobType::BUILD: {
+            return Create(std::static_pointer_cast<BuildIndexJob>(job));
+        }
         default: {
             // TODO: error
             return std::vector<TaskPtr>();
@@ -40,9 +46,9 @@ TaskCreator::Create(const JobPtr& job) {
 }

 std::vector<TaskPtr>
-TaskCreator::Create(const SearchJobPtr& job) {
+TaskCreator::Create(const SearchJobPtr &job) {
     std::vector<TaskPtr> tasks;
-    for (auto& index_file : job->index_files()) {
+    for (auto &index_file : job->index_files()) {
         auto task = std::make_shared<XSearchTask>(index_file.second);
         task->label() = std::make_shared<DefaultLabel>();
         task->job_ = job;
@@ -53,7 +59,7 @@ TaskCreator::Create(const SearchJobPtr& job) {
 }

 std::vector<TaskPtr>
-TaskCreator::Create(const DeleteJobPtr& job) {
+TaskCreator::Create(const DeleteJobPtr &job) {
     std::vector<TaskPtr> tasks;
     auto task = std::make_shared<XDeleteTask>(job);
     task->label() = std::make_shared<BroadcastLabel>();
@@ -63,6 +69,21 @@ TaskCreator::Create(const DeleteJobPtr& job) {
     return tasks;
 }

+std::vector<TaskPtr>
+TaskCreator::Create(const zilliz::milvus::scheduler::BuildIndexJobPtr &job) {
+    std::vector<TaskPtr> tasks;
+    //TODO(yukun): remove "disk" hardcode here
+    ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
+
+    for (auto &to_index_file : job->to_index_files()) {
+        auto task = std::make_shared<XBuildIndexTask>(to_index_file.second);
+        task->label() = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
+        task->job_ = job;
+        tasks.emplace_back(task);
+    }
+    return tasks;
+}
+
 }  // namespace scheduler
 }  // namespace milvus
 }  // namespace zilliz
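
The new Create overload pins every XBuildIndexTask to the single resource named "disk" through a SpecResLabel holding a weak_ptr to that resource (the TODO above flags the hardcode). A hedged sketch of just the weak_ptr pinning mechanics with toy types; ToyResource and ToyTask are stand-ins, not the real Resource/SpecResLabel classes.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct ToyResource { std::string name; };

struct ToyTask {
    std::weak_ptr<ToyResource> pinned_to;  // stands in for SpecResLabel
};

int main() {
    auto disk = std::make_shared<ToyResource>(ToyResource{"disk"});
    std::vector<ToyTask> tasks(3);
    for (auto &t : tasks) {
        t.pinned_to = disk;  // same named resource for every build task
    }
    if (auto res = tasks[0].pinned_to.lock()) {
        std::cout << "task pinned to " << res->name << "\n";
    }
}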
cpp/src/scheduler/TaskCreator.h
@@ -32,6 +32,7 @@
 #include "job/SearchJob.h"
 #include "task/DeleteTask.h"
 #include "task/SearchTask.h"
+#include "task/BuildIndexTask.h"
 #include "task/Task.h"

 namespace zilliz {
@@ -49,6 +50,9 @@ class TaskCreator {

     static std::vector<TaskPtr>
     Create(const DeleteJobPtr& job);
+
+    static std::vector<TaskPtr>
+    Create(const BuildIndexJobPtr& job);
 };

 }  // namespace scheduler
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
@@ -20,6 +20,7 @@
 #include "../Algorithm.h"
 #include "Action.h"
 #include "src/cache/GpuCacheMgr.h"
+#include "src/server/Config.h"

 namespace zilliz {
 namespace milvus {
@@ -142,26 +143,41 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
         transport_costs.push_back(transport_cost);
         paths.emplace_back(path);
     }

-    // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
-    uint64_t min_cost = std::numeric_limits<uint64_t>::max();
-    uint64_t min_cost_idx = 0;
-    for (uint64_t i = 0; i < compute_resources.size(); ++i) {
-        if (compute_resources[i]->TotalTasks() == 0) {
-            min_cost_idx = i;
-            break;
-        }
-        uint64_t cost =
-            compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
-        if (min_cost > cost) {
-            min_cost = cost;
-            min_cost_idx = i;
+    if (task->job_.lock()->type() == JobType::SEARCH) {
+        // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
+        uint64_t min_cost = std::numeric_limits<uint64_t>::max();
+        uint64_t min_cost_idx = 0;
+        for (uint64_t i = 0; i < compute_resources.size(); ++i) {
+            if (compute_resources[i]->TotalTasks() == 0) {
+                min_cost_idx = i;
+                break;
+            }
+            uint64_t cost =
+                compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i];
+            if (min_cost > cost) {
+                min_cost = cost;
+                min_cost_idx = i;
+            }
         }
-    }

-    // step 3: set path in task
-    Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
-    task->path() = task_path;
+        // step 3: set path in task
+        Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
+        task->path() = task_path;
+    } else if (task->job_.lock()->type() == JobType::BUILD) {
+        //step2: Read device id in config
+        //get build index gpu resource
+        server::Config &config = server::Config::GetInstance();
+        int32_t build_index_gpu;
+        Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
+
+        for (uint64_t i = 0; i < compute_resources.size(); ++i) {
+            if (compute_resources[i]->device_id() == build_index_gpu) {
+                Path task_path(paths[i], paths[i].size() - 1);
+                task->path() = task_path;
+                break;
+            }
+        }
+    }

     if (resource->name() == task->path().Last()) {
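
For BUILD jobs, the action above skips the min-cost search used for SEARCH jobs and routes the task to whichever compute resource matches the GPU id read from the server config. A small self-contained sketch of that selection rule; ToyResource and the literal device list are illustrative stand-ins, not the scheduler's Resource type.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct ToyResource {
    std::string name;
    int32_t device_id;
};

int main() {
    // Stand-ins for the scheduler's compute resources and for the value
    // read via config.GetDBConfigBuildIndexGPU() in the hunk above.
    std::vector<ToyResource> compute = {{"gpu0", 0}, {"gpu1", 1}};
    int32_t build_index_gpu = 1;

    for (auto &res : compute) {
        if (res.device_id == build_index_gpu) {
            std::cout << "route build-index task to " << res.name << "\n";
            break;  // first matching device wins, as in the diff
        }
    }
}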
cpp/src/scheduler/job/BuildIndexJob.cpp  (new file, 63 lines)
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "BuildIndexJob.h"
+#include "utils/Log.h"
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+BuildIndexJob::BuildIndexJob(JobId id)
+    : Job(id, JobType::BUILD) {
+}
+
+bool
+BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file,
+                               const TableFileSchema table_file) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (to_index_file == nullptr) {
+        return false;
+    }
+
+    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_;
+
+    to_index_files_[to_index_file->id_] = to_index_file;
+    table_files_[table_file.id_] = table_file;
+    return true;
+}
+
+void
+BuildIndexJob::WaitBuildIndexFinish() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this] { return to_index_files_.empty(); });
+    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done";
+}
+
+void
+BuildIndexJob::BuildIndexDone(size_t to_index_id) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    to_index_files_.erase(to_index_id);
+    cv_.notify_all();
+    SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
+}
+
+}  // namespace scheduler
+}  // namespace milvus
+}  // namespace zilliz
cpp/src/scheduler/job/BuildIndexJob.h  (new file, 86 lines)
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+#include <list>
+#include <queue>
+#include <deque>
+#include <unordered_map>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <memory>
+
+#include "Job.h"
+#include "db/meta/Meta.h"
+#include "scheduler/Definition.h"
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+using engine::meta::TableFileSchemaPtr;
+
+using Id2ToIndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
+using Id2ToTableFileMap = std::unordered_map<size_t, TableFileSchema>;
+
+class BuildIndexJob : public Job {
+ public:
+    explicit BuildIndexJob(JobId id);
+
+ public:
+    bool
+    AddToIndexFiles(const TableFileSchemaPtr &to_index_file, const TableFileSchema table_file);
+
+    void
+    WaitBuildIndexFinish();
+
+    void
+    BuildIndexDone(size_t to_index_id);
+
+ public:
+    //    std::string
+    //    location() const {
+    //        return location_;
+    //    }
+    //
+    //    EngineType
+    //    engine_type() const {
+    //        return engine_type_;
+    //    }
+
+    Id2ToIndexMap &
+    to_index_files() {
+        return to_index_files_;
+    }
+
+ private:
+    Id2ToIndexMap to_index_files_;
+    Id2ToTableFileMap table_files_;
+
+    std::mutex mutex_;
+    std::condition_variable cv_;
+};
+
+using BuildIndexJobPtr = std::shared_ptr<BuildIndexJob>;
+
+}  // namespace scheduler
+}  // namespace milvus
+}  // namespace zilliz
cpp/src/scheduler/task/BuildIndexTask.cpp  (new file, 131 lines)
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "BuildIndexTask.h"
+#include "db/engine/EngineFactory.h"
+#include "metrics/Metrics.h"
+#include "scheduler/job/BuildIndexJob.h"
+#include "utils/Log.h"
+#include "utils/TimeRecorder.h"
+
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file)
+    : Task(TaskType::BuildIndexTask), file_(file) {
+    if (file_) {
+        to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_,
+                                                (MetricType) file_->metric_type_, file_->nlist_);
+    }
+}
+
+void
+XBuildIndexTask::Load(LoadType type, uint8_t device_id) {
+    TimeRecorder rc("");
+    Status stat = Status::OK();
+    std::string error_msg;
+    std::string type_str;
+
+    try {
+        if (type == LoadType::DISK2CPU) {
+            stat = to_index_engine_->Load();
+            type_str = "DISK2CPU";
+        } else if (type == LoadType::CPU2GPU) {
+            stat = to_index_engine_->CopyToGpu(device_id);
+            type_str = "CPU2GPU";
+        } else if (type == LoadType::GPU2CPU) {
+            stat = to_index_engine_->CopyToCpu();
+            type_str = "GPU2CPU";
+        } else {
+            error_msg = "Wrong load type";
+            stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
+        }
+    } catch (std::exception &ex) {
+        // typical error: out of disk space or permition denied
+        error_msg = "Failed to load to_index file: " + std::string(ex.what());
+        stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
+    }
+
+    if (!stat.ok()) {
+        Status s;
+        if (stat.ToString().find("out of memory") != std::string::npos) {
+            error_msg = "out of memory: " + type_str;
+            s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
+        } else {
+            error_msg = "Failed to load to_index file: " + type_str;
+            s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
+        }
+
+        if (auto job = job_.lock()) {
+            auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
+            build_index_job->BuildIndexDone(file_->id_);
+        }
+
+        return;
+    }
+
+    size_t file_size = to_index_engine_->PhysicalSize();
+
+    std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" +
+                       std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
+                       " bytes from location: " + file_->location_ + " totally cost";
+    double span = rc.ElapseFromBegin(info);
+
+    //    to_index_id_ = file_->id_;
+    //    to_index_type_ = file_->file_type_;
+}
+
+void
+XBuildIndexTask::Execute() {
+    if (to_index_engine_ == nullptr) {
+        return;
+    }
+
+    TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_));
+
+    if (auto job = job_.lock()) {
+        auto build_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
+        std::string location = file_->location_;
+        EngineType engine_type = (EngineType) file_->engine_type_;
+        std::shared_ptr<engine::ExecutionEngine> index;
+
+        try {
+            index = to_index_engine_->BuildIndex(location, engine_type);
+            if (index == nullptr) {
+                table_file_.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
+                //TODO: updatetablefile
+            }
+        } catch (std::exception &ex) {
+            ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
+        }
+
+        build_job->BuildIndexDone(to_index_id_);
+    }
+
+    rc.ElapseFromBegin("totally cost");
+
+    to_index_engine_ = nullptr;
+}
+
+}  // namespace scheduler
+}  // namespace milvus
+}  // namespace zilliz
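
XBuildIndexTask reaches its parent job only through the weak_ptr job_ and re-locks it in both Load() and Execute() before calling BuildIndexDone(), so a task that outlives its job simply skips the notification. A minimal sketch of that lock-before-notify pattern with toy types, not the scheduler's Task/Job classes:

#include <iostream>
#include <memory>

struct ToyJob {
    void Done() { std::cout << "job notified\n"; }
};

struct ToyTask {
    std::weak_ptr<ToyJob> job_;  // the task does not keep the job alive
    void Execute() {
        if (auto job = job_.lock()) {  // only notify if the job still exists
            job->Done();
        }
    }
};

int main() {
    auto job = std::make_shared<ToyJob>();
    ToyTask task;
    task.job_ = job;
    task.Execute();  // prints "job notified"
    job.reset();
    task.Execute();  // job gone: silently skips the callback
}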
cpp/src/scheduler/task/BuildIndexTask.h  (new file, 49 lines)
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "Task.h"
+#include "scheduler/Definition.h"
+#include "scheduler/job/BuildIndexJob.h"
+
+namespace zilliz {
+namespace milvus {
+namespace scheduler {
+
+class XBuildIndexTask : public Task {
+ public:
+    explicit XBuildIndexTask(TableFileSchemaPtr file);
+
+    void
+    Load(LoadType type, uint8_t device_id) override;
+
+    void
+    Execute() override;
+
+ public:
+    TableFileSchemaPtr file_;
+    TableFileSchema table_file_;
+    size_t to_index_id_ = 0;
+    int to_index_type_ = 0;
+    ExecutionEnginePtr to_index_engine_ = nullptr;
+};
+
+}  // namespace scheduler
+}  // namespace milvus
+}  // namespace zilliz
cpp/src/scheduler/task/Task.h
@@ -39,6 +39,7 @@ enum class LoadType {
 enum class TaskType {
     SearchTask,
     DeleteTask,
+    BuildIndexTask,
     TestTask,
 };

cpp/src/scheduler/tasklabel/SpecResLabel.h
@@ -18,13 +18,14 @@
 #pragma once

 #include "TaskLabel.h"
+#include "scheduler/ResourceMgr.h"

 #include <memory>
 #include <string>

-class Resource;
-
-using ResourceWPtr = std::weak_ptr<Resource>;
+//class Resource;
+//
+//using ResourceWPtr = std::weak_ptr<Resource>;

 namespace zilliz {
 namespace milvus {