#include #include #include #include #include #include #include #include #include namespace milvus::dog_segment { int TestABI() { return 42; } std::unique_ptr CreateSegment(SchemaPtr schema) { auto segment = std::make_unique(schema); return segment; } SegmentNaive::Record::Record(const Schema &schema) : uids_(1), timestamps_(1) { for (auto &field : schema) { if (field.is_vector()) { assert(field.get_data_type() == DataType::VECTOR_FLOAT); entity_vec_.emplace_back(std::make_shared>(field.get_dim())); } else { assert(field.get_data_type() == DataType::INT32); entity_vec_.emplace_back(std::make_shared>()); } } } int64_t SegmentNaive::PreInsert(int64_t size) { auto reserved_begin = record_.reserved.fetch_add(size); return reserved_begin; } int64_t SegmentNaive::PreDelete(int64_t size) { auto reserved_begin = deleted_record_.reserved.fetch_add(size); return reserved_begin; } auto SegmentNaive::get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force) -> std::shared_ptr { auto old = deleted_record_.get_lru_entry(); if (!force || old->bitmap_ptr->capacity() == insert_barrier) { if (old->del_barrier == del_barrier) { return old; } } auto current = old->clone(insert_barrier); current->del_barrier = del_barrier; auto bitmap = current->bitmap_ptr; if (del_barrier < old->del_barrier) { for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; // map uid to corrensponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto[iter_b, iter_e] = uid2offset_.equal_range(uid); for (auto iter = iter_b; iter != iter_e; ++iter) { auto offset = iter->second; if (record_.timestamps_[offset] < query_timestamp) { assert(offset < insert_barrier); the_offset = std::max(the_offset, offset); } } // if not found, skip if (the_offset == -1) { continue; } // otherwise, clear the flag bitmap->clear(the_offset); } return current; } else { for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) { // get uid in delete logs auto uid = deleted_record_.uids_[del_index]; // map uid to corrensponding offsets, select the max one, which should be the target // the max one should be closest to query_timestamp, so the delete log should refer to it int64_t the_offset = -1; auto[iter_b, iter_e] = uid2offset_.equal_range(uid); for (auto iter = iter_b; iter != iter_e; ++iter) { auto offset = iter->second; if (offset >= insert_barrier) { continue; } if (offset >= insert_barrier) { continue; } if (record_.timestamps_[offset] < query_timestamp) { assert(offset < insert_barrier); the_offset = std::max(the_offset, offset); } } // if not found, skip if (the_offset == -1) { continue; } // otherwise, set the flag bitmap->set(the_offset); } this->deleted_record_.insert_lru_entry(current); } return current; } Status SegmentNaive::Insert(int64_t reserved_begin, int64_t size, const int64_t *uids_raw, const Timestamp *timestamps_raw, const DogDataChunk &entities_raw) { assert(entities_raw.count == size); if (entities_raw.sizeof_per_row != schema_->get_total_sizeof()) { std::string msg = "entity length = " + std::to_string(entities_raw.sizeof_per_row) + ", schema length = " + std::to_string(schema_->get_total_sizeof()); throw std::runtime_error(msg); } auto raw_data = reinterpret_cast(entities_raw.raw_data); // std::vector entities(raw_data, raw_data + size * len_per_row); auto len_per_row = entities_raw.sizeof_per_row; std::vector> ordering; ordering.resize(size); // #pragma omp parallel for for (int i = 0; i < size; ++i) { ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i], i); } std::sort(ordering.begin(), ordering.end()); auto sizeof_infos = schema_->get_sizeof_infos(); std::vector offset_infos(schema_->size() + 1, 0); std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1); std::vector> entities(schema_->size()); for (int fid = 0; fid < schema_->size(); ++fid) { auto len = sizeof_infos[fid]; entities[fid].resize(len * size); } std::vector uids(size); std::vector timestamps(size); // #pragma omp parallel for for (int index = 0; index < size; ++index) { auto[t, uid, order_index] = ordering[index]; timestamps[index] = t; uids[index] = uid; for (int fid = 0; fid < schema_->size(); ++fid) { auto len = sizeof_infos[fid]; auto offset = offset_infos[fid]; auto src = raw_data + offset + order_index * len_per_row; auto dst = entities[fid].data() + index * len; memcpy(dst, src, len); } } record_.timestamps_.set_data(reserved_begin, timestamps.data(), size); record_.uids_.set_data(reserved_begin, uids.data(), size); for (int fid = 0; fid < schema_->size(); ++fid) { record_.entity_vec_[fid]->set_data_raw(reserved_begin, entities[fid].data(), size); } for (int i = 0; i < uids.size(); ++i) { auto uid = uids[i]; // NOTE: this must be the last step, cannot be put above uid2offset_.insert(std::make_pair(uid, reserved_begin + i)); } record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); return Status::OK(); // std::thread go(executor, std::move(uids), std::move(timestamps), std::move(entities)); // go.detach(); // const auto& schema = *schema_; // auto record_ptr = GetMutableRecord(); // assert(record_ptr); // auto& record = *record_ptr; // auto data_chunk = ColumnBasedDataChunk::from(row_values, schema); // // // TODO: use shared_lock for better concurrency // std::lock_guard lck(mutex_); // assert(state_ == SegmentState::Open); // auto ack_id = ack_count_.load(); // record.uids_.grow_by(primary_keys, primary_keys + size); // for (int64_t i = 0; i < size; ++i) { // auto key = primary_keys[i]; // auto internal_index = i + ack_id; // internal_indexes_[key] = internal_index; // } // record.timestamps_.grow_by(timestamps, timestamps + size); // for (int fid = 0; fid < schema.size(); ++fid) { // auto field = schema[fid]; // auto total_len = field.get_sizeof() * size / sizeof(float); // auto source_vec = data_chunk.entity_vecs[fid]; // record.entity_vecs_[fid].grow_by(source_vec.data(), source_vec.data() + total_len); // } // // // finish insert // ack_count_ += size; // return Status::OK(); } Status SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t *uids_raw, const Timestamp *timestamps_raw) { std::vector> ordering; ordering.resize(size); // #pragma omp parallel for for (int i = 0; i < size; ++i) { ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]); } std::sort(ordering.begin(), ordering.end()); std::vector uids(size); std::vector timestamps(size); // #pragma omp parallel for for (int index = 0; index < size; ++index) { auto[t, uid] = ordering[index]; timestamps[index] = t; uids[index] = uid; } deleted_record_.timestamps_.set_data(reserved_begin, timestamps.data(), size); deleted_record_.uids_.set_data(reserved_begin, uids.data(), size); deleted_record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); return Status::OK(); // for (int i = 0; i < size; ++i) { // auto key = primary_keys[i]; // auto time = timestamps[i]; // delete_logs_.insert(std::make_pair(key, time)); // } // return Status::OK(); } template int64_t get_barrier(const RecordType &record, Timestamp timestamp) { auto &vec = record.timestamps_; int64_t beg = 0; int64_t end = record.ack_responder_.GetAck(); while (beg < end) { auto mid = (beg + end) / 2; if (vec[mid] < timestamp) { beg = mid + 1; } else { end = mid; } } return beg; } Status SegmentNaive::QueryImpl(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) { auto ins_barrier = get_barrier(record_, timestamp); auto del_barrier = get_barrier(deleted_record_, timestamp); auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier, true); assert(bitmap_holder); assert(bitmap_holder->bitmap_ptr->capacity() == ins_barrier); auto field_offset = schema_->get_offset(query_info->field_name); auto &field = schema_->operator[](query_info->field_name); assert(field.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field.get_dim(); auto bitmap = bitmap_holder->bitmap_ptr; auto topK = query_info->topK; auto num_queries = query_info->num_queries; auto the_offset_opt = schema_->get_offset(query_info->field_name); assert(the_offset_opt.has_value()); auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); auto index_entry = index_meta_->lookup_by_field(query_info->field_name); auto conf = index_entry.config; conf[milvus::knowhere::meta::TOPK] = query_info->topK; { auto count = 0; for (int i = 0; i < bitmap->capacity(); ++i) { if (bitmap->test(i)) { ++count; } } std::cout << "fuck " << count << std::endl; } auto indexing = std::static_pointer_cast(indexings_[index_entry.index_name]); indexing->SetBlacklist(bitmap); auto ds = knowhere::GenDataset(query_info->num_queries, dim, query_info->query_raw_data.data()); auto final = indexing->Query(ds, conf); auto ids = final->Get(knowhere::meta::IDS); auto distances = final->Get(knowhere::meta::DISTANCE); auto total_num = num_queries * topK; result.result_ids_.resize(total_num); result.result_distances_.resize(total_num); result.row_num_ = total_num; result.num_queries_ = num_queries; result.topK_ = topK; std::copy_n(ids, total_num, result.result_ids_.data()); std::copy_n(distances, total_num, result.result_distances_.data()); for (auto &id: result.result_ids_) { id = record_.uids_[id]; } return Status::OK(); } void merge_into(int64_t queries, int64_t topk, float *distances, int64_t *uids, const float *new_distances, const int64_t *new_uids) { for(int64_t qn = 0; qn < queries; ++qn) { auto base = qn * topk; auto src2_dis = distances + base; auto src2_uids = uids + base; auto src1_dis = new_distances + base; auto src1_uids = new_uids + base; std::vector buf_dis(topk); std::vector buf_uids(topk); auto it1 = 0; auto it2 = 0; for(auto buf = 0; buf < topk; ++buf){ if(src1_dis[it1] <= src2_dis[it2]) { buf_dis[buf] = src1_dis[it1]; buf_uids[buf] = src1_uids[it1]; ++it1; } else { buf_dis[buf] = src2_dis[it2]; buf_uids[buf] = src2_uids[it2]; ++it2; } } std::copy_n(buf_dis.data(), topk, src2_dis); std::copy_n(buf_uids.data(), topk, src2_uids); } } Status SegmentNaive::QueryBruteForceImpl(query::QueryPtr query_info, Timestamp timestamp, QueryResult &results) { auto ins_barrier = get_barrier(record_, timestamp); auto del_barrier = get_barrier(deleted_record_, timestamp); auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); assert(bitmap_holder); auto &field = schema_->operator[](query_info->field_name); assert(field.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field.get_dim(); auto bitmap = bitmap_holder->bitmap_ptr; auto topK = query_info->topK; auto num_queries = query_info->num_queries; auto total_count = topK * num_queries; // TODO: optimize auto the_offset_opt = schema_->get_offset(query_info->field_name); assert(the_offset_opt.has_value()); auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); std::vector final_uids(total_count); std::vector final_dis(total_count, std::numeric_limits::max()); auto max_chunk = (ins_barrier + DefaultElementPerChunk - 1) / DefaultElementPerChunk; for (int chunk_id = 0; chunk_id < max_chunk; ++chunk_id) { std::vector buf_uids(total_count, -1); std::vector buf_dis(total_count, std::numeric_limits::max()); faiss::float_maxheap_array_t buf = { (size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()}; auto src_data = vec_ptr->get_chunk(chunk_id).data(); auto nsize = chunk_id != max_chunk - 1? DefaultElementPerChunk: ins_barrier - chunk_id * DefaultElementPerChunk; auto offset = chunk_id * DefaultElementPerChunk; faiss::knn_L2sqr(query_info->query_raw_data.data(), src_data, dim, num_queries, nsize, &buf, bitmap, offset); if(chunk_id == 0) { final_uids = buf_uids; final_dis = buf_dis; } else { merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data()); } } for(auto& id: final_uids) { id = record_.uids_[id]; } results.result_ids_ = std::move(final_uids); results.result_distances_ = std::move(final_dis); results.topK_ = topK; results.num_queries_ = num_queries; results.row_num_ = total_count; // throw std::runtime_error("unimplemented"); return Status::OK(); } Status SegmentNaive::QuerySlowImpl(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) { auto ins_barrier = get_barrier(record_, timestamp); auto del_barrier = get_barrier(deleted_record_, timestamp); auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); assert(bitmap_holder); auto &field = schema_->operator[](query_info->field_name); assert(field.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field.get_dim(); auto bitmap = bitmap_holder->bitmap_ptr; auto topK = query_info->topK; auto num_queries = query_info->num_queries; // TODO: optimize auto the_offset_opt = schema_->get_offset(query_info->field_name); assert(the_offset_opt.has_value()); auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); std::vector>> records(num_queries); auto get_L2_distance = [dim](const float *a, const float *b) { float L2_distance = 0; for (auto i = 0; i < dim; ++i) { auto d = a[i] - b[i]; L2_distance += d * d; } return L2_distance; }; for (int64_t i = 0; i < ins_barrier; ++i) { if (i < bitmap->capacity() && bitmap->test(i)) { continue; } auto element = vec_ptr->get_element(i); for (auto query_id = 0; query_id < num_queries; ++query_id) { auto query_blob = query_info->query_raw_data.data() + query_id * dim; auto dis = get_L2_distance(query_blob, element); auto &record = records[query_id]; if (record.size() < topK) { record.emplace(dis, i); } else if (record.top().first > dis) { record.emplace(dis, i); record.pop(); } } } result.num_queries_ = num_queries; result.topK_ = topK; auto row_num = topK * num_queries; result.row_num_ = topK * num_queries; result.result_ids_.resize(row_num); result.result_distances_.resize(row_num); for (int q_id = 0; q_id < num_queries; ++q_id) { // reverse for (int i = 0; i < topK; ++i) { auto dst_id = topK - 1 - i + q_id * topK; auto[dis, offset] = records[q_id].top(); records[q_id].pop(); result.result_ids_[dst_id] = record_.uids_[offset]; result.result_distances_[dst_id] = dis; } } return Status::OK(); } Status SegmentNaive::Query(query::QueryPtr query_info, Timestamp timestamp, QueryResult &result) { // TODO: enable delete // TODO: enable index // TODO: remove mock if (query_info == nullptr) { query_info = std::make_shared(); query_info->field_name = "fakevec"; query_info->topK = 10; query_info->num_queries = 1; auto dim = schema_->operator[]("fakevec").get_dim(); std::default_random_engine e(42); std::uniform_real_distribution<> dis(0.0, 1.0); query_info->query_raw_data.resize(query_info->num_queries * dim); for (auto &x: query_info->query_raw_data) { x = dis(e); } } if (index_ready_) { return QueryImpl(query_info, timestamp, result); } else { return QueryBruteForceImpl(query_info, timestamp, result); } } Status SegmentNaive::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { std::runtime_error("insert not ready"); } if (this->deleted_record_.reserved != this->record_.ack_responder_.GetAck()) { std::runtime_error("delete not ready"); } state_ = SegmentState::Closed; return Status::OK(); } template knowhere::IndexPtr SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry &entry) { auto offset_opt = schema_->get_offset(entry.field_name); assert(offset_opt.has_value()); auto offset = offset_opt.value(); auto field = (*schema_)[offset]; auto dim = field.get_dim(); auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode); auto chunk_size = record_.uids_.chunk_size(); auto &uids = record_.uids_; auto entities = record_.get_vec_entity(offset); std::vector datasets; for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) { auto entities_chunk = entities->get_chunk(chunk_id).data(); int64_t count = chunk_id == uids.chunk_size() - 1 ? record_.reserved - chunk_id * DefaultElementPerChunk : DefaultElementPerChunk; datasets.push_back(knowhere::GenDataset(count, dim, entities_chunk)); } for (auto &ds: datasets) { indexing->Train(ds, entry.config); } for (auto &ds: datasets) { indexing->AddWithoutIds(ds, entry.config); } return indexing; } Status SegmentNaive::BuildIndex(IndexMetaPtr remote_index_meta) { if (remote_index_meta == nullptr) { int dim = 0; std::string index_field_name; for (auto& field: schema_->get_fields()) { if (field.get_data_type() == DataType::VECTOR_FLOAT) { dim = field.get_dim(); index_field_name = field.get_name(); } } assert(dim != 0); assert(!index_field_name.empty()); auto index_meta = std::make_shared(schema_); // TODO: this is merge of query conf and insert conf // TODO: should be splitted into multiple configs auto conf = milvus::knowhere::Config{ {milvus::knowhere::meta::DIM, dim}, {milvus::knowhere::IndexParams::nlist, 100}, {milvus::knowhere::IndexParams::nprobe, 4}, {milvus::knowhere::IndexParams::m, 4}, {milvus::knowhere::IndexParams::nbits, 8}, {milvus::knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, {milvus::knowhere::meta::DEVICEID, 0}, }; index_meta->AddEntry("fakeindex", index_field_name, knowhere::IndexEnum::INDEX_FAISS_IVFPQ, knowhere::IndexMode::MODE_CPU, conf); remote_index_meta = index_meta; } if(record_.ack_responder_.GetAck() < 1024 * 4) { return Status(SERVER_BUILD_INDEX_ERROR, "too few elements"); } for (auto&[index_name, entry]: index_meta_->get_entries()) { assert(entry.index_name == index_name); const auto &field = (*schema_)[entry.field_name]; if (field.is_vector()) { assert(field.get_data_type() == engine::DataType::VECTOR_FLOAT); auto index_ptr = BuildVecIndexImpl(entry); indexings_[index_name] = index_ptr; } else { throw std::runtime_error("unimplemented"); } } index_ready_ = true; return Status::OK(); } int64_t SegmentNaive::GetMemoryUsageInBytes() { int64_t total_bytes = 0; if(index_ready_) { auto& index_entries = index_meta_->get_entries(); for(auto [index_name, entry]: index_entries) { assert(schema_->operator[](entry.field_name).is_vector()); auto vec_ptr = std::static_pointer_cast(indexings_[index_name]); total_bytes += vec_ptr->IndexSize(); } } int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1); total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1); int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1); total_bytes += del_n * (16 * 2); return total_bytes; } } // namespace milvus::dog_segment