diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 03cec48e5b..44ca1e8a7f 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -896,21 +896,23 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk, }); } -std::vector> -ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps, - bool include_same_ts) const { - std::vector> pk_offsets; +void +ChunkedSegmentSealedImpl::search_batch_pks( + const std::vector& pks, + const Timestamp* timestamps, + bool include_same_ts, + const std::function& + callback) const { // handle unsorted case if (!is_sorted_by_pk_) { for (size_t i = 0; i < pks.size(); i++) { auto offsets = insert_record_.search_pk( pks[i], timestamps[i], include_same_ts); for (auto offset : offsets) { - pk_offsets.emplace_back(offset, timestamps[i]); + callback(offset, timestamps[i]); } } - return pk_offsets; + return; } auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1)); @@ -951,7 +953,7 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, auto offset = it - src + num_rows_until_chunk; if (timestamp_hit(insert_record_.timestamps_[offset], timestamp)) { - pk_offsets.emplace_back(offset, timestamp); + callback(SegOffset(offset), timestamp); } } } @@ -978,7 +980,7 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, if (timestamp_hit( insert_record_.timestamps_[segment_offset], timestamp)) { - pk_offsets.emplace_back(segment_offset, timestamp); + callback(SegOffset(segment_offset), timestamp); } } } @@ -993,8 +995,6 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, schema_->get_fields().at(pk_field_id).get_data_type())); } } - - return pk_offsets; } template @@ -1121,8 +1121,11 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( is_sorted_by_pk_(is_sorted_by_pk), deleted_record_( 
&insert_record_, - [this](const std::vector& pks, const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps, false); + [this](const std::vector& pks, + const Timestamp* timestamps, + std::function + callback) { + this->search_batch_pks(pks, timestamps, false, callback); }, segment_id) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index f411839d02..95a6eeaf4a 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -204,10 +204,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { return true; } - std::vector> - search_batch_pks(const std::vector& pks, - const Timestamp* timestamps, - bool include_same_ts) const; + void + search_batch_pks( + const std::vector& pks, + const Timestamp* timestamps, + bool include_same_ts, + const std::function& + callback) const; public: int64_t diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index d0eea98e88..32f0890c77 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -53,9 +53,11 @@ template class DeletedRecord { public: DeletedRecord(InsertRecord* insert_record, - std::function>( + std::function& pks, - const Timestamp* timestamps)> search_pk_func, + const Timestamp* timestamps, + std::function cb)> + search_pk_func, int64_t segment_id) : insert_record_(insert_record), search_pk_func_(std::move(search_pk_func)), @@ -115,30 +117,30 @@ class DeletedRecord { max_timestamp = deleted_ts; } } - auto offsets = search_pk_func_(pks, timestamps); - for (auto& [offset, deleted_ts] : offsets) { - auto row_id = offset.get(); - // if already deleted, no need to add new record - if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) { - continue; - } - // if insert record and delete record is 
same timestamp, - // delete not take effect on this record. - if (deleted_ts == insert_record_->timestamps_[row_id]) { - continue; - } - accessor.insert(std::make_pair(deleted_ts, row_id)); - if constexpr (is_sealed) { - Assert(deleted_mask_.size() > 0); - deleted_mask_.set(row_id); - } else { - // need to add mask size firstly for growing segment - deleted_mask_.resize(insert_record_->size()); - deleted_mask_.set(row_id); - } - removed_num++; - mem_add += DELETE_PAIR_SIZE; - } + search_pk_func_( + pks, timestamps, [&](SegOffset offset, Timestamp delete_ts) { + auto row_id = offset.get(); + // if already deleted, no need to add new record + if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) { + return; + } + // if insert record and delete record is same timestamp, + // delete not take effect on this record. + if (delete_ts == insert_record_->timestamps_[row_id]) { + return; + } + accessor.insert(std::make_pair(delete_ts, row_id)); + if constexpr (is_sealed) { + Assert(deleted_mask_.size() > 0); + deleted_mask_.set(row_id); + } else { + // need to add mask size firstly for growing segment + deleted_mask_.resize(insert_record_->size()); + deleted_mask_.set(row_id); + } + removed_num++; + mem_add += DELETE_PAIR_SIZE; + }); n_.fetch_add(removed_num); mem_size_.fetch_add(mem_add); @@ -322,8 +324,9 @@ class DeletedRecord { std::atomic n_ = 0; std::atomic mem_size_ = 0; InsertRecord* insert_record_; - std::function>( - const std::vector& pks, const Timestamp* timestamps)> + std::function& pks, + const Timestamp* timestamps, + std::function)> search_pk_func_; int64_t segment_id_{0}; std::shared_ptr deleted_lists_; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 7e25b189f1..1a400365c0 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -665,20 +665,21 @@ SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const { 
return field_meta.get_data_type(); } -std::vector> -SegmentGrowingImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps, - bool include_same_ts) const { - std::vector> results; +void +SegmentGrowingImpl::search_batch_pks( + const std::vector& pks, + const Timestamp* timestamps, + bool include_same_ts, + const std::function& + callback) const { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record_.search_pk(pks[i], timestamp, include_same_ts); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + callback(offset, timestamp); } } - return results; } void diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 0fc2b1f475..a52d1ec589 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -175,10 +175,13 @@ class SegmentGrowingImpl : public SegmentGrowing { void try_remove_chunks(FieldId fieldId); - std::vector> - search_batch_pks(const std::vector& pks, - const Timestamp* timestamps, - bool include_same_ts) const; + void + search_batch_pks( + const std::vector& pks, + const Timestamp* timestamps, + bool include_same_ts, + const std::function& + callback) const; public: size_t @@ -297,8 +300,10 @@ class SegmentGrowingImpl : public SegmentGrowing { deleted_record_( &insert_record_, [this](const std::vector& pks, - const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps, false); + const Timestamp* timestamps, + std::function callback) { + this->search_batch_pks(pks, timestamps, false, callback); }, segment_id) { this->CreateTextIndexes(); diff --git a/internal/core/unittest/bench/bench_applyhits.cpp b/internal/core/unittest/bench/bench_applyhits.cpp new file mode 100644 index 0000000000..f52d83a7a8 --- /dev/null +++ b/internal/core/unittest/bench/bench_applyhits.cpp @@ -0,0 +1,77 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#include
+#include
+#include
+#include
+#include
+
+static void
+apply_hits(milvus::BitsetType& bitset, std::vector& hits, bool v) {  // baseline: one bit assignment per hit
+    for (auto hit : hits) {  // named `hit` so it does not shadow the bool parameter `v`
+        bitset[hit] = v;  // store the caller's value at each hit index
+    }
+}
+
+static void
+apply_hits_elementwise(milvus::BitsetType& bitset,
+                       std::vector& hits,
+                       bool v) {  // variant: one word update per group of hits sharing index / 64
+    std::sort(hits.begin(), hits.end());  // sorts the caller's vector in place so same-word hits are adjacent
+    uint64_t* data = bitset.data();  // NOTE(review): assumes data() exposes 64-bit words - confirm against BitsetType
+    size_t j = 0;
+    while (j < hits.size()) {
+        size_t index = hits[j];
+        size_t word_idx = index / 64;
+        uint64_t mask = 0;
+        do {  // fold every consecutive hit that lands in word_idx into one mask
+            mask |= (1ULL << (hits[j] % 64));
+            ++j;
+        } while (j < hits.size() && (hits[j] / 64) == word_idx);
+
+        if (v) {
+            data[word_idx] |= mask;  // set all gathered bits with a single word update
+        } else {
+            data[word_idx] &= ~mask;  // clear all gathered bits with a single word update
+        }
+    }
+}
+
+static void
+BM_BITSET_APPLYHITS_BRUTEFORCE(benchmark::State& stats) {  // times apply_hits over hits 0..599999
+    auto hits = std::vector(600000);  // already sized to 600000 elements
+    for (size_t i = 0; i < 600000; i++) {
+        hits[i] = i;  // assign in place; emplace_back would append a second 600000 elements
+    }
+
+    for (auto _ : stats) {
+        milvus::BitsetType bitset(600000);  // fresh bitset per iteration, constructed inside the benchmark loop
+        apply_hits(bitset, hits, true);
+    }
+}
+
+BENCHMARK(BM_BITSET_APPLYHITS_BRUTEFORCE);
+
+static void
+BM_BITSET_APPLYHITS_ELEMENTWISE(benchmark::State& stats) {  // times apply_hits_elementwise over the same input
+    auto hits = std::vector(600000);  // already sized to 600000 elements
+    for (size_t i = 0; i < 600000; i++) {
+        hits[i] = i;  // assign in place; emplace_back would append a second 600000 elements
+    }
+
+    for (auto _ : stats) {
+        milvus::BitsetType bitset(600000);  // fresh bitset per iteration, constructed inside the benchmark loop
+        apply_hits_elementwise(bitset, hits, true);  // includes the std::sort of hits on every iteration
+    }
+}
+
+BENCHMARK(BM_BITSET_APPLYHITS_ELEMENTWISE);
diff --git a/internal/core/unittest/bench/bench_filewrite.cpp
b/internal/core/unittest/bench/bench_filewrite.cpp
new file mode 100644
index 0000000000..77dca0ee35
--- /dev/null
+++ b/internal/core/unittest/bench/bench_filewrite.cpp
@@ -0,0 +1,89 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#include
+#include
+#include "common/File.h"
+
+static void
+BN_FILE_Write_Syscall(benchmark::State& stats, int n) {  // times File::WriteInt + File::Write of an n-byte payload
+    std::string s(n, '*');  // payload: n '*' characters
+    auto file_path = std::filesystem::current_path() / "bn_write_syscall";  // scratch file in the working directory
+    milvus::File f =
+        milvus::File::Open(file_path.string(), O_CREAT | O_TRUNC | O_RDWR);  // create or truncate; no buffer-size argument
+
+    for (auto _ : stats) {
+        f.WriteInt(n);  // the payload length first
+        f.Write(s.c_str(), n);  // then the n payload bytes
+    }
+}  // NOTE(review): nothing here removes the scratch file afterwards
+
+static void
+BN_FILE_Write_Stream(benchmark::State& stats, size_t buf_size, int n) {  // same workload through FWriteInt/FWrite
+    std::string s(n, '*');  // payload: n '*' characters
+    auto file_path = std::filesystem::current_path() / "bn_write_stream";  // scratch file in the working directory
+    milvus::File f = milvus::File::Open(
+        file_path.string(), O_CREAT | O_TRUNC | O_RDWR, buf_size);  // presumably buf_size sizes the stream buffer - confirm in common/File.h
+
+    for (auto _ : stats) {
+        f.FWriteInt(n);  // the payload length first
+        f.FWrite(s.c_str(), n);  // then the n payload bytes
+    }
+    f.FFlush();  // runs once, after the benchmark loop has finished
+}
+
+static void
+BN_FILE_Write_Syscall_2(benchmark::State& stats) {
+    BN_FILE_Write_Syscall(stats, 2);  // n = 2
+}
+BENCHMARK(BN_FILE_Write_Syscall_2);
+
+static void
+BN_FILE_Write_Syscall_65535(benchmark::State& stats) {
+    BN_FILE_Write_Syscall(stats, 65535);  // n = 65535
+}
+BENCHMARK(BN_FILE_Write_Syscall_65535);
+
+static void
+BN_FILE_Write_Stream_4096_2(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 4096, 2);  // buf_size = 4096, n = 2
+}
+BENCHMARK(BN_FILE_Write_Stream_4096_2);
+
+static void
+BN_FILE_Write_Stream_16384_2(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 16384, 2);  // buf_size = 16384, n = 2
+}
+BENCHMARK(BN_FILE_Write_Stream_16384_2);
+
+static void
+BN_FILE_Write_Stream_163840_2(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 163840, 2);  // buf_size = 163840, n = 2
+}
+BENCHMARK(BN_FILE_Write_Stream_163840_2);
+
+static void
+BN_FILE_Write_Stream_4096_65535(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 4096, 65535);  // buf_size = 4096, n = 65535
+}
+BENCHMARK(BN_FILE_Write_Stream_4096_65535);
+
+static void
+BN_FILE_Write_Stream_16384_65535(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 16384, 65535);  // buf_size = 16384, n = 65535
+}
+BENCHMARK(BN_FILE_Write_Stream_16384_65535);
+
+static void
+BN_FILE_Write_Stream_163840_65535(benchmark::State& stats) {
+    BN_FILE_Write_Stream(stats, 163840, 65535);  // buf_size = 163840, n = 65535
+}
+BENCHMARK(BN_FILE_Write_Stream_163840_65535);
\ No newline at end of file
diff --git a/internal/core/unittest/bench/bench_findfirst.cpp b/internal/core/unittest/bench/bench_findfirst.cpp
new file mode 100644
index 0000000000..66a4116a5f
--- /dev/null
+++ b/internal/core/unittest/bench/bench_findfirst.cpp
@@ -0,0 +1,75 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied.
See the License for the specific language governing permissions and limitations under the License
+
+#include
+#include
+#include "common/File.h"
+#include
+
+static std::vector
+BF_SCAN_OFFSET(int n, milvus::BitsetType& bitset) {  // baseline: test every index in [0, n)
+    std::vector res;
+    res.reserve(3);  // the benchmarks below clear exactly three bits
+    for (int i = 0; i < n; i++) {
+        if (!bitset[i]) {
+            res.emplace_back(i);  // collect the index of each cleared bit
+        }
+    }
+    return res;
+}
+
+static void
+BM_BITSET_BRUTEFORCE(benchmark::State& stats) {  // times BF_SCAN_OFFSET over 640000 bits
+    // BitsetBlockType bitset;
+    milvus::BitsetType bitset(640000);
+    bitset.flip();  // presumably turns a freshly constructed all-zero bitset into all-ones - confirm
+    bitset.set(10000, false);
+    bitset.set(190000, false);
+    bitset.set(610000, false);
+
+    for (auto _ : stats) {
+        auto res = BF_SCAN_OFFSET(640000, bitset);
+    }
+}
+BENCHMARK(BM_BITSET_BRUTEFORCE);
+
+static std::vector
+BS_FINDFIRST(int n, milvus::BitsetType& bitset) {  // variant: jump between hits with find_next
+    bitset.flip();  // invert so the cleared bits become the set bits that find_next looks for
+    int i = 0;
+    std::vector res;
+    res.reserve(3);  // the benchmark below clears exactly three bits
+    while (i <= n) {
+        auto next = bitset.find_next(i);
+        if (!next.has_value()) {
+            break;  // no further set bit
+        }
+        i = next.value() + 1;  // NOTE(review): if find_next already excludes its argument this skips one index - confirm
+        res.emplace_back(next.value());  // report the found offset itself, as BF_SCAN_OFFSET does
+    }
+    bitset.flip();  // restore the caller's bitset
+    return res;
+}
+
+static void
+BM_BITSET_FINDFIRST(benchmark::State& stats) {  // times BS_FINDFIRST on the same bit pattern as above
+    milvus::BitsetType bitset(640000);
+    bitset.flip();  // presumably turns a freshly constructed all-zero bitset into all-ones - confirm
+    bitset.set(10000, false);
+    bitset.set(190000, false);
+    bitset.set(610000, false);
+
+    for (auto _ : stats) {
+        auto res = BS_FINDFIRST(640000, bitset);  // both flip() calls inside are part of each timed iteration
+    }
+}
+
+BENCHMARK(BM_BITSET_FINDFIRST);
\ No newline at end of file
diff --git a/internal/core/unittest/bench/bench_search_pk.cpp b/internal/core/unittest/bench/bench_search_pk.cpp
new file mode 100644
index 0000000000..f65244133f
--- /dev/null
+++ b/internal/core/unittest/bench/bench_search_pk.cpp
@@ -0,0 +1,27 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include +#include "common/type_c.h" +#include "segcore/segment_c.h" +#include "segcore/SegmentGrowing.h" +#include "segcore/SegmentSealed.h" +#include "test_cachinglayer/cachinglayer_test_utils.h" +#include "test_utils/DataGen.h" +#include "test_utils/storage_test_utils.h" + +using namespace milvus; +using namespace milvus::query; +using namespace milvus::segcore; + +static int dim = 768; \ No newline at end of file diff --git a/internal/core/unittest/test_chunked_segment.cpp b/internal/core/unittest/test_chunked_segment.cpp index d5bf9a8199..1c643df7f0 100644 --- a/internal/core/unittest/test_chunked_segment.cpp +++ b/internal/core/unittest/test_chunked_segment.cpp @@ -276,8 +276,10 @@ TEST_P(TestChunkSegment, TestSkipNextTermExpr) { proto::plan::GenericValue v1; v1.set_int64_val(10000); auto first_expr = std::make_shared( - expr::ColumnInfo(fields.at("int64"), DataType::INT64), proto::plan::OpType::GreaterEqual, v1); - + expr::ColumnInfo(fields.at("int64"), DataType::INT64), + proto::plan::OpType::GreaterEqual, + v1); + std::vector v2; for (int i = 1; i <= 5; ++i) { proto::plan::GenericValue v; @@ -286,9 +288,12 @@ TEST_P(TestChunkSegment, TestSkipNextTermExpr) { } auto second_expr = std::make_shared( expr::ColumnInfo(fields.at("pk"), DataType::INT64), v2); - auto and_expr = std::make_shared(expr::LogicalBinaryExpr::OpType::And, first_expr, second_expr); - auto plan = std::make_shared(DEFAULT_PLANNODE_ID, and_expr); - auto final = query::ExecuteQueryExpr(plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP); + auto and_expr 
= std::make_shared( + expr::LogicalBinaryExpr::OpType::And, first_expr, second_expr); + auto plan = + std::make_shared(DEFAULT_PLANNODE_ID, and_expr); + auto final = query::ExecuteQueryExpr( + plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP); ASSERT_EQ(5, final.count()); for (int i = 10001; i <= 10005; ++i) { ASSERT_EQ(true, final[i]) << "i: " << i; diff --git a/internal/core/unittest/test_delete_record.cpp b/internal/core/unittest/test_delete_record.cpp index af3a5bd510..e297b426da 100644 --- a/internal/core/unittest/test_delete_record.cpp +++ b/internal/core/unittest/test_delete_record.cpp @@ -42,17 +42,17 @@ TEST(DeleteMVCC, common_case) { auto segment_ptr = segment.get(); DeletedRecord delete_record( &insert_record, - [&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0); delete_record.set_sealed_row_count(c); @@ -167,17 +167,17 @@ TEST(DeleteMVCC, delete_exist_duplicate_pks) { InsertRecord insert_record(*schema, N); DeletedRecord delete_record( &insert_record, - [&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0); @@ -291,17 +291,17 @@ TEST(DeleteMVCC, snapshot) { InsertRecord insert_record(*schema, N); DeletedRecord delete_record( &insert_record, - 
[&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0); @@ -348,17 +348,17 @@ TEST(DeleteMVCC, insert_after_snapshot) { InsertRecord insert_record(*schema, N); DeletedRecord delete_record( &insert_record, - [&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0); @@ -452,17 +452,17 @@ TEST(DeleteMVCC, perform) { InsertRecord insert_record(*schema, N); DeletedRecord delete_record( &insert_record, - [&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index a6cafdcf91..a00852674c 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -90,17 +90,17 @@ TEST(Util, GetDeleteBitmap) { InsertRecord insert_record(*schema, N); DeletedRecord delete_record( 
&insert_record, - [&insert_record](const std::vector& pks, - const Timestamp* timestamps) { - std::vector> results; + [&insert_record]( + const std::vector& pks, + const Timestamp* timestamps, + std::function cb) { for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; auto offsets = insert_record.search_pk(pks[i], timestamp); for (auto offset : offsets) { - results.emplace_back(offset, timestamp); + cb(offset, timestamp); } } - return results; }, 0);