mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-01 00:15:30 +08:00
Merge branch 'jinhai' of 192.168.1.105:jinhai/vecwise_engine into jinhai
Former-commit-id: 00d18f20fb408445831a00a5385a244c49786910
This commit is contained in:
commit
74eee9dc77
@ -70,7 +70,7 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
|
||||
auto status = _pMeta->files_to_search(group_id, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
LOG(DEBUG) << "Search DateT Size=" << files.size();
|
||||
/* LOG(DEBUG) << "Search DateT Size=" << files.size(); */
|
||||
|
||||
meta::GroupFilesSchema index_files;
|
||||
meta::GroupFilesSchema raw_files;
|
||||
@ -112,6 +112,8 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
|
||||
memset(output_distence, 0, k * nq * sizeof(float));
|
||||
memset(output_ids, 0, k * nq * sizeof(long));
|
||||
|
||||
long search_set_size = 0;
|
||||
|
||||
auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
|
||||
for (auto &file : file_vec) {
|
||||
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(file.location);
|
||||
@ -120,7 +122,10 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
|
||||
index = read_index(file.location.c_str());
|
||||
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem(file.location, index);
|
||||
}
|
||||
LOG(DEBUG) << "Search Index Of Size: " << index->dim * index->ntotal * 4 /(1024*1024) << " M";
|
||||
auto file_size = index->dim * index->ntotal * 4 /(1024*1024);
|
||||
search_set_size += file_size;
|
||||
LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
|
||||
<< file_size << " M";
|
||||
index->search(nq, vectors, k, output_distence, output_ids);
|
||||
cluster(output_ids, output_distence); // cluster to each query
|
||||
memset(output_distence, 0, k * nq * sizeof(float));
|
||||
@ -146,6 +151,8 @@ Status DBImpl::search(const std::string& group_id, size_t k, size_t nq,
|
||||
|
||||
search_in_index(raw_files);
|
||||
search_in_index(index_files);
|
||||
|
||||
LOG(DEBUG) << "Search Overall Set Size=" << search_set_size << " M";
|
||||
cluster_topk();
|
||||
|
||||
free(output_distence);
|
||||
@ -229,8 +236,8 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
|
||||
auto file_schema = file;
|
||||
file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
|
||||
updated.push_back(file_schema);
|
||||
LOG(DEBUG) << "About to merge file " << file_schema.file_id <<
|
||||
" of size=" << file_schema.rows;
|
||||
/* LOG(DEBUG) << "About to merge file " << file_schema.file_id << */
|
||||
/* " of size=" << file_schema.rows; */
|
||||
index_size = group_file.dimension * index->ntotal;
|
||||
|
||||
if (index_size >= _options.index_trigger_size) break;
|
||||
@ -246,8 +253,8 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
|
||||
group_file.rows = index_size;
|
||||
updated.push_back(group_file);
|
||||
status = _pMeta->update_files(updated);
|
||||
LOG(DEBUG) << "New merged file " << group_file.file_id <<
|
||||
" of size=" << group_file.rows;
|
||||
/* LOG(DEBUG) << "New merged file " << group_file.file_id << */
|
||||
/* " of size=" << group_file.rows; */
|
||||
|
||||
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem(
|
||||
group_file.location, std::make_shared<Index>(index));
|
||||
@ -304,13 +311,13 @@ Status DBImpl::build_index(const meta::GroupFileSchema& file) {
|
||||
}
|
||||
auto from_index = dynamic_cast<faiss::IndexIDMap*>(to_index->data().get());
|
||||
|
||||
LOG(DEBUG) << "Preparing build_index for file_id=" << file.file_id
|
||||
<< " with new index_file_id=" << group_file.file_id << std::endl;
|
||||
/* LOG(DEBUG) << "Preparing build_index for file_id=" << file.file_id */
|
||||
/* << " with new index_file_id=" << group_file.file_id << std::endl; */
|
||||
auto index = pBuilder->build_all(from_index->ntotal,
|
||||
dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
|
||||
from_index->id_map.data());
|
||||
LOG(DEBUG) << "Ending build_index for file_id=" << file.file_id
|
||||
<< " with new index_file_id=" << group_file.file_id << std::endl;
|
||||
/* LOG(DEBUG) << "Ending build_index for file_id=" << file.file_id */
|
||||
/* << " with new index_file_id=" << group_file.file_id << std::endl; */
|
||||
/* std::cout << "raw size=" << from_index->ntotal << " index size=" << index->ntotal << std::endl; */
|
||||
write_index(index, group_file.location.c_str());
|
||||
group_file.file_type = meta::GroupFileSchema::INDEX;
|
||||
@ -335,14 +342,14 @@ void DBImpl::background_build_index() {
|
||||
_pMeta->files_to_index(to_index_files);
|
||||
Status status;
|
||||
for (auto& file : to_index_files) {
|
||||
LOG(DEBUG) << "Buiding index for " << file.location;
|
||||
/* LOG(DEBUG) << "Buiding index for " << file.location; */
|
||||
status = build_index(file);
|
||||
if (!status.ok()) {
|
||||
_bg_error = status;
|
||||
return;
|
||||
}
|
||||
}
|
||||
LOG(DEBUG) << "All Buiding index Done";
|
||||
/* LOG(DEBUG) << "All Buiding index Done"; */
|
||||
|
||||
bg_build_index_started_ = false;
|
||||
bg_build_index_finish_signal_.notify_all();
|
||||
@ -381,21 +388,17 @@ Status DBImpl::count(const std::string& group_id, long& result) {
|
||||
|
||||
DBImpl::~DBImpl() {
|
||||
{
|
||||
LOG(DEBUG) << "Start wait background merge thread";
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
_shutting_down.store(true, std::memory_order_release);
|
||||
while (_bg_compaction_scheduled) {
|
||||
_bg_work_finish_signal.wait(lock);
|
||||
}
|
||||
LOG(DEBUG) << "Stop wait background merge thread";
|
||||
}
|
||||
{
|
||||
LOG(DEBUG) << "Start wait background build index thread";
|
||||
std::unique_lock<std::mutex> lock(build_index_mutex_);
|
||||
while (bg_build_index_started_) {
|
||||
bg_build_index_finish_signal_.wait(lock);
|
||||
}
|
||||
LOG(DEBUG) << "Stop wait background build index thread";
|
||||
}
|
||||
std::vector<std::string> ids;
|
||||
_pMemMgr->serialize(ids);
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sstream>
|
||||
#include <iostream>
|
||||
@ -109,7 +109,7 @@ Status DBMetaImpl::add_group(GroupSchema& group_info) {
|
||||
try {
|
||||
auto id = ConnectorPtr->insert(group_info);
|
||||
group_info.id = id;
|
||||
LOG(DEBUG) << "Add group " << id;
|
||||
/* LOG(DEBUG) << "Add group " << id; */
|
||||
} catch (...) {
|
||||
return Status::DBTransactionError("Add Group Error");
|
||||
}
|
||||
@ -193,7 +193,7 @@ Status DBMetaImpl::add_group_file(GroupFileSchema& group_file) {
|
||||
try {
|
||||
auto id = ConnectorPtr->insert(group_file);
|
||||
group_file.id = id;
|
||||
LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id;
|
||||
/* LOG(DEBUG) << "Add group_file of file_id=" << group_file.file_id; */
|
||||
} catch (...) {
|
||||
return Status::DBTransactionError("Add file Error");
|
||||
}
|
||||
@ -434,7 +434,7 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
|
||||
boost::filesystem::remove(group_file.location);
|
||||
}
|
||||
ConnectorPtr->remove<GroupFileSchema>(group_file.id);
|
||||
LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl;
|
||||
/* LOG(DEBUG) << "Removing deleted id=" << group_file.id << " location=" << group_file.location << std::endl; */
|
||||
}
|
||||
} catch (std::exception & e) {
|
||||
LOG(DEBUG) << e.what();
|
||||
@ -470,7 +470,7 @@ Status DBMetaImpl::cleanup() {
|
||||
boost::filesystem::remove(group_file.location);
|
||||
}
|
||||
ConnectorPtr->remove<GroupFileSchema>(group_file.id);
|
||||
LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl;
|
||||
/* LOG(DEBUG) << "Removing id=" << group_file.id << " location=" << group_file.location << std::endl; */
|
||||
}
|
||||
} catch (std::exception & e) {
|
||||
LOG(DEBUG) << e.what();
|
||||
|
||||
@ -54,19 +54,21 @@ TEST_F(DBTest, DB_TEST) {
|
||||
INIT_TIMER;
|
||||
std::stringstream ss;
|
||||
long count = 0;
|
||||
long prev_count = -1;
|
||||
|
||||
for (auto j=0; j<15; ++j) {
|
||||
for (auto j=0; j<10; ++j) {
|
||||
ss.str("");
|
||||
db_->count(group_name, count);
|
||||
|
||||
ss << "Search " << j << " With Size " << count;
|
||||
prev_count = count;
|
||||
|
||||
START_TIMER;
|
||||
stat = db_->search(group_name, k, qb, qxb, results);
|
||||
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
|
||||
STOP_TIMER(ss.str());
|
||||
|
||||
ASSERT_STATS(stat);
|
||||
ASSERT_EQ(results[0][0], target_ids[0]);
|
||||
ASSERT_TRUE(count >= prev_count);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
});
|
||||
@ -79,7 +81,7 @@ TEST_F(DBTest, DB_TEST) {
|
||||
} else {
|
||||
db_->add_vectors(group_name, nb, xb, vector_ids);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(5));
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||
}
|
||||
|
||||
search.join();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user