mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-31 07:55:38 +08:00
fix: readd timestamp index because segment timestamp not ordered (#33856)
#33533 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
f67b6dc2b0
commit
e422168f09
@ -594,6 +594,7 @@ struct InsertRecord {
|
||||
timestamps_.clear();
|
||||
reserved = 0;
|
||||
ack_responder_.clear();
|
||||
timestamp_index_ = TimestampIndex();
|
||||
pk2offset_->clear();
|
||||
fields_data_.clear();
|
||||
}
|
||||
@ -610,6 +611,9 @@ struct InsertRecord {
|
||||
std::atomic<int64_t> reserved = 0;
|
||||
AckResponder ack_responder_;
|
||||
|
||||
// used for timestamps index of sealed segment
|
||||
TimestampIndex timestamp_index_;
|
||||
|
||||
// pks to row offset
|
||||
std::unique_ptr<OffsetMap> pk2offset_;
|
||||
|
||||
|
||||
@ -23,7 +23,6 @@
|
||||
#include <string_view>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <boost/iterator/counting_iterator.hpp>
|
||||
|
||||
#include "Utils.h"
|
||||
#include "Types.h"
|
||||
@ -349,9 +348,19 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
||||
offset += row_count;
|
||||
}
|
||||
|
||||
TimestampIndex index;
|
||||
auto min_slice_length = num_rows < 4096 ? 1 : 4096;
|
||||
auto meta = GenerateFakeSlices(
|
||||
timestamps.data(), num_rows, min_slice_length);
|
||||
index.set_length_meta(std::move(meta));
|
||||
// todo ::opt to avoid copy timestamps from field data
|
||||
index.build_with(timestamps.data(), num_rows);
|
||||
|
||||
// use special index
|
||||
std::unique_lock lck(mutex_);
|
||||
AssertInfo(insert_record_.timestamps_.empty(), "already exists");
|
||||
insert_record_.timestamps_.fill_chunk_data(field_data);
|
||||
insert_record_.timestamp_index_ = std::move(index);
|
||||
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
|
||||
"num chunk not equal to 1 for sealed segment");
|
||||
stats_.mem_size += sizeof(Timestamp) * data.row_count;
|
||||
@ -1519,6 +1528,12 @@ SegmentSealedImpl::debug() const {
|
||||
void
|
||||
SegmentSealedImpl::LoadSegmentMeta(
|
||||
const proto::segcore::LoadSegmentMeta& segment_meta) {
|
||||
std::unique_lock lck(mutex_);
|
||||
std::vector<int64_t> slice_lengths;
|
||||
for (auto& info : segment_meta.metas()) {
|
||||
slice_lengths.push_back(info.row_count());
|
||||
}
|
||||
insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths));
|
||||
PanicInfo(NotImplemented, "unimplemented");
|
||||
}
|
||||
|
||||
@ -1530,17 +1545,33 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const {
|
||||
|
||||
void
|
||||
SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk,
|
||||
Timestamp ts) const {
|
||||
auto row_count = this->get_row_count();
|
||||
auto& ts_vec = this->insert_record_.timestamps_;
|
||||
auto iter = std::upper_bound(
|
||||
boost::make_counting_iterator(static_cast<int64_t>(0)),
|
||||
boost::make_counting_iterator(row_count),
|
||||
ts,
|
||||
[&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
|
||||
for (size_t i = *iter; i < row_count; ++i) {
|
||||
bitset_chunk.set(i);
|
||||
Timestamp timestamp) const {
|
||||
// TODO change the
|
||||
AssertInfo(insert_record_.timestamps_.num_chunk() == 1,
|
||||
"num chunk not equal to 1 for sealed segment");
|
||||
const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0);
|
||||
AssertInfo(timestamps_data.size() == get_row_count(),
|
||||
fmt::format("Timestamp size not equal to row count: {}, {}",
|
||||
timestamps_data.size(),
|
||||
get_row_count()));
|
||||
auto range = insert_record_.timestamp_index_.get_active_range(timestamp);
|
||||
|
||||
// range == (size_, size_) and size_ is this->timestamps_.size().
|
||||
// it means these data are all useful, we don't need to update bitset_chunk.
|
||||
// It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so.
|
||||
if (range.first == range.second && range.first == timestamps_data.size()) {
|
||||
// just skip
|
||||
return;
|
||||
}
|
||||
// range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s.
|
||||
// It can be thought of as an OR operation with another bitmask that is all 1s.
|
||||
if (range.first == range.second && range.first == 0) {
|
||||
bitset_chunk.set();
|
||||
return;
|
||||
}
|
||||
auto mask = TimestampIndex::GenerateBitset(
|
||||
timestamp, range, timestamps_data.data(), timestamps_data.size());
|
||||
bitset_chunk |= mask;
|
||||
}
|
||||
|
||||
bool
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user