enhance: Pass callback in search batch pks to avoid large result (#43695)

Related to #43660

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-08-02 17:57:37 +08:00 committed by GitHub
parent 75666153e3
commit 4aff581007
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 381 additions and 93 deletions

View File

@ -896,21 +896,23 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk,
});
}
std::vector<std::pair<SegOffset, Timestamp>>
ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts) const {
std::vector<std::pair<SegOffset, Timestamp>> pk_offsets;
void
ChunkedSegmentSealedImpl::search_batch_pks(
const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts,
const std::function<void(const SegOffset offset, const Timestamp ts)>&
callback) const {
// handle unsorted case
if (!is_sorted_by_pk_) {
for (size_t i = 0; i < pks.size(); i++) {
auto offsets = insert_record_.search_pk(
pks[i], timestamps[i], include_same_ts);
for (auto offset : offsets) {
pk_offsets.emplace_back(offset, timestamps[i]);
callback(offset, timestamps[i]);
}
}
return pk_offsets;
return;
}
auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
@ -951,7 +953,7 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
auto offset = it - src + num_rows_until_chunk;
if (timestamp_hit(insert_record_.timestamps_[offset],
timestamp)) {
pk_offsets.emplace_back(offset, timestamp);
callback(SegOffset(offset), timestamp);
}
}
}
@ -978,7 +980,7 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
if (timestamp_hit(
insert_record_.timestamps_[segment_offset],
timestamp)) {
pk_offsets.emplace_back(segment_offset, timestamp);
callback(SegOffset(segment_offset), timestamp);
}
}
}
@ -993,8 +995,6 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector<PkType>& pks,
schema_->get_fields().at(pk_field_id).get_data_type()));
}
}
return pk_offsets;
}
template <typename Condition>
@ -1121,8 +1121,11 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
is_sorted_by_pk_(is_sorted_by_pk),
deleted_record_(
&insert_record_,
[this](const std::vector<PkType>& pks, const Timestamp* timestamps) {
return this->search_batch_pks(pks, timestamps, false);
[this](const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(const SegOffset offset, const Timestamp ts)>
callback) {
this->search_batch_pks(pks, timestamps, false, callback);
},
segment_id) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();

View File

@ -204,10 +204,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
return true;
}
std::vector<std::pair<SegOffset, Timestamp>>
search_batch_pks(const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts) const;
void
search_batch_pks(
const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts,
const std::function<void(const SegOffset offset, const Timestamp ts)>&
callback) const;
public:
int64_t

View File

@ -53,9 +53,11 @@ template <bool is_sealed = false>
class DeletedRecord {
public:
DeletedRecord(InsertRecord<is_sealed>* insert_record,
std::function<std::vector<std::pair<SegOffset, Timestamp>>(
std::function<void(
const std::vector<PkType>& pks,
const Timestamp* timestamps)> search_pk_func,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb)>
search_pk_func,
int64_t segment_id)
: insert_record_(insert_record),
search_pk_func_(std::move(search_pk_func)),
@ -115,30 +117,30 @@ class DeletedRecord {
max_timestamp = deleted_ts;
}
}
auto offsets = search_pk_func_(pks, timestamps);
for (auto& [offset, deleted_ts] : offsets) {
auto row_id = offset.get();
// if already deleted, no need to add new record
if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) {
continue;
}
// if insert record and delete record is same timestamp,
// delete not take effect on this record.
if (deleted_ts == insert_record_->timestamps_[row_id]) {
continue;
}
accessor.insert(std::make_pair(deleted_ts, row_id));
if constexpr (is_sealed) {
Assert(deleted_mask_.size() > 0);
deleted_mask_.set(row_id);
} else {
// need to add mask size firstly for growing segment
deleted_mask_.resize(insert_record_->size());
deleted_mask_.set(row_id);
}
removed_num++;
mem_add += DELETE_PAIR_SIZE;
}
search_pk_func_(
pks, timestamps, [&](SegOffset offset, Timestamp delete_ts) {
auto row_id = offset.get();
// if already deleted, no need to add new record
if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) {
return;
}
// if insert record and delete record is same timestamp,
// delete not take effect on this record.
if (delete_ts == insert_record_->timestamps_[row_id]) {
return;
}
accessor.insert(std::make_pair(delete_ts, row_id));
if constexpr (is_sealed) {
Assert(deleted_mask_.size() > 0);
deleted_mask_.set(row_id);
} else {
// need to add mask size firstly for growing segment
deleted_mask_.resize(insert_record_->size());
deleted_mask_.set(row_id);
}
removed_num++;
mem_add += DELETE_PAIR_SIZE;
});
n_.fetch_add(removed_num);
mem_size_.fetch_add(mem_add);
@ -322,8 +324,9 @@ class DeletedRecord {
std::atomic<int64_t> n_ = 0;
std::atomic<int64_t> mem_size_ = 0;
InsertRecord<is_sealed>* insert_record_;
std::function<std::vector<std::pair<SegOffset, Timestamp>>(
const std::vector<PkType>& pks, const Timestamp* timestamps)>
std::function<void(const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)>)>
search_pk_func_;
int64_t segment_id_{0};
std::shared_ptr<SortedDeleteList> deleted_lists_;

View File

@ -665,20 +665,21 @@ SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const {
return field_meta.get_data_type();
}
std::vector<std::pair<SegOffset, Timestamp>>
SegmentGrowingImpl::search_batch_pks(const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts) const {
std::vector<std::pair<SegOffset, Timestamp>> results;
void
SegmentGrowingImpl::search_batch_pks(
const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts,
const std::function<void(const SegOffset offset, const Timestamp ts)>&
callback) const {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets =
insert_record_.search_pk(pks[i], timestamp, include_same_ts);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
callback(offset, timestamp);
}
}
return results;
}
void

View File

@ -175,10 +175,13 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
try_remove_chunks(FieldId fieldId);
std::vector<std::pair<SegOffset, Timestamp>>
search_batch_pks(const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts) const;
void
search_batch_pks(
const std::vector<PkType>& pks,
const Timestamp* timestamps,
bool include_same_ts,
const std::function<void(const SegOffset offset, const Timestamp ts)>&
callback) const;
public:
size_t
@ -297,8 +300,10 @@ class SegmentGrowingImpl : public SegmentGrowing {
deleted_record_(
&insert_record_,
[this](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
return this->search_batch_pks(pks, timestamps, false);
const Timestamp* timestamps,
std::function<void(const SegOffset offset,
const Timestamp ts)> callback) {
this->search_batch_pks(pks, timestamps, false, callback);
},
segment_id) {
this->CreateTextIndexes();

View File

@ -0,0 +1,77 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <benchmark/benchmark.h>
#include <vector>
#include <algorithm>
#include <common/Types.h>
#include <folly/FBVector.h>
// Set each hit offset in `bitset` to the requested value `v`, one bit at a
// time. Baseline ("brute force") variant for the benchmark comparison with
// apply_hits_elementwise below.
//
// Bug fix: the original loop variable shadowed the `bool v` parameter
// (`for (auto v : hits) bitset[v] = v;`), so every bit was assigned the
// truthiness of its own *index* (false only for offset 0) and the `v`
// argument was silently ignored.
static void
apply_hits(milvus::BitsetType& bitset, std::vector<size_t>& hits, bool v) {
    for (auto hit : hits) {
        bitset[hit] = v;
    }
}
// Set each hit offset in `bitset` to the value `v`, batching hits that fall
// into the same 64-bit word into a single read-modify-write of that word.
// NOTE: sorts `hits` in place so that same-word offsets become adjacent.
// NOTE(review): assumes bitset.data() exposes the storage as 64-bit words
// aligned with the `index / 64` arithmetic here — confirm against
// milvus::BitsetType's block type.
static void
apply_hits_elementwise(milvus::BitsetType& bitset,
                       std::vector<size_t>& hits,
                       bool v) {
    std::sort(hits.begin(), hits.end());
    uint64_t* data = bitset.data();
    size_t j = 0;
    while (j < hits.size()) {
        size_t index = hits[j];
        size_t word_idx = index / 64;
        uint64_t mask = 0;
        // Accumulate all hits that land in the same word into one mask.
        do {
            mask |= (1ULL << (hits[j] % 64));
            ++j;
        } while (j < hits.size() && (hits[j] / 64) == word_idx);
        // Apply the whole mask with a single word-level OR / AND-NOT.
        if (v) {
            data[word_idx] |= mask;
        } else {
            data[word_idx] &= ~mask;
        }
    }
}
// Benchmark bit-by-bit application of 600k hit offsets.
static void
BM_BITSET_APPLYHITS_BRUTEFORCE(benchmark::State& stats) {
    // Bug fix: the original wrote `std::vector<size_t>(600000)`, which
    // value-initializes 600000 zero elements before the appends — the hits
    // vector ended up with 1.2M entries, half of them spurious zeros,
    // skewing the measurement. Reserve instead.
    std::vector<size_t> hits;
    hits.reserve(600000);
    for (size_t i = 0; i < 600000; i++) {
        hits.emplace_back(i);
    }
    for (auto _ : stats) {
        milvus::BitsetType bitset(600000);
        apply_hits(bitset, hits, true);
    }
}
BENCHMARK(BM_BITSET_APPLYHITS_BRUTEFORCE);
// Benchmark word-batched application of 600k hit offsets.
static void
BM_BITSET_APPLYHITS_ELEMENTWISE(benchmark::State& stats) {
    // Bug fix: `std::vector<size_t>(600000)` pre-filled the vector with
    // 600000 zeros before the appends, doubling its size and adding 600000
    // bogus hits at offset 0. Reserve capacity instead.
    std::vector<size_t> hits;
    hits.reserve(600000);
    for (size_t i = 0; i < 600000; i++) {
        hits.emplace_back(i);
    }
    for (auto _ : stats) {
        milvus::BitsetType bitset(600000);
        apply_hits_elementwise(bitset, hits, true);
    }
}
BENCHMARK(BM_BITSET_APPLYHITS_ELEMENTWISE);

View File

@ -0,0 +1,89 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <benchmark/benchmark.h>
#include <filesystem>
#include "common/File.h"
// Benchmark unbuffered (direct write-syscall) output of an n-byte payload:
// each iteration writes the length followed by n '*' bytes.
static void
BN_FILE_Write_Syscall(benchmark::State& stats, int n) {
    const std::string payload(n, '*');
    const auto path = std::filesystem::current_path() / "bn_write_syscall";
    auto file =
        milvus::File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR);
    for (auto _ : stats) {
        file.WriteInt(n);
        file.Write(payload.c_str(), n);
    }
}
// Benchmark stdio-buffered output of an n-byte payload through a
// buf_size-byte stream buffer; flushes once after the timed loop.
static void
BN_FILE_Write_Stream(benchmark::State& stats, size_t buf_size, int n) {
    const std::string payload(n, '*');
    const auto path = std::filesystem::current_path() / "bn_write_stream";
    auto file = milvus::File::Open(
        path.string(), O_CREAT | O_TRUNC | O_RDWR, buf_size);
    for (auto _ : stats) {
        file.FWriteInt(n);
        file.FWrite(payload.c_str(), n);
    }
    file.FFlush();
}
// Concrete benchmark entry points: google-benchmark registers plain
// zero-extra-argument functions, so each (buffer size, payload size)
// combination gets a thin wrapper around the parameterized drivers above.
// NOTE(review): BENCHMARK_CAPTURE could register these more compactly.
static void
BN_FILE_Write_Syscall_2(benchmark::State& stats) {
    BN_FILE_Write_Syscall(stats, 2);
}
BENCHMARK(BN_FILE_Write_Syscall_2);
static void
BN_FILE_Write_Syscall_65535(benchmark::State& stats) {
    BN_FILE_Write_Syscall(stats, 65535);
}
BENCHMARK(BN_FILE_Write_Syscall_65535);
static void
BN_FILE_Write_Stream_4096_2(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 4096, 2);
}
BENCHMARK(BN_FILE_Write_Stream_4096_2);
static void
BN_FILE_Write_Stream_16384_2(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 16384, 2);
}
BENCHMARK(BN_FILE_Write_Stream_16384_2);
static void
BN_FILE_Write_Stream_163840_2(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 163840, 2);
}
BENCHMARK(BN_FILE_Write_Stream_163840_2);
static void
BN_FILE_Write_Stream_4096_65535(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 4096, 65535);
}
BENCHMARK(BN_FILE_Write_Stream_4096_65535);
static void
BN_FILE_Write_Stream_16384_65535(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 16384, 65535);
}
BENCHMARK(BN_FILE_Write_Stream_16384_65535);
static void
BN_FILE_Write_Stream_163840_65535(benchmark::State& stats) {
    BN_FILE_Write_Stream(stats, 163840, 65535);
}
BENCHMARK(BN_FILE_Write_Stream_163840_65535);

View File

@ -0,0 +1,75 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <benchmark/benchmark.h>
#include <filesystem>
#include "common/File.h"
#include <common/Types.h>
// Linear scan: collect the offsets of every cleared bit in [0, n).
static std::vector<int>
BF_SCAN_OFFSET(int n, milvus::BitsetType& bitset) {
    std::vector<int> cleared;
    cleared.reserve(3);  // the benchmarks below clear exactly three bits
    for (int offset = 0; offset < n; ++offset) {
        if (bitset[offset]) {
            continue;
        }
        cleared.push_back(offset);
    }
    return cleared;
}
// Benchmark the linear scan over a 640k bitset with three cleared bits.
static void
BM_BITSET_BRUTEFORCE(benchmark::State& stats) {
    milvus::BitsetType bitset(640000);
    bitset.flip();
    bitset.set(10000, false);
    bitset.set(190000, false);
    bitset.set(610000, false);
    for (auto _ : stats) {
        auto res = BF_SCAN_OFFSET(640000, bitset);
        // Fix: keep the result observable — without this the compiler may
        // dead-code-eliminate the scan and void the measurement.
        benchmark::DoNotOptimize(res);
    }
}
BENCHMARK(BM_BITSET_BRUTEFORCE);
// Locate cleared bits by flipping the bitset and jumping between set bits
// with find_next(), instead of testing every position. Restores the
// caller's bitset (second flip) before returning.
// NOTE(review): if find_next(i) searches strictly *after* i, a cleared bit
// at offset 0 would be missed on the first probe — confirm the
// BitsetType::find_next contract.
static std::vector<int>
BS_FINDFIRST(int n, milvus::BitsetType& bitset) {
    bitset.flip();
    int i = 0;
    std::vector<int> res;
    res.reserve(3);
    while (i <= n) {
        auto next = bitset.find_next(i);
        if (!next.has_value()) {
            break;
        }
        // Bug fix: record the hit itself. The original advanced `i` first
        // and then pushed `i`, storing `next.value() + 1` — off by one
        // relative to BF_SCAN_OFFSET's offsets.
        res.emplace_back(next.value());
        i = next.value() + 1;
    }
    bitset.flip();
    return res;
}
// Benchmark find_next()-based skipping over a 640k bitset with three
// cleared bits.
static void
BM_BITSET_FINDFIRST(benchmark::State& stats) {
    milvus::BitsetType bitset(640000);
    bitset.flip();
    bitset.set(10000, false);
    bitset.set(190000, false);
    bitset.set(610000, false);
    for (auto _ : stats) {
        auto res = BS_FINDFIRST(640000, bitset);
        // Fix: keep the result observable — without this the compiler may
        // dead-code-eliminate the scan and void the measurement.
        benchmark::DoNotOptimize(res);
    }
}
BENCHMARK(BM_BITSET_FINDFIRST);

View File

@ -0,0 +1,27 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstdint>
#include <benchmark/benchmark.h>
#include <string>
#include "common/type_c.h"
#include "segcore/segment_c.h"
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentSealed.h"
#include "test_cachinglayer/cachinglayer_test_utils.h"
#include "test_utils/DataGen.h"
#include "test_utils/storage_test_utils.h"
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
// Embedding dimensionality for generated benchmark data.
// NOTE(review): its uses lie outside this view — confirm against the rest
// of the file.
static int dim = 768;

View File

@ -276,8 +276,10 @@ TEST_P(TestChunkSegment, TestSkipNextTermExpr) {
proto::plan::GenericValue v1;
v1.set_int64_val(10000);
auto first_expr = std::make_shared<expr::UnaryRangeFilterExpr>(
expr::ColumnInfo(fields.at("int64"), DataType::INT64), proto::plan::OpType::GreaterEqual, v1);
expr::ColumnInfo(fields.at("int64"), DataType::INT64),
proto::plan::OpType::GreaterEqual,
v1);
std::vector<proto::plan::GenericValue> v2;
for (int i = 1; i <= 5; ++i) {
proto::plan::GenericValue v;
@ -286,9 +288,12 @@ TEST_P(TestChunkSegment, TestSkipNextTermExpr) {
}
auto second_expr = std::make_shared<expr::TermFilterExpr>(
expr::ColumnInfo(fields.at("pk"), DataType::INT64), v2);
auto and_expr = std::make_shared<expr::LogicalBinaryExpr>(expr::LogicalBinaryExpr::OpType::And, first_expr, second_expr);
auto plan = std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, and_expr);
auto final = query::ExecuteQueryExpr(plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP);
auto and_expr = std::make_shared<expr::LogicalBinaryExpr>(
expr::LogicalBinaryExpr::OpType::And, first_expr, second_expr);
auto plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, and_expr);
auto final = query::ExecuteQueryExpr(
plan, segment.get(), chunk_num * test_data_count, MAX_TIMESTAMP);
ASSERT_EQ(5, final.count());
for (int i = 10001; i <= 10005; ++i) {
ASSERT_EQ(true, final[i]) << "i: " << i;

View File

@ -42,17 +42,17 @@ TEST(DeleteMVCC, common_case) {
auto segment_ptr = segment.get();
DeletedRecord<true> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);
delete_record.set_sealed_row_count(c);
@ -167,17 +167,17 @@ TEST(DeleteMVCC, delete_exist_duplicate_pks) {
InsertRecord<false> insert_record(*schema, N);
DeletedRecord<false> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);
@ -291,17 +291,17 @@ TEST(DeleteMVCC, snapshot) {
InsertRecord<false> insert_record(*schema, N);
DeletedRecord<false> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);
@ -348,17 +348,17 @@ TEST(DeleteMVCC, insert_after_snapshot) {
InsertRecord<false> insert_record(*schema, N);
DeletedRecord<false> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);
@ -452,17 +452,17 @@ TEST(DeleteMVCC, perform) {
InsertRecord<false> insert_record(*schema, N);
DeletedRecord<false> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);

View File

@ -90,17 +90,17 @@ TEST(Util, GetDeleteBitmap) {
InsertRecord<false> insert_record(*schema, N);
DeletedRecord<false> delete_record(
&insert_record,
[&insert_record](const std::vector<PkType>& pks,
const Timestamp* timestamps) {
std::vector<std::pair<SegOffset, Timestamp>> results;
[&insert_record](
const std::vector<PkType>& pks,
const Timestamp* timestamps,
std::function<void(SegOffset offset, Timestamp ts)> cb) {
for (size_t i = 0; i < pks.size(); ++i) {
auto timestamp = timestamps[i];
auto offsets = insert_record.search_pk(pks[i], timestamp);
for (auto offset : offsets) {
results.emplace_back(offset, timestamp);
cb(offset, timestamp);
}
}
return results;
},
0);