mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 15:35:33 +08:00
update
Former-commit-id: 0b15e2302d0bec551b1215285ff86b5f6cdf71de
This commit is contained in:
commit
b2cdb9e96d
@ -158,10 +158,6 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
|
||||
|
||||
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
|
||||
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
|
||||
#if 0
|
||||
return QuerySync(table_id, k, nq, vectors, dates, results);
|
||||
#else
|
||||
|
||||
//get all table files from table
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
@ -175,7 +171,6 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
|
||||
}
|
||||
|
||||
return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
|
||||
#endif
|
||||
}
|
||||
|
||||
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
|
||||
@ -203,141 +198,6 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
|
||||
return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
|
||||
}
|
||||
|
||||
Status DBImpl::QuerySync(const std::string& table_id, uint64_t k, uint64_t nq,
|
||||
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
ENGINE_LOG_DEBUG << "Search DateT Size = " << files.size();
|
||||
|
||||
meta::TableFilesSchema index_files;
|
||||
meta::TableFilesSchema raw_files;
|
||||
for (auto &day_files : files) {
|
||||
for (auto &file : day_files.second) {
|
||||
file.file_type_ == meta::TableFileSchema::INDEX ?
|
||||
index_files.push_back(file) : raw_files.push_back(file);
|
||||
}
|
||||
}
|
||||
|
||||
int dim = 0;
|
||||
if (!index_files.empty()) {
|
||||
dim = index_files[0].dimension_;
|
||||
} else if (!raw_files.empty()) {
|
||||
dim = raw_files[0].dimension_;
|
||||
} else {
|
||||
ENGINE_LOG_DEBUG << "no files to search";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
{
|
||||
// [{ids, distence}, ...]
|
||||
using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
|
||||
std::vector<SearchResult> batchresult(nq); // allocate nq cells.
|
||||
|
||||
auto cluster = [&](long *nns, float *dis, const int& k) -> void {
|
||||
for (int i = 0; i < nq; ++i) {
|
||||
auto f_begin = batchresult[i].first.cbegin();
|
||||
auto s_begin = batchresult[i].second.cbegin();
|
||||
batchresult[i].first.insert(f_begin, nns + i * k, nns + i * k + k);
|
||||
batchresult[i].second.insert(s_begin, dis + i * k, dis + i * k + k);
|
||||
}
|
||||
};
|
||||
|
||||
// Allocate Memory
|
||||
float *output_distence;
|
||||
long *output_ids;
|
||||
output_distence = (float *) malloc(k * nq * sizeof(float));
|
||||
output_ids = (long *) malloc(k * nq * sizeof(long));
|
||||
memset(output_distence, 0, k * nq * sizeof(float));
|
||||
memset(output_ids, 0, k * nq * sizeof(long));
|
||||
|
||||
long search_set_size = 0;
|
||||
|
||||
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
|
||||
for (auto &file : file_vec) {
|
||||
|
||||
ExecutionEnginePtr index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
|
||||
index->Load();
|
||||
auto file_size = index->PhysicalSize();
|
||||
search_set_size += file_size;
|
||||
|
||||
ENGINE_LOG_DEBUG << "Search file_type " << file.file_type_ << " Of Size: "
|
||||
<< file_size/(1024*1024) << " M";
|
||||
|
||||
int inner_k = index->Count() < k ? index->Count() : k;
|
||||
auto start_time = METRICS_NOW_TIME;
|
||||
index->Search(nq, vectors, inner_k, output_distence, output_ids);
|
||||
auto end_time = METRICS_NOW_TIME;
|
||||
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
|
||||
CollectFileMetrics(file.file_type_, file_size, total_time);
|
||||
cluster(output_ids, output_distence, inner_k); // cluster to each query
|
||||
memset(output_distence, 0, k * nq * sizeof(float));
|
||||
memset(output_ids, 0, k * nq * sizeof(long));
|
||||
}
|
||||
};
|
||||
|
||||
auto topk_cpu = [](const std::vector<float> &input_data,
|
||||
const int &k,
|
||||
float *output_distence,
|
||||
long *output_ids) -> void {
|
||||
std::map<float, std::vector<int>> inverted_table;
|
||||
for (int i = 0; i < input_data.size(); ++i) {
|
||||
if (inverted_table.count(input_data[i]) == 1) {
|
||||
auto& ori_vec = inverted_table[input_data[i]];
|
||||
ori_vec.push_back(i);
|
||||
}
|
||||
else {
|
||||
inverted_table[input_data[i]] = std::vector<int>{i};
|
||||
}
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
for (auto &item : inverted_table){
|
||||
if (count == k) break;
|
||||
for (auto &id : item.second){
|
||||
output_distence[count] = item.first;
|
||||
output_ids[count] = id;
|
||||
if (++count == k) break;
|
||||
}
|
||||
}
|
||||
};
|
||||
auto cluster_topk = [&]() -> void {
|
||||
QueryResult res;
|
||||
for (auto &result_pair : batchresult) {
|
||||
auto &dis = result_pair.second;
|
||||
auto &nns = result_pair.first;
|
||||
|
||||
topk_cpu(dis, k, output_distence, output_ids);
|
||||
|
||||
int inner_k = dis.size() < k ? dis.size() : k;
|
||||
for (int i = 0; i < inner_k; ++i) {
|
||||
res.emplace_back(std::make_pair(nns[output_ids[i]], output_distence[i])); // mapping
|
||||
}
|
||||
results.push_back(res); // append to result list
|
||||
res.clear();
|
||||
memset(output_distence, 0, k * nq * sizeof(float));
|
||||
memset(output_ids, 0, k * nq * sizeof(long));
|
||||
}
|
||||
};
|
||||
|
||||
search_in_index(raw_files);
|
||||
search_in_index(index_files);
|
||||
|
||||
ENGINE_LOG_DEBUG << "Search Overall Set Size = " << search_set_size << " M";
|
||||
cluster_topk();
|
||||
|
||||
free(output_distence);
|
||||
free(output_ids);
|
||||
}
|
||||
|
||||
if (results.empty()) {
|
||||
return Status::NotFound("Group " + table_id + ", search result not found!");
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
|
||||
uint64_t k, uint64_t nq, const float* vectors,
|
||||
const meta::DatesT& dates, QueryResults& results) {
|
||||
|
||||
@ -85,14 +85,6 @@ class DBImpl : public DB {
|
||||
~DBImpl() override;
|
||||
|
||||
private:
|
||||
Status
|
||||
QuerySync(const std::string &table_id,
|
||||
uint64_t k,
|
||||
uint64_t nq,
|
||||
const float *vectors,
|
||||
const meta::DatesT &dates,
|
||||
QueryResults &results);
|
||||
|
||||
Status
|
||||
QueryAsync(const std::string &table_id,
|
||||
const meta::TableFilesSchema &files,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user