mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
delete table issue
Former-commit-id: d1a8e64086c11081917058623d344ac3c567a74b
This commit is contained in:
parent
c535c2ad55
commit
e050719b2b
@ -10,10 +10,18 @@ aux_source_directory(config config_files)
|
||||
aux_source_directory(server server_files)
|
||||
aux_source_directory(utils utils_files)
|
||||
aux_source_directory(db db_files)
|
||||
aux_source_directory(db/scheduler db_scheduler_files)
|
||||
aux_source_directory(wrapper wrapper_files)
|
||||
aux_source_directory(metrics metrics_files)
|
||||
|
||||
aux_source_directory(db/scheduler scheduler_files)
|
||||
aux_source_directory(db/scheduler/context scheduler_context_files)
|
||||
aux_source_directory(db/scheduler/task scheduler_task_files)
|
||||
set(db_scheduler_files
|
||||
${scheduler_files}
|
||||
${scheduler_context_files}
|
||||
${scheduler_task_files}
|
||||
)
|
||||
|
||||
set(license_check_files
|
||||
license/LicenseLibrary.cpp
|
||||
license/LicenseCheck.cpp
|
||||
|
||||
@ -8,7 +8,9 @@
|
||||
#include "Log.h"
|
||||
#include "EngineFactory.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "scheduler/SearchScheduler.h"
|
||||
#include "scheduler/TaskScheduler.h"
|
||||
#include "scheduler/context/SearchContext.h"
|
||||
#include "scheduler/context/DeleteContext.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
#include <assert.h>
|
||||
@ -130,48 +132,52 @@ void CalcScore(uint64_t vector_count,
|
||||
DBImpl::DBImpl(const Options& options)
|
||||
: options_(options),
|
||||
shutting_down_(false),
|
||||
pMeta_(new meta::DBMetaImpl(options_.meta)),
|
||||
pMemMgr_(new MemManager(pMeta_, options_)),
|
||||
meta_ptr_(new meta::DBMetaImpl(options_.meta)),
|
||||
mem_mgr_(new MemManager(meta_ptr_, options_)),
|
||||
compact_thread_pool_(1, 1),
|
||||
index_thread_pool_(1, 1) {
|
||||
StartTimerTasks();
|
||||
}
|
||||
|
||||
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
|
||||
return pMeta_->CreateTable(table_schema);
|
||||
return meta_ptr_->CreateTable(table_schema);
|
||||
}
|
||||
|
||||
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
|
||||
//dates empty means delete all files of the table
|
||||
if(dates.empty()) {
|
||||
pMemMgr_->EraseMemVector(table_id); //not allow insert
|
||||
pMeta_->DeleteTable(table_id); //soft delete
|
||||
}
|
||||
//dates partly delete files of the table but currently we don't support
|
||||
|
||||
mem_mgr_->EraseMemVector(table_id); //not allow insert
|
||||
meta_ptr_->DeleteTable(table_id); //soft delete table
|
||||
|
||||
//scheduler will determine when to delete table files
|
||||
TaskScheduler& scheduler = TaskScheduler::GetInstance();
|
||||
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id, meta_ptr_);
|
||||
scheduler.Schedule(context);
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
|
||||
return pMeta_->DescribeTable(table_schema);
|
||||
return meta_ptr_->DescribeTable(table_schema);
|
||||
}
|
||||
|
||||
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
||||
return pMeta_->HasTable(table_id, has_or_not);
|
||||
return meta_ptr_->HasTable(table_id, has_or_not);
|
||||
}
|
||||
|
||||
Status DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
|
||||
return pMeta_->AllTables(table_schema_array);
|
||||
return meta_ptr_->AllTables(table_schema_array);
|
||||
}
|
||||
|
||||
Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
|
||||
return pMeta_->Count(table_id, row_count);
|
||||
return meta_ptr_->Count(table_id, row_count);
|
||||
}
|
||||
|
||||
Status DBImpl::InsertVectors(const std::string& table_id_,
|
||||
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
|
||||
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
|
||||
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
double total_time = METRICS_MICROSECONDS(start_time,end_time);
|
||||
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
|
||||
@ -203,7 +209,7 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
|
||||
|
||||
//get all table files from table
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = pMeta_->FilesToSearch(table_id, dates, files);
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
meta::TableFilesSchema file_id_array;
|
||||
@ -225,7 +231,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
|
||||
for (auto &id : file_ids) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = id;
|
||||
auto status = pMeta_->GetTableFile(table_file);
|
||||
auto status = meta_ptr_->GetTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
@ -238,7 +244,7 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
|
||||
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
|
||||
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = pMeta_->FilesToSearch(table_id, dates, files);
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
|
||||
@ -387,8 +393,8 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
|
||||
}
|
||||
|
||||
//step 2: put search task to scheduler
|
||||
SearchScheduler& scheduler = SearchScheduler::GetInstance();
|
||||
scheduler.ScheduleSearchTask(context);
|
||||
TaskScheduler& scheduler = TaskScheduler::GetInstance();
|
||||
scheduler.Schedule(context);
|
||||
|
||||
context->WaitResult();
|
||||
|
||||
@ -396,7 +402,7 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
|
||||
auto& context_result = context->GetResult();
|
||||
meta::TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
pMeta_->DescribeTable(table_schema);
|
||||
meta_ptr_->DescribeTable(table_schema);
|
||||
|
||||
CalcScore(context->nq(), context->vectors(), table_schema.dimension_, context_result, results);
|
||||
|
||||
@ -460,7 +466,7 @@ void DBImpl::StartCompactionTask() {
|
||||
|
||||
//serialize memory data
|
||||
std::vector<std::string> temp_table_ids;
|
||||
pMemMgr_->Serialize(temp_table_ids);
|
||||
mem_mgr_->Serialize(temp_table_ids);
|
||||
for(auto& id : temp_table_ids) {
|
||||
compact_table_ids_.insert(id);
|
||||
}
|
||||
@ -486,10 +492,10 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = table_id;
|
||||
table_file.date_ = date;
|
||||
Status status = pMeta_->CreateTableFile(table_file);
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
|
||||
if (!status.ok()) {
|
||||
LOG(INFO) << status.ToString() << std::endl;
|
||||
ENGINE_LOG_INFO << status.ToString() << std::endl;
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -510,7 +516,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
|
||||
file_schema.file_type_ = meta::TableFileSchema::TO_DELETE;
|
||||
updated.push_back(file_schema);
|
||||
LOG(DEBUG) << "Merging file " << file_schema.file_id_;
|
||||
ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_;
|
||||
index_size = index->Size();
|
||||
|
||||
if (index_size >= options_.index_trigger_size) break;
|
||||
@ -526,8 +532,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
}
|
||||
table_file.size_ = index_size;
|
||||
updated.push_back(table_file);
|
||||
status = pMeta_->UpdateTableFiles(updated);
|
||||
LOG(DEBUG) << "New merged file " << table_file.file_id_ <<
|
||||
status = meta_ptr_->UpdateTableFiles(updated);
|
||||
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
|
||||
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
|
||||
|
||||
index->Cache();
|
||||
@ -537,7 +543,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
|
||||
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
|
||||
meta::DatePartionedTableFilesSchema raw_files;
|
||||
auto status = pMeta_->FilesToMerge(table_id, raw_files);
|
||||
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
@ -569,8 +575,8 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
|
||||
}
|
||||
}
|
||||
|
||||
pMeta_->Archive();
|
||||
pMeta_->CleanUpFilesWithTTL(1);
|
||||
meta_ptr_->Archive();
|
||||
meta_ptr_->CleanUpFilesWithTTL(1);
|
||||
}
|
||||
|
||||
void DBImpl::StartBuildIndexTask() {
|
||||
@ -596,27 +602,43 @@ void DBImpl::StartBuildIndexTask() {
|
||||
}
|
||||
|
||||
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file.table_id_;
|
||||
table_file.date_ = file.date_;
|
||||
Status status = pMeta_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
|
||||
if(to_index == nullptr) {
|
||||
return Status::Error("Invalid engine type");
|
||||
}
|
||||
|
||||
try {
|
||||
//step 1: load index
|
||||
to_index->Load();
|
||||
|
||||
//step 2: create table file
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file.table_id_;
|
||||
table_file.date_ = file.date_;
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
//step 3: build index
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
auto index = to_index->BuildIndex(table_file.location_);
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
|
||||
|
||||
//step 4: if table has been deleted, dont save index file
|
||||
bool has_table = false;
|
||||
meta_ptr_->HasTable(file.table_id_, has_table);
|
||||
if(!has_table) {
|
||||
meta_ptr_->DeleteTableFiles(file.table_id_);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
//step 5: save index file
|
||||
index->Serialize();
|
||||
|
||||
//step 6: update meta
|
||||
table_file.file_type_ = meta::TableFileSchema::INDEX;
|
||||
table_file.size_ = index->Size();
|
||||
|
||||
@ -624,13 +646,13 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
|
||||
|
||||
meta::TableFilesSchema update_files = {to_remove, table_file};
|
||||
pMeta_->UpdateTableFiles(update_files);
|
||||
meta_ptr_->UpdateTableFiles(update_files);
|
||||
|
||||
LOG(DEBUG) << "New index file " << table_file.file_id_ << " of size "
|
||||
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
|
||||
<< index->PhysicalSize()/(1024*1024) << " M"
|
||||
<< " from file " << to_remove.file_id_;
|
||||
|
||||
index->Cache();
|
||||
//index->Cache();
|
||||
|
||||
} catch (std::exception& ex) {
|
||||
return Status::Error("Build index encounter exception", ex.what());
|
||||
@ -641,10 +663,10 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
|
||||
void DBImpl::BackgroundBuildIndex() {
|
||||
meta::TableFilesSchema to_index_files;
|
||||
pMeta_->FilesToIndex(to_index_files);
|
||||
meta_ptr_->FilesToIndex(to_index_files);
|
||||
Status status;
|
||||
for (auto& file : to_index_files) {
|
||||
/* LOG(DEBUG) << "Buiding index for " << file.location; */
|
||||
/* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
|
||||
status = BuildIndex(file);
|
||||
if (!status.ok()) {
|
||||
bg_error_ = status;
|
||||
@ -655,22 +677,22 @@ void DBImpl::BackgroundBuildIndex() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* LOG(DEBUG) << "All Buiding index Done"; */
|
||||
/* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
|
||||
}
|
||||
|
||||
Status DBImpl::DropAll() {
|
||||
return pMeta_->DropAll();
|
||||
return meta_ptr_->DropAll();
|
||||
}
|
||||
|
||||
Status DBImpl::Size(uint64_t& result) {
|
||||
return pMeta_->Size(result);
|
||||
return meta_ptr_->Size(result);
|
||||
}
|
||||
|
||||
DBImpl::~DBImpl() {
|
||||
shutting_down_.store(true, std::memory_order_release);
|
||||
bg_timer_thread_.join();
|
||||
std::vector<std::string> ids;
|
||||
pMemMgr_->Serialize(ids);
|
||||
mem_mgr_->Serialize(ids);
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
|
||||
@ -93,8 +93,8 @@ private:
|
||||
|
||||
std::thread bg_timer_thread_;
|
||||
|
||||
MetaPtr pMeta_;
|
||||
MemManagerPtr pMemMgr_;
|
||||
MetaPtr meta_ptr_;
|
||||
MemManagerPtr mem_mgr_;
|
||||
|
||||
server::ThreadPool compact_thread_pool_;
|
||||
std::list<std::future<void>> compact_thread_results_;
|
||||
|
||||
@ -258,6 +258,28 @@ Status DBMetaImpl::DeleteTable(const std::string& table_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::DeleteTableFiles(const std::string& table_id) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
//soft delete table files
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
|
||||
),
|
||||
where(
|
||||
c(&TableFileSchema::table_id_) == table_id and
|
||||
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
|
||||
));
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when delete table files", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
@ -582,74 +604,6 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::FilesToDelete(const std::string& table_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedTableFilesSchema& files) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
try {
|
||||
if(partition.empty()) {
|
||||
//step 1: get table files by dates
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::size_,
|
||||
&TableFileSchema::date_),
|
||||
where(c(&TableFileSchema::table_id_) == table_id));
|
||||
|
||||
//step 2: erase table files from meta
|
||||
for (auto &file : selected) {
|
||||
TableFileSchema table_file;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.size_ = std::get<3>(file);
|
||||
table_file.date_ = std::get<4>(file);
|
||||
GetTableFilePath(table_file);
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
files[table_file.date_].push_back(table_file);
|
||||
|
||||
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
||||
}
|
||||
|
||||
} else {
|
||||
//step 1: get all table files
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::size_,
|
||||
&TableFileSchema::date_),
|
||||
where(in(&TableFileSchema::date_, partition)
|
||||
and c(&TableFileSchema::table_id_) == table_id));
|
||||
|
||||
//step 2: erase table files from meta
|
||||
for (auto &file : selected) {
|
||||
TableFileSchema table_file;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.size_ = std::get<3>(file);
|
||||
table_file.date_ = std::get<4>(file);
|
||||
GetTableFilePath(table_file);
|
||||
auto dateItr = files.find(table_file.date_);
|
||||
if (dateItr == files.end()) {
|
||||
files[table_file.date_] = TableFilesSchema();
|
||||
}
|
||||
files[table_file.date_].push_back(table_file);
|
||||
|
||||
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when iterate delete files", e);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::GetTableFile(TableFileSchema &file_schema) {
|
||||
|
||||
try {
|
||||
@ -745,41 +699,52 @@ Status DBMetaImpl::DiscardFiles(long to_discard_size) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "About to discard size=" << to_discard_size;
|
||||
ENGINE_LOG_DEBUG << "About to discard size=" << to_discard_size;
|
||||
|
||||
try {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::size_),
|
||||
where(c(&TableFileSchema::file_type_)
|
||||
MetricCollector metric;
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::size_),
|
||||
where(c(&TableFileSchema::file_type_)
|
||||
!= (int) TableFileSchema::TO_DELETE),
|
||||
order_by(&TableFileSchema::id_),
|
||||
limit(10));
|
||||
order_by(&TableFileSchema::id_),
|
||||
limit(10));
|
||||
|
||||
std::vector<int> ids;
|
||||
TableFileSchema table_file;
|
||||
std::vector<int> ids;
|
||||
TableFileSchema table_file;
|
||||
|
||||
for (auto &file : selected) {
|
||||
if (to_discard_size <= 0) break;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.size_ = std::get<1>(file);
|
||||
ids.push_back(table_file.id_);
|
||||
ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
|
||||
<< " table_file.size=" << table_file.size_;
|
||||
to_discard_size -= table_file.size_;
|
||||
for (auto &file : selected) {
|
||||
if (to_discard_size <= 0) break;
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.size_ = std::get<1>(file);
|
||||
ids.push_back(table_file.id_);
|
||||
ENGINE_LOG_DEBUG << "Discard table_file.id=" << table_file.file_id_
|
||||
<< " table_file.size=" << table_file.size_;
|
||||
to_discard_size -= table_file.size_;
|
||||
}
|
||||
|
||||
if (ids.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE,
|
||||
c(&TableFileSchema::updated_time_) = utils::GetMicroSecTimeStamp()
|
||||
),
|
||||
where(
|
||||
in(&TableFileSchema::id_, ids)
|
||||
));
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!commited) {
|
||||
return Status::DBTransactionError("Update table file error");
|
||||
}
|
||||
|
||||
if (ids.size() == 0) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
ConnectorPtr->update_all(
|
||||
set(
|
||||
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_DELETE
|
||||
),
|
||||
where(
|
||||
in(&TableFileSchema::id_, ids)
|
||||
));
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when discard table file", e);
|
||||
}
|
||||
@ -792,11 +757,21 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
auto tables = ConnectorPtr->select(columns(&TableSchema::state_),
|
||||
where(c(&TableSchema::table_id_) == file_schema.table_id_));
|
||||
|
||||
//if the table has been deleted, just mark the table file as TO_DELETE
|
||||
//clean thread will delete the file later
|
||||
if(tables.size() < 1 || std::get<0>(tables[0]) == (int)TableSchema::TO_DELETE) {
|
||||
file_schema.file_type_ = TableFileSchema::TO_DELETE;
|
||||
}
|
||||
|
||||
ConnectorPtr->update(file_schema);
|
||||
|
||||
} catch (std::exception &e) {
|
||||
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
|
||||
return HandleException("Encounter exception when update table file", e);
|
||||
std::string msg = "Exception update table file: table_id = " + file_schema.table_id_
|
||||
+ " file_id = " + file_schema.file_id_;
|
||||
return HandleException(msg, e);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -805,16 +780,37 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
std::map<std::string, bool> has_tables;
|
||||
for (auto &file : files) {
|
||||
if(has_tables.find(file.table_id_) != has_tables.end()) {
|
||||
continue;
|
||||
}
|
||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_),
|
||||
where(c(&TableSchema::table_id_) == file.table_id_
|
||||
and c(&TableSchema::state_) != (int) TableSchema::TO_DELETE));
|
||||
if(tables.size() >= 1) {
|
||||
has_tables[file.table_id_] = true;
|
||||
} else {
|
||||
has_tables[file.table_id_] = false;
|
||||
}
|
||||
}
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
if(!has_tables[file.table_id_]) {
|
||||
file.file_type_ = TableFileSchema::TO_DELETE;
|
||||
}
|
||||
|
||||
file.updated_time_ = utils::GetMicroSecTimeStamp();
|
||||
ConnectorPtr->update(file);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!commited) {
|
||||
return Status::DBTransactionError("Update files Error");
|
||||
return Status::DBTransactionError("Update table files error");
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when update table files", e);
|
||||
}
|
||||
@ -824,35 +820,67 @@ Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
|
||||
Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
auto now = utils::GetMicroSecTimeStamp();
|
||||
try {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::size_,
|
||||
&TableFileSchema::date_),
|
||||
where(
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE
|
||||
and
|
||||
c(&TableFileSchema::updated_time_)
|
||||
> now - seconds * US_PS));
|
||||
MetricCollector metric;
|
||||
|
||||
TableFilesSchema updated;
|
||||
TableFileSchema table_file;
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::date_),
|
||||
where(
|
||||
c(&TableFileSchema::file_type_) ==
|
||||
(int) TableFileSchema::TO_DELETE
|
||||
and
|
||||
c(&TableFileSchema::updated_time_)
|
||||
< now - seconds * US_PS));
|
||||
|
||||
for (auto &file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.file_type_ = std::get<3>(file);
|
||||
table_file.size_ = std::get<4>(file);
|
||||
table_file.date_ = std::get<5>(file);
|
||||
GetTableFilePath(table_file);
|
||||
if (table_file.file_type_ == TableFileSchema::TO_DELETE) {
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
TableFileSchema table_file;
|
||||
for (auto &file : files) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.date_ = std::get<3>(file);
|
||||
GetTableFilePath(table_file);
|
||||
|
||||
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl;
|
||||
boost::filesystem::remove(table_file.location_);
|
||||
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
|
||||
|
||||
}
|
||||
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
|
||||
/* LOG(DEBUG) << "Removing deleted id=" << table_file.id << " location=" << table_file.location << std::endl; */
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!commited) {
|
||||
return Status::DBTransactionError("Clean files error");
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when clean table files", e);
|
||||
}
|
||||
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
auto tables = ConnectorPtr->select(columns(&TableSchema::id_,
|
||||
&TableSchema::table_id_),
|
||||
where(c(&TableSchema::state_) == (int) TableSchema::TO_DELETE));
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &table : tables) {
|
||||
auto table_path = GetTablePath(std::get<1>(table));
|
||||
|
||||
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
|
||||
boost::filesystem::remove_all(table_path);
|
||||
ConnectorPtr->remove<TableSchema>(std::get<0>(table));
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!commited) {
|
||||
return Status::DBTransactionError("Clean files error");
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when clean table files", e);
|
||||
}
|
||||
@ -862,35 +890,21 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
|
||||
|
||||
Status DBMetaImpl::CleanUp() {
|
||||
try {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::table_id_,
|
||||
&TableFileSchema::file_id_,
|
||||
&TableFileSchema::file_type_,
|
||||
&TableFileSchema::size_,
|
||||
&TableFileSchema::date_),
|
||||
where(
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_DELETE
|
||||
or
|
||||
c(&TableFileSchema::file_type_)
|
||||
== (int) TableFileSchema::NEW));
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
|
||||
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
|
||||
|
||||
TableFilesSchema updated;
|
||||
TableFileSchema table_file;
|
||||
|
||||
for (auto &file : selected) {
|
||||
table_file.id_ = std::get<0>(file);
|
||||
table_file.table_id_ = std::get<1>(file);
|
||||
table_file.file_id_ = std::get<2>(file);
|
||||
table_file.file_type_ = std::get<3>(file);
|
||||
table_file.size_ = std::get<4>(file);
|
||||
table_file.date_ = std::get<5>(file);
|
||||
GetTableFilePath(table_file);
|
||||
if (table_file.file_type_ == TableFileSchema::TO_DELETE) {
|
||||
boost::filesystem::remove(table_file.location_);
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||
ConnectorPtr->remove<TableFileSchema>(std::get<0>(file));
|
||||
}
|
||||
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
|
||||
/* LOG(DEBUG) << "Removing id=" << table_file.id << " location=" << table_file.location << std::endl; */
|
||||
return true;
|
||||
});
|
||||
|
||||
if (!commited) {
|
||||
return Status::DBTransactionError("Clean files error");
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("Encounter exception when clean table file", e);
|
||||
}
|
||||
@ -903,14 +917,12 @@ Status DBMetaImpl::Count(const std::string &table_id, uint64_t &result) {
|
||||
try {
|
||||
MetricCollector metric;
|
||||
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_,
|
||||
&TableFileSchema::date_),
|
||||
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
|
||||
and
|
||||
c(&TableFileSchema::table_id_) == table_id));
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::size_),
|
||||
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX
|
||||
or c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX)
|
||||
and c(&TableFileSchema::table_id_) == table_id));
|
||||
|
||||
TableSchema table_schema;
|
||||
table_schema.table_id_ = table_id;
|
||||
|
||||
@ -20,11 +20,13 @@ public:
|
||||
DBMetaImpl(const DBMetaOptions& options_);
|
||||
|
||||
virtual Status CreateTable(TableSchema& table_schema) override;
|
||||
virtual Status DeleteTable(const std::string& table_id) override;
|
||||
virtual Status DescribeTable(TableSchema& group_info_) override;
|
||||
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
|
||||
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) override;
|
||||
|
||||
virtual Status DeleteTable(const std::string& table_id) override;
|
||||
virtual Status DeleteTableFiles(const std::string& table_id) override;
|
||||
|
||||
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
|
||||
virtual Status DropPartitionsByDates(const std::string& table_id,
|
||||
const DatesT& dates) override;
|
||||
@ -42,10 +44,6 @@ public:
|
||||
virtual Status FilesToMerge(const std::string& table_id,
|
||||
DatePartionedTableFilesSchema& files) override;
|
||||
|
||||
virtual Status FilesToDelete(const std::string& table_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedTableFilesSchema& files) override;
|
||||
|
||||
virtual Status FilesToIndex(TableFilesSchema&) override;
|
||||
|
||||
virtual Status Archive() override;
|
||||
|
||||
@ -4,8 +4,8 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include "FaissExecutionEngine.h"
|
||||
#include "Log.h"
|
||||
|
||||
#include <easylogging++.h>
|
||||
#include <faiss/AutoTune.h>
|
||||
#include <faiss/MetaIndexes.h>
|
||||
#include <faiss/IndexFlat.h>
|
||||
@ -74,7 +74,7 @@ Status FaissExecutionEngine::Load() {
|
||||
if (!index) {
|
||||
index = read_index(location_);
|
||||
to_cache = true;
|
||||
LOG(DEBUG) << "Disk io from: " << location_;
|
||||
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
|
||||
}
|
||||
|
||||
pIndex_ = index->data();
|
||||
@ -98,6 +98,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) {
|
||||
if (location == location_) {
|
||||
return Status::Error("Cannot Merge Self");
|
||||
}
|
||||
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
|
||||
|
||||
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
|
||||
if (!to_merge) {
|
||||
to_merge = read_index(location);
|
||||
@ -110,6 +112,8 @@ Status FaissExecutionEngine::Merge(const std::string& location) {
|
||||
|
||||
ExecutionEnginePtr
|
||||
FaissExecutionEngine::BuildIndex(const std::string& location) {
|
||||
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
|
||||
|
||||
auto opd = std::make_shared<Operand>();
|
||||
opd->d = pIndex_->d;
|
||||
opd->index_type = build_index_type_;
|
||||
@ -122,7 +126,6 @@ FaissExecutionEngine::BuildIndex(const std::string& location) {
|
||||
from_index->id_map.data());
|
||||
|
||||
ExecutionEnginePtr new_ee(new FaissExecutionEngine(index->data(), location, build_index_type_, raw_index_type_));
|
||||
new_ee->Serialize();
|
||||
return new_ee;
|
||||
}
|
||||
|
||||
|
||||
@ -24,11 +24,13 @@ public:
|
||||
using Ptr = std::shared_ptr<Meta>;
|
||||
|
||||
virtual Status CreateTable(TableSchema& table_schema) = 0;
|
||||
virtual Status DeleteTable(const std::string& table_id) = 0;
|
||||
virtual Status DescribeTable(TableSchema& table_schema) = 0;
|
||||
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
|
||||
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) = 0;
|
||||
|
||||
virtual Status DeleteTable(const std::string& table_id) = 0;
|
||||
virtual Status DeleteTableFiles(const std::string& table_id) = 0;
|
||||
|
||||
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
|
||||
virtual Status DropPartitionsByDates(const std::string& table_id,
|
||||
const DatesT& dates) = 0;
|
||||
@ -45,10 +47,6 @@ public:
|
||||
virtual Status FilesToMerge(const std::string& table_id,
|
||||
DatePartionedTableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status FilesToDelete(const std::string& table_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedTableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status Size(uint64_t& result) = 0;
|
||||
|
||||
virtual Status Archive() = 0;
|
||||
|
||||
@ -1,64 +0,0 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
|
||||
#include "ScheduleStrategy.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "utils/Error.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class MemScheduleStrategy : public IScheduleStrategy {
|
||||
public:
|
||||
bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override {
|
||||
if(search_context == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SearchContext::Id2IndexMap index_files = search_context->GetIndexMap();
|
||||
//some index loader alread exists
|
||||
for(auto& loader : loader_list) {
|
||||
if(index_files.find(loader->file_->id_) != index_files.end()){
|
||||
SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
|
||||
index_files.erase(loader->file_->id_);
|
||||
loader->search_contexts_.push_back(search_context);
|
||||
}
|
||||
}
|
||||
|
||||
//index_files still contains some index files, create new loader
|
||||
for(auto& pair : index_files) {
|
||||
SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
|
||||
IndexLoaderContextPtr new_loader = std::make_shared<IndexLoaderContext>();
|
||||
new_loader->search_contexts_.push_back(search_context);
|
||||
new_loader->file_ = pair.second;
|
||||
|
||||
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
|
||||
if(index != nullptr) {
|
||||
//if the index file has been in memory, increase its priority
|
||||
loader_list.push_front(new_loader);
|
||||
} else {
|
||||
//index file not in memory, put it to tail
|
||||
loader_list.push_back(new_loader);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() {
|
||||
ScheduleStrategyPtr strategy(new MemScheduleStrategy());
|
||||
return strategy;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,180 +0,0 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "SearchScheduler.h"
|
||||
#include "IndexLoaderQueue.h"
|
||||
#include "SearchTask.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "db/EngineFactory.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
namespace {
|
||||
void CollectFileMetrics(int file_type, size_t file_size) {
|
||||
switch(file_type) {
|
||||
case meta::TableFileSchema::RAW:
|
||||
case meta::TableFileSchema::TO_INDEX: {
|
||||
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
|
||||
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
|
||||
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
|
||||
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
|
||||
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CollectDurationMetrics(int index_type, double total_time) {
|
||||
switch(index_type) {
|
||||
case meta::TableFileSchema::RAW: {
|
||||
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
case meta::TableFileSchema::TO_INDEX: {
|
||||
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
SearchScheduler::SearchScheduler()
|
||||
: stopped_(true) {
|
||||
Start();
|
||||
}
|
||||
|
||||
SearchScheduler::~SearchScheduler() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
SearchScheduler& SearchScheduler::GetInstance() {
|
||||
static SearchScheduler s_instance;
|
||||
return s_instance;
|
||||
}
|
||||
|
||||
bool
|
||||
SearchScheduler::Start() {
|
||||
if(!stopped_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
stopped_ = false;
|
||||
|
||||
search_queue_.SetCapacity(2);
|
||||
|
||||
index_load_thread_ = std::make_shared<std::thread>(&SearchScheduler::IndexLoadWorker, this);
|
||||
search_thread_ = std::make_shared<std::thread>(&SearchScheduler::SearchWorker, this);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SearchScheduler::Stop() {
|
||||
if(stopped_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if(index_load_thread_) {
|
||||
index_load_queue_.Put(nullptr);
|
||||
index_load_thread_->join();
|
||||
index_load_thread_ = nullptr;
|
||||
}
|
||||
|
||||
if(search_thread_) {
|
||||
search_queue_.Put(nullptr);
|
||||
search_thread_->join();
|
||||
search_thread_ = nullptr;
|
||||
}
|
||||
|
||||
stopped_ = true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
|
||||
index_load_queue_.Put(search_context);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SearchScheduler::IndexLoadWorker() {
|
||||
while(true) {
|
||||
IndexLoaderContextPtr context = index_load_queue_.Take();
|
||||
if(context == nullptr) {
|
||||
SERVER_LOG_INFO << "Stop thread for index loading";
|
||||
break;//exit
|
||||
}
|
||||
|
||||
SERVER_LOG_INFO << "Loading index(" << context->file_->id_ << ") from location: " << context->file_->location_;
|
||||
|
||||
server::TimeRecorder rc("Load index");
|
||||
//step 1: load index
|
||||
ExecutionEnginePtr index_ptr = EngineFactory::Build(context->file_->dimension_,
|
||||
context->file_->location_,
|
||||
(EngineType)context->file_->engine_type_);
|
||||
index_ptr->Load();
|
||||
|
||||
rc.Record("load index file to memory");
|
||||
|
||||
size_t file_size = index_ptr->PhysicalSize();
|
||||
LOG(DEBUG) << "Index file type " << context->file_->file_type_ << " Of Size: "
|
||||
<< file_size/(1024*1024) << " M";
|
||||
|
||||
CollectFileMetrics(context->file_->file_type_, file_size);
|
||||
|
||||
//step 2: put search task into another queue
|
||||
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
|
||||
task_ptr->index_id_ = context->file_->id_;
|
||||
task_ptr->index_type_ = context->file_->file_type_;
|
||||
task_ptr->index_engine_ = index_ptr;
|
||||
task_ptr->search_contexts_.swap(context->search_contexts_);
|
||||
search_queue_.Put(task_ptr);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SearchScheduler::SearchWorker() {
|
||||
while(true) {
|
||||
SearchTaskPtr task_ptr = search_queue_.Take();
|
||||
if(task_ptr == nullptr) {
|
||||
SERVER_LOG_INFO << "Stop thread for searching";
|
||||
break;//exit
|
||||
}
|
||||
|
||||
SERVER_LOG_INFO << "Searching in index(" << task_ptr->index_id_<< ") with "
|
||||
<< task_ptr->search_contexts_.size() << " tasks";
|
||||
|
||||
//do search
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
task_ptr->DoSearch();
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
CollectDurationMetrics(task_ptr->index_type_, total_time);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,48 +0,0 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "SearchContext.h"
|
||||
#include "IndexLoaderQueue.h"
|
||||
#include "SearchTask.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class SearchScheduler {
|
||||
private:
|
||||
SearchScheduler();
|
||||
virtual ~SearchScheduler();
|
||||
|
||||
public:
|
||||
static SearchScheduler& GetInstance();
|
||||
|
||||
bool ScheduleSearchTask(SearchContextPtr& search_context);
|
||||
|
||||
private:
|
||||
bool Start();
|
||||
bool Stop();
|
||||
|
||||
bool IndexLoadWorker();
|
||||
bool SearchWorker();
|
||||
|
||||
private:
|
||||
std::shared_ptr<std::thread> index_load_thread_;
|
||||
std::shared_ptr<std::thread> search_thread_;
|
||||
|
||||
IndexLoaderQueue index_load_queue_;
|
||||
|
||||
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
|
||||
SearchTaskQueue search_queue_;
|
||||
|
||||
bool stopped_ = true;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4,8 +4,8 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "IndexLoaderQueue.h"
|
||||
#include "ScheduleStrategy.h"
|
||||
#include "TaskDispatchQueue.h"
|
||||
#include "TaskDispatchStrategy.h"
|
||||
#include "utils/Error.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
@ -14,12 +14,12 @@ namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
void
|
||||
IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
|
||||
TaskDispatchQueue::Put(const ScheduleContextPtr &context) {
|
||||
std::unique_lock <std::mutex> lock(mtx);
|
||||
full_.wait(lock, [this] { return (queue_.size() < capacity_); });
|
||||
|
||||
if(search_context == nullptr) {
|
||||
queue_.push_back(nullptr);
|
||||
if(context == nullptr) {
|
||||
queue_.push_front(nullptr);
|
||||
empty_.notify_all();
|
||||
return;
|
||||
}
|
||||
@ -32,14 +32,13 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
|
||||
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
|
||||
}
|
||||
|
||||
ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy();
|
||||
strategy->Schedule(search_context, queue_);
|
||||
TaskDispatchStrategy::Schedule(context, queue_);
|
||||
|
||||
empty_.notify_all();
|
||||
}
|
||||
|
||||
IndexLoaderContextPtr
|
||||
IndexLoaderQueue::Take() {
|
||||
ScheduleTaskPtr
|
||||
TaskDispatchQueue::Take() {
|
||||
std::unique_lock <std::mutex> lock(mtx);
|
||||
empty_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
@ -49,20 +48,20 @@ IndexLoaderQueue::Take() {
|
||||
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
|
||||
}
|
||||
|
||||
IndexLoaderContextPtr front(queue_.front());
|
||||
ScheduleTaskPtr front(queue_.front());
|
||||
queue_.pop_front();
|
||||
full_.notify_all();
|
||||
return front;
|
||||
}
|
||||
|
||||
size_t
|
||||
IndexLoaderQueue::Size() {
|
||||
TaskDispatchQueue::Size() {
|
||||
std::lock_guard <std::mutex> lock(mtx);
|
||||
return queue_.size();
|
||||
}
|
||||
|
||||
IndexLoaderContextPtr
|
||||
IndexLoaderQueue::Front() {
|
||||
ScheduleTaskPtr
|
||||
TaskDispatchQueue::Front() {
|
||||
std::unique_lock <std::mutex> lock(mtx);
|
||||
empty_.wait(lock, [this] { return !queue_.empty(); });
|
||||
if (queue_.empty()) {
|
||||
@ -70,12 +69,12 @@ IndexLoaderQueue::Front() {
|
||||
SERVER_LOG_ERROR << error_msg;
|
||||
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
|
||||
}
|
||||
IndexLoaderContextPtr front(queue_.front());
|
||||
ScheduleTaskPtr front(queue_.front());
|
||||
return front;
|
||||
}
|
||||
|
||||
IndexLoaderContextPtr
|
||||
IndexLoaderQueue::Back() {
|
||||
ScheduleTaskPtr
|
||||
TaskDispatchQueue::Back() {
|
||||
std::unique_lock <std::mutex> lock(mtx);
|
||||
empty_.wait(lock, [this] { return !queue_.empty(); });
|
||||
|
||||
@ -85,18 +84,18 @@ IndexLoaderQueue::Back() {
|
||||
throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
|
||||
}
|
||||
|
||||
IndexLoaderContextPtr back(queue_.back());
|
||||
ScheduleTaskPtr back(queue_.back());
|
||||
return back;
|
||||
}
|
||||
|
||||
bool
|
||||
IndexLoaderQueue::Empty() {
|
||||
TaskDispatchQueue::Empty() {
|
||||
std::unique_lock <std::mutex> lock(mtx);
|
||||
return queue_.empty();
|
||||
}
|
||||
|
||||
void
|
||||
IndexLoaderQueue::SetCapacity(const size_t capacity) {
|
||||
TaskDispatchQueue::SetCapacity(const size_t capacity) {
|
||||
capacity_ = (capacity > 0 ? capacity : capacity_);
|
||||
}
|
||||
|
||||
@ -5,7 +5,8 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "SearchContext.h"
|
||||
#include "context/IScheduleContext.h"
|
||||
#include "task/IScheduleTask.h"
|
||||
|
||||
#include <condition_variable>
|
||||
#include <iostream>
|
||||
@ -17,31 +18,23 @@ namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
|
||||
class IndexLoaderContext {
|
||||
class TaskDispatchQueue {
|
||||
public:
|
||||
TableFileSchemaPtr file_;
|
||||
std::vector<SearchContextPtr> search_contexts_;
|
||||
};
|
||||
using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>;
|
||||
TaskDispatchQueue() : mtx(), full_(), empty_() {}
|
||||
|
||||
class IndexLoaderQueue {
|
||||
public:
|
||||
IndexLoaderQueue() : mtx(), full_(), empty_() {}
|
||||
TaskDispatchQueue(const TaskDispatchQueue &rhs) = delete;
|
||||
|
||||
IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
|
||||
TaskDispatchQueue &operator=(const TaskDispatchQueue &rhs) = delete;
|
||||
|
||||
IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
|
||||
using TaskList = std::list<ScheduleTaskPtr>;
|
||||
|
||||
using LoaderQueue = std::list<IndexLoaderContextPtr>;
|
||||
void Put(const ScheduleContextPtr &context);
|
||||
|
||||
void Put(const SearchContextPtr &search_context);
|
||||
ScheduleTaskPtr Take();
|
||||
|
||||
IndexLoaderContextPtr Take();
|
||||
ScheduleTaskPtr Front();
|
||||
|
||||
IndexLoaderContextPtr Front();
|
||||
|
||||
IndexLoaderContextPtr Back();
|
||||
ScheduleTaskPtr Back();
|
||||
|
||||
size_t Size();
|
||||
|
||||
@ -54,7 +47,7 @@ private:
|
||||
std::condition_variable full_;
|
||||
std::condition_variable empty_;
|
||||
|
||||
LoaderQueue queue_;
|
||||
TaskList queue_;
|
||||
size_t capacity_ = 1000000;
|
||||
};
|
||||
|
||||
122
cpp/src/db/scheduler/TaskDispatchStrategy.cpp
Normal file
122
cpp/src/db/scheduler/TaskDispatchStrategy.cpp
Normal file
@ -0,0 +1,122 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include "TaskDispatchStrategy.h"
|
||||
#include "context/SearchContext.h"
|
||||
#include "context/DeleteContext.h"
|
||||
#include "task/IndexLoadTask.h"
|
||||
#include "task/DeleteTask.h"
|
||||
#include "cache/CpuCacheMgr.h"
|
||||
#include "utils/Error.h"
|
||||
#include "db/Log.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class ReuseCacheIndexStrategy {
|
||||
public:
|
||||
bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
|
||||
if(context == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SearchContext::Id2IndexMap index_files = context->GetIndexMap();
|
||||
//some index loader alread exists
|
||||
for(auto& task : task_list) {
|
||||
if(task->type() != ScheduleTaskType::kIndexLoad) {
|
||||
continue;
|
||||
}
|
||||
|
||||
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
|
||||
if(index_files.find(loader->file_->id_) != index_files.end()){
|
||||
ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
|
||||
index_files.erase(loader->file_->id_);
|
||||
loader->search_contexts_.push_back(context);
|
||||
}
|
||||
}
|
||||
|
||||
//index_files still contains some index files, create new loader
|
||||
for(auto& pair : index_files) {
|
||||
ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
|
||||
IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
|
||||
new_loader->search_contexts_.push_back(context);
|
||||
new_loader->file_ = pair.second;
|
||||
|
||||
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
|
||||
if(index != nullptr) {
|
||||
//if the index file has been in memory, increase its priority
|
||||
task_list.push_front(new_loader);
|
||||
} else {
|
||||
//index file not in memory, put it to tail
|
||||
task_list.push_back(new_loader);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
class DeleteTableStrategy {
|
||||
public:
|
||||
bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
|
||||
if (context == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
|
||||
if(task_list.empty()) {
|
||||
task_list.push_back(delete_task);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string table_id = context->table_id();
|
||||
for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {
|
||||
if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
|
||||
continue;
|
||||
}
|
||||
|
||||
//put delete task to proper position
|
||||
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
|
||||
if(loader->file_->table_id_ == table_id) {
|
||||
|
||||
task_list.insert(++iter, delete_task);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
|
||||
std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
|
||||
if(context_ptr == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch(context_ptr->type()) {
|
||||
case ScheduleContextType::kSearch: {
|
||||
SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
|
||||
ReuseCacheIndexStrategy strategy;
|
||||
return strategy.Schedule(search_context, task_list);
|
||||
}
|
||||
case ScheduleContextType::kDelete: {
|
||||
DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
|
||||
DeleteTableStrategy strategy;
|
||||
return strategy.Schedule(delete_context, task_list);
|
||||
}
|
||||
default:
|
||||
ENGINE_LOG_ERROR << "Invalid schedule task type";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,18 +5,18 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "IScheduleStrategy.h"
|
||||
#include "context/IScheduleContext.h"
|
||||
#include "task/IScheduleTask.h"
|
||||
|
||||
#include <list>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class StrategyFactory {
|
||||
private:
|
||||
StrategyFactory() {}
|
||||
|
||||
class TaskDispatchStrategy {
|
||||
public:
|
||||
static ScheduleStrategyPtr CreateMemStrategy();
|
||||
static bool Schedule(const ScheduleContextPtr &context_ptr, std::list<ScheduleTaskPtr>& task_list);
|
||||
};
|
||||
|
||||
}
|
||||
117
cpp/src/db/scheduler/TaskScheduler.cpp
Normal file
117
cpp/src/db/scheduler/TaskScheduler.cpp
Normal file
@ -0,0 +1,117 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "TaskScheduler.h"
|
||||
#include "TaskDispatchQueue.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "db/EngineFactory.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
TaskScheduler::TaskScheduler()
|
||||
: stopped_(true) {
|
||||
Start();
|
||||
}
|
||||
|
||||
TaskScheduler::~TaskScheduler() {
|
||||
Stop();
|
||||
}
|
||||
|
||||
TaskScheduler& TaskScheduler::GetInstance() {
|
||||
static TaskScheduler s_instance;
|
||||
return s_instance;
|
||||
}
|
||||
|
||||
bool
|
||||
TaskScheduler::Start() {
|
||||
if(!stopped_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
stopped_ = false;
|
||||
|
||||
task_queue_.SetCapacity(2);
|
||||
|
||||
task_dispatch_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskDispatchWorker, this);
|
||||
task_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskWorker, this);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
TaskScheduler::Stop() {
|
||||
if(stopped_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if(task_dispatch_thread_) {
|
||||
task_dispatch_queue_.Put(nullptr);
|
||||
task_dispatch_thread_->join();
|
||||
task_dispatch_thread_ = nullptr;
|
||||
}
|
||||
|
||||
if(task_thread_) {
|
||||
task_queue_.Put(nullptr);
|
||||
task_thread_->join();
|
||||
task_thread_ = nullptr;
|
||||
}
|
||||
|
||||
stopped_ = true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
TaskScheduler::Schedule(ScheduleContextPtr context) {
|
||||
task_dispatch_queue_.Put(context);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
TaskScheduler::TaskDispatchWorker() {
|
||||
while(true) {
|
||||
ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take();
|
||||
if(task_ptr == nullptr) {
|
||||
SERVER_LOG_INFO << "Stop db task dispatch thread";
|
||||
break;//exit
|
||||
}
|
||||
|
||||
//execute task
|
||||
ScheduleTaskPtr next_task = task_ptr->Execute();
|
||||
if(next_task != nullptr) {
|
||||
task_queue_.Put(next_task);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
TaskScheduler::TaskWorker() {
|
||||
while(true) {
|
||||
ScheduleTaskPtr task_ptr = task_queue_.Take();
|
||||
if(task_ptr == nullptr) {
|
||||
SERVER_LOG_INFO << "Stop db task thread";
|
||||
break;//exit
|
||||
}
|
||||
|
||||
//execute task
|
||||
ScheduleTaskPtr next_task = task_ptr->Execute();
|
||||
if(next_task != nullptr) {
|
||||
task_queue_.Put(next_task);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
49
cpp/src/db/scheduler/TaskScheduler.h
Normal file
49
cpp/src/db/scheduler/TaskScheduler.h
Normal file
@ -0,0 +1,49 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "context/IScheduleContext.h"
|
||||
#include "task/IScheduleTask.h"
|
||||
#include "TaskDispatchQueue.h"
|
||||
#include "utils/BlockingQueue.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class TaskScheduler {
|
||||
private:
|
||||
TaskScheduler();
|
||||
virtual ~TaskScheduler();
|
||||
|
||||
public:
|
||||
static TaskScheduler& GetInstance();
|
||||
|
||||
bool Schedule(ScheduleContextPtr context);
|
||||
|
||||
private:
|
||||
bool Start();
|
||||
bool Stop();
|
||||
|
||||
bool TaskDispatchWorker();
|
||||
bool TaskWorker();
|
||||
|
||||
private:
|
||||
std::shared_ptr<std::thread> task_dispatch_thread_;
|
||||
std::shared_ptr<std::thread> task_thread_;
|
||||
|
||||
TaskDispatchQueue task_dispatch_queue_;
|
||||
|
||||
using TaskQueue = server::BlockingQueue<ScheduleTaskPtr>;
|
||||
TaskQueue task_queue_;
|
||||
|
||||
bool stopped_ = true;
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
22
cpp/src/db/scheduler/context/DeleteContext.cpp
Normal file
22
cpp/src/db/scheduler/context/DeleteContext.cpp
Normal file
@ -0,0 +1,22 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "DeleteContext.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
DeleteContext::DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr)
|
||||
: IScheduleContext(ScheduleContextType::kDelete),
|
||||
table_id_(table_id),
|
||||
meta_ptr_(meta_ptr) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
31
cpp/src/db/scheduler/context/DeleteContext.h
Normal file
31
cpp/src/db/scheduler/context/DeleteContext.h
Normal file
@ -0,0 +1,31 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "IScheduleContext.h"
|
||||
#include "db/Meta.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class DeleteContext : public IScheduleContext {
|
||||
public:
|
||||
DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr);
|
||||
|
||||
std::string table_id() const { return table_id_; }
|
||||
meta::Meta::Ptr meta() const { return meta_ptr_; }
|
||||
|
||||
private:
|
||||
std::string table_id_;
|
||||
meta::Meta::Ptr meta_ptr_;
|
||||
};
|
||||
|
||||
using DeleteContextPtr = std::shared_ptr<DeleteContext>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
38
cpp/src/db/scheduler/context/IScheduleContext.h
Normal file
38
cpp/src/db/scheduler/context/IScheduleContext.h
Normal file
@ -0,0 +1,38 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
enum class ScheduleContextType {
|
||||
kUnknown = 0,
|
||||
kSearch,
|
||||
kDelete,
|
||||
};
|
||||
|
||||
class IScheduleContext {
|
||||
public:
|
||||
IScheduleContext(ScheduleContextType type)
|
||||
: type_(type) {
|
||||
}
|
||||
|
||||
virtual ~IScheduleContext() = default;
|
||||
|
||||
ScheduleContextType type() const { return type_; }
|
||||
|
||||
protected:
|
||||
ScheduleContextType type_;
|
||||
};
|
||||
|
||||
using ScheduleContextPtr = std::shared_ptr<IScheduleContext>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -14,7 +14,8 @@ namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors)
|
||||
: topk_(topk),
|
||||
: IScheduleContext(ScheduleContextType::kSearch),
|
||||
topk_(topk),
|
||||
nq_(nq),
|
||||
vectors_(vectors) {
|
||||
//use current time to identify this context
|
||||
@ -5,6 +5,7 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "IScheduleContext.h"
|
||||
#include "db/MetaTypes.h"
|
||||
|
||||
#include <unordered_map>
|
||||
@ -18,7 +19,7 @@ namespace engine {
|
||||
|
||||
using TableFileSchemaPtr = std::shared_ptr<meta::TableFileSchema>;
|
||||
|
||||
class SearchContext {
|
||||
class SearchContext : public IScheduleContext {
|
||||
public:
|
||||
SearchContext(uint64_t topk, uint64_t nq, const float* vectors);
|
||||
|
||||
30
cpp/src/db/scheduler/task/DeleteTask.cpp
Normal file
30
cpp/src/db/scheduler/task/DeleteTask.cpp
Normal file
@ -0,0 +1,30 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "DeleteTask.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
DeleteTask::DeleteTask(const DeleteContextPtr& context)
|
||||
: IScheduleTask(ScheduleTaskType::kDelete),
|
||||
context_(context) {
|
||||
|
||||
}
|
||||
|
||||
std::shared_ptr<IScheduleTask> DeleteTask::Execute() {
|
||||
|
||||
if(context_ != nullptr && context_->meta() != nullptr) {
|
||||
context_->meta()->DeleteTableFiles(context_->table_id());
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,22 +5,25 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "IndexLoaderQueue.h"
|
||||
#include "SearchContext.h"
|
||||
#include "IScheduleTask.h"
|
||||
#include "db/scheduler/context/DeleteContext.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class IScheduleStrategy {
|
||||
class DeleteTask : public IScheduleTask {
|
||||
public:
|
||||
virtual ~IScheduleStrategy() {}
|
||||
DeleteTask(const DeleteContextPtr& context);
|
||||
|
||||
virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
|
||||
virtual std::shared_ptr<IScheduleTask> Execute() override;
|
||||
|
||||
private:
|
||||
DeleteContextPtr context_;
|
||||
};
|
||||
|
||||
using ScheduleStrategyPtr = std::shared_ptr<IScheduleStrategy>;
|
||||
using DeleteTaskPtr = std::shared_ptr<DeleteTask>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
41
cpp/src/db/scheduler/task/IScheduleTask.h
Normal file
41
cpp/src/db/scheduler/task/IScheduleTask.h
Normal file
@ -0,0 +1,41 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
enum class ScheduleTaskType {
|
||||
kUnknown = 0,
|
||||
kIndexLoad,
|
||||
kSearch,
|
||||
kDelete,
|
||||
};
|
||||
|
||||
class IScheduleTask {
|
||||
public:
|
||||
IScheduleTask(ScheduleTaskType type)
|
||||
: type_(type) {
|
||||
}
|
||||
|
||||
virtual ~IScheduleTask() = default;
|
||||
|
||||
ScheduleTaskType type() const { return type_; }
|
||||
|
||||
virtual std::shared_ptr<IScheduleTask> Execute() = 0;
|
||||
|
||||
protected:
|
||||
ScheduleTaskType type_;
|
||||
};
|
||||
|
||||
using ScheduleTaskPtr = std::shared_ptr<IScheduleTask>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
72
cpp/src/db/scheduler/task/IndexLoadTask.cpp
Normal file
72
cpp/src/db/scheduler/task/IndexLoadTask.cpp
Normal file
@ -0,0 +1,72 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "IndexLoadTask.h"
|
||||
#include "SearchTask.h"
|
||||
#include "db/Log.h"
|
||||
#include "db/EngineFactory.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "metrics/Metrics.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
namespace {
|
||||
void CollectFileMetrics(int file_type, size_t file_size) {
|
||||
switch(file_type) {
|
||||
case meta::TableFileSchema::RAW:
|
||||
case meta::TableFileSchema::TO_INDEX: {
|
||||
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
|
||||
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
|
||||
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
|
||||
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
|
||||
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
IndexLoadTask::IndexLoadTask()
|
||||
: IScheduleTask(ScheduleTaskType::kIndexLoad) {
|
||||
|
||||
}
|
||||
|
||||
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
|
||||
ENGINE_LOG_INFO << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
|
||||
|
||||
server::TimeRecorder rc("Load index");
|
||||
//step 1: load index
|
||||
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
|
||||
file_->location_,
|
||||
(EngineType)file_->engine_type_);
|
||||
index_ptr->Load();
|
||||
|
||||
rc.Record("load index file to memory");
|
||||
|
||||
size_t file_size = index_ptr->PhysicalSize();
|
||||
LOG(DEBUG) << "Index file type " << file_->file_type_ << " Of Size: "
|
||||
<< file_size/(1024*1024) << " M";
|
||||
|
||||
CollectFileMetrics(file_->file_type_, file_size);
|
||||
|
||||
//step 2: return search task for later execution
|
||||
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
|
||||
task_ptr->index_id_ = file_->id_;
|
||||
task_ptr->index_type_ = file_->file_type_;
|
||||
task_ptr->index_engine_ = index_ptr;
|
||||
task_ptr->search_contexts_.swap(search_contexts_);
|
||||
return std::static_pointer_cast<IScheduleTask>(task_ptr);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
30
cpp/src/db/scheduler/task/IndexLoadTask.h
Normal file
30
cpp/src/db/scheduler/task/IndexLoadTask.h
Normal file
@ -0,0 +1,30 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "IScheduleTask.h"
|
||||
#include "db/scheduler/context/SearchContext.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class IndexLoadTask : public IScheduleTask {
|
||||
public:
|
||||
IndexLoadTask();
|
||||
|
||||
virtual std::shared_ptr<IScheduleTask> Execute() override;
|
||||
|
||||
public:
|
||||
TableFileSchemaPtr file_;
|
||||
std::vector<SearchContextPtr> search_contexts_;
|
||||
};
|
||||
|
||||
using IndexLoadTaskPtr = std::shared_ptr<IndexLoadTask>;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4,6 +4,7 @@
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#include "SearchTask.h"
|
||||
#include "metrics/Metrics.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
|
||||
@ -110,15 +111,42 @@ void TopkResult(SearchContext::ResultSet &result_src,
|
||||
}
|
||||
}
|
||||
|
||||
void CollectDurationMetrics(int index_type, double total_time) {
|
||||
switch(index_type) {
|
||||
case meta::TableFileSchema::RAW: {
|
||||
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
case meta::TableFileSchema::TO_INDEX: {
|
||||
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool SearchTask::DoSearch() {
|
||||
}
|
||||
|
||||
SearchTask::SearchTask()
|
||||
: IScheduleTask(ScheduleTaskType::kSearch) {
|
||||
|
||||
}
|
||||
|
||||
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
|
||||
if(index_engine_ == nullptr) {
|
||||
return false;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
SERVER_LOG_INFO << "Searching in index(" << index_id_<< ") with "
|
||||
<< search_contexts_.size() << " tasks";
|
||||
|
||||
server::TimeRecorder rc("DoSearch index(" + std::to_string(index_id_) + ")");
|
||||
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
|
||||
std::vector<long> output_ids;
|
||||
std::vector<float> output_distence;
|
||||
for(auto& context : search_contexts_) {
|
||||
@ -153,9 +181,13 @@ bool SearchTask::DoSearch() {
|
||||
context->IndexSearchDone(index_id_);
|
||||
}
|
||||
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
CollectDurationMetrics(index_type_, total_time);
|
||||
|
||||
rc.Elapse("totally cost");
|
||||
|
||||
return true;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
||||
@ -5,19 +5,19 @@
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "SearchContext.h"
|
||||
#include "utils/BlockingQueue.h"
|
||||
#include "IScheduleTask.h"
|
||||
#include "db/scheduler/context/SearchContext.h"
|
||||
#include "db/ExecutionEngine.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
class SearchTask {
|
||||
class SearchTask : public IScheduleTask {
|
||||
public:
|
||||
bool DoSearch();
|
||||
SearchTask();
|
||||
|
||||
virtual std::shared_ptr<IScheduleTask> Execute() override;
|
||||
|
||||
public:
|
||||
size_t index_id_ = 0;
|
||||
@ -20,7 +20,7 @@ namespace {
|
||||
static constexpr int64_t TOTAL_ROW_COUNT = 100000;
|
||||
static constexpr int64_t TOP_K = 10;
|
||||
static constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
|
||||
static constexpr int64_t ADD_VECTOR_LOOP = 2;
|
||||
static constexpr int64_t ADD_VECTOR_LOOP = 10;
|
||||
|
||||
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
|
||||
|
||||
@ -195,10 +195,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
|
||||
PrintSearchResult(topk_query_result_array);
|
||||
}
|
||||
|
||||
// {//delete table
|
||||
// Status stat = conn->DeleteTable(TABLE_NAME);
|
||||
// std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
|
||||
// }
|
||||
{//delete table
|
||||
Status stat = conn->DeleteTable(TABLE_NAME);
|
||||
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
|
||||
}
|
||||
|
||||
{//server status
|
||||
std::string status = conn->ServerStatus();
|
||||
|
||||
42
cpp/src/server/DBWrapper.cpp
Normal file
42
cpp/src/server/DBWrapper.cpp
Normal file
@ -0,0 +1,42 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
|
||||
#include "DBWrapper.h"
|
||||
#include "ServerConfig.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
|
||||
DBWrapper::DBWrapper() {
|
||||
zilliz::milvus::engine::Options opt;
|
||||
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
|
||||
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
|
||||
std::string db_path = config.GetValue(CONFIG_DB_PATH);
|
||||
opt.meta.path = db_path + "/db";
|
||||
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
|
||||
if(index_size > 0) {//ensure larger than zero, unit is MB
|
||||
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
|
||||
}
|
||||
|
||||
CommonUtil::CreateDirectory(opt.meta.path);
|
||||
|
||||
zilliz::milvus::engine::DB::Open(opt, &db_);
|
||||
if(db_ == nullptr) {
|
||||
SERVER_LOG_ERROR << "Failed to open db";
|
||||
throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
|
||||
}
|
||||
}
|
||||
|
||||
DBWrapper::~DBWrapper() {
|
||||
delete db_;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
34
cpp/src/server/DBWrapper.h
Normal file
34
cpp/src/server/DBWrapper.h
Normal file
@ -0,0 +1,34 @@
|
||||
/*******************************************************************************
|
||||
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Proprietary and confidential.
|
||||
******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include "db/DB.h"
|
||||
#include "db/Meta.h"
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
|
||||
class DBWrapper {
|
||||
private:
|
||||
DBWrapper();
|
||||
~DBWrapper();
|
||||
|
||||
public:
|
||||
static zilliz::milvus::engine::DB* DB() {
|
||||
static DBWrapper db_wrapper;
|
||||
return db_wrapper.db();
|
||||
}
|
||||
|
||||
zilliz::milvus::engine::DB* db() { return db_; }
|
||||
|
||||
private:
|
||||
zilliz::milvus::engine::DB* db_ = nullptr;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -7,6 +7,7 @@
|
||||
#include "RequestHandler.h"
|
||||
#include "ServerConfig.h"
|
||||
#include "ThreadPoolServer.h"
|
||||
#include "DBWrapper.h"
|
||||
|
||||
#include "milvus_types.h"
|
||||
#include "milvus_constants.h"
|
||||
@ -51,6 +52,8 @@ MilvusServer::StartService() {
|
||||
std::string mode = server_config.GetValue(CONFIG_SERVER_MODE, "thread_pool");
|
||||
|
||||
try {
|
||||
DBWrapper::DB();//initialize db
|
||||
|
||||
stdcxx::shared_ptr<RequestHandler> handler(new RequestHandler());
|
||||
stdcxx::shared_ptr<TProcessor> processor(new MilvusServiceProcessor(handler));
|
||||
stdcxx::shared_ptr<TServerTransport> server_transport(new TServerSocket(address, port));
|
||||
|
||||
@ -8,8 +8,7 @@
|
||||
#include "utils/CommonUtil.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/TimeRecorder.h"
|
||||
#include "db/DB.h"
|
||||
#include "db/Meta.h"
|
||||
#include "DBWrapper.h"
|
||||
#include "version.h"
|
||||
|
||||
namespace zilliz {
|
||||
@ -26,43 +25,6 @@ using DB_META = zilliz::milvus::engine::meta::Meta;
|
||||
using DB_DATE = zilliz::milvus::engine::meta::DateT;
|
||||
|
||||
namespace {
|
||||
class DBWrapper {
|
||||
public:
|
||||
DBWrapper() {
|
||||
zilliz::milvus::engine::Options opt;
|
||||
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
|
||||
opt.meta.backend_uri = config.GetValue(CONFIG_DB_URL);
|
||||
std::string db_path = config.GetValue(CONFIG_DB_PATH);
|
||||
opt.meta.path = db_path + "/db";
|
||||
int64_t index_size = config.GetInt64Value(CONFIG_DB_INDEX_TRIGGER_SIZE);
|
||||
if(index_size > 0) {//ensure larger than zero, unit is MB
|
||||
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
|
||||
}
|
||||
|
||||
CommonUtil::CreateDirectory(opt.meta.path);
|
||||
|
||||
zilliz::milvus::engine::DB::Open(opt, &db_);
|
||||
if(db_ == nullptr) {
|
||||
SERVER_LOG_ERROR << "Failed to open db";
|
||||
throw ServerException(SERVER_NULL_POINTER, "Failed to open db");
|
||||
}
|
||||
}
|
||||
|
||||
~DBWrapper() {
|
||||
delete db_;
|
||||
}
|
||||
|
||||
zilliz::milvus::engine::DB* DB() { return db_; }
|
||||
|
||||
private:
|
||||
zilliz::milvus::engine::DB* db_ = nullptr;
|
||||
};
|
||||
|
||||
zilliz::milvus::engine::DB* DB() {
|
||||
static DBWrapper db_wrapper;
|
||||
return db_wrapper.DB();
|
||||
}
|
||||
|
||||
engine::EngineType EngineType(int type) {
|
||||
static std::map<int, engine::EngineType> map_type = {
|
||||
{0, engine::EngineType::INVALID},
|
||||
@ -199,7 +161,7 @@ ServerError CreateTableTask::OnExecute() {
|
||||
table_info.store_raw_data_ = schema_.store_raw_vector;
|
||||
|
||||
//step 3: create table
|
||||
engine::Status stat = DB()->CreateTable(table_info);
|
||||
engine::Status stat = DBWrapper::DB()->CreateTable(table_info);
|
||||
if(!stat.ok()) {//table could exist
|
||||
error_code_ = SERVER_UNEXPECTED_ERROR;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -246,7 +208,7 @@ ServerError DescribeTableTask::OnExecute() {
|
||||
//step 2: get table info
|
||||
engine::meta::TableSchema table_info;
|
||||
table_info.table_id_ = table_name_;
|
||||
engine::Status stat = DB()->DescribeTable(table_info);
|
||||
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_TABLE_NOT_EXIST;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -297,7 +259,7 @@ ServerError DeleteTableTask::OnExecute() {
|
||||
//step 2: check table existence
|
||||
engine::meta::TableSchema table_info;
|
||||
table_info.table_id_ = table_name_;
|
||||
engine::Status stat = DB()->DescribeTable(table_info);
|
||||
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_TABLE_NOT_EXIST;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -309,7 +271,7 @@ ServerError DeleteTableTask::OnExecute() {
|
||||
|
||||
//step 3: delete table
|
||||
std::vector<DB_DATE> dates;
|
||||
stat = DB()->DeleteTable(table_name_, dates);
|
||||
stat = DBWrapper::DB()->DeleteTable(table_name_, dates);
|
||||
if(!stat.ok()) {
|
||||
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
|
||||
return SERVER_UNEXPECTED_ERROR;
|
||||
@ -340,7 +302,7 @@ BaseTaskPtr ShowTablesTask::Create(std::vector<std::string>& tables) {
|
||||
|
||||
ServerError ShowTablesTask::OnExecute() {
|
||||
std::vector<engine::meta::TableSchema> schema_array;
|
||||
engine::Status stat = DB()->AllTables(schema_array);
|
||||
engine::Status stat = DBWrapper::DB()->AllTables(schema_array);
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_UNEXPECTED_ERROR;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -395,7 +357,7 @@ ServerError AddVectorTask::OnExecute() {
|
||||
//step 2: check table existence
|
||||
engine::meta::TableSchema table_info;
|
||||
table_info.table_id_ = table_name_;
|
||||
engine::Status stat = DB()->DescribeTable(table_info);
|
||||
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_TABLE_NOT_EXIST;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -417,7 +379,7 @@ ServerError AddVectorTask::OnExecute() {
|
||||
|
||||
//step 4: insert vectors
|
||||
uint64_t vec_count = (uint64_t)record_array_.size();
|
||||
stat = DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
|
||||
stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_);
|
||||
rc.Record("add vectors to engine");
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_UNEXPECTED_ERROR;
|
||||
@ -493,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() {
|
||||
//step 2: check table existence
|
||||
engine::meta::TableSchema table_info;
|
||||
table_info.table_id_ = table_name_;
|
||||
engine::Status stat = DB()->DescribeTable(table_info);
|
||||
engine::Status stat = DBWrapper::DB()->DescribeTable(table_info);
|
||||
if(!stat.ok()) {
|
||||
error_code_ = SERVER_TABLE_NOT_EXIST;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
@ -526,9 +488,9 @@ ServerError SearchVectorTask::OnExecute() {
|
||||
uint64_t record_count = (uint64_t)record_array_.size();
|
||||
|
||||
if(file_id_array_.empty()) {
|
||||
stat = DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
|
||||
stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
|
||||
} else {
|
||||
stat = DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
|
||||
stat = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k_, record_count, vec_f.data(), dates, results);
|
||||
}
|
||||
|
||||
rc.Record("search vectors from engine");
|
||||
@ -599,7 +561,7 @@ ServerError GetTableRowCountTask::OnExecute() {
|
||||
|
||||
//step 2: get row count
|
||||
uint64_t row_count = 0;
|
||||
engine::Status stat = DB()->GetTableRowCount(table_name_, row_count);
|
||||
engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
|
||||
if (!stat.ok()) {
|
||||
error_code_ = SERVER_UNEXPECTED_ERROR;
|
||||
error_msg_ = "Engine failed: " + stat.ToString();
|
||||
|
||||
@ -4,11 +4,19 @@
|
||||
# Proprietary and confidential.
|
||||
#-------------------------------------------------------------------------------
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db db_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler db_scheduler_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
|
||||
set(db_scheduler_srcs
|
||||
${scheduler_files}
|
||||
${scheduler_context_files}
|
||||
${scheduler_task_files}
|
||||
)
|
||||
|
||||
include_directories(/usr/local/cuda/include)
|
||||
link_directories("/usr/local/cuda/lib64")
|
||||
|
||||
|
||||
@ -13,12 +13,19 @@ include_directories(../../src)
|
||||
|
||||
|
||||
aux_source_directory(../../src/db db_srcs)
|
||||
aux_source_directory(../../src/db/scheduler db_scheduler_srcs)
|
||||
aux_source_directory(../../src/config config_files)
|
||||
aux_source_directory(../../src/cache cache_srcs)
|
||||
aux_source_directory(../../src/wrapper wrapper_src)
|
||||
aux_source_directory(../../src/metrics metrics_src)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
|
||||
set(db_scheduler_srcs
|
||||
${scheduler_files}
|
||||
${scheduler_context_files}
|
||||
${scheduler_task_files}
|
||||
)
|
||||
|
||||
include_directories(/usr/include)
|
||||
include_directories(../../third_party/build/include)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user