diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index e4b208bfc2..7d62e303ee 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -594,6 +594,7 @@ struct InsertRecord { timestamps_.clear(); reserved = 0; ack_responder_.clear(); + timestamp_index_ = TimestampIndex(); pk2offset_->clear(); fields_data_.clear(); } @@ -610,6 +611,9 @@ struct InsertRecord { std::atomic reserved = 0; AckResponder ack_responder_; + // used for timestamps index of sealed segment + TimestampIndex timestamp_index_; + // pks to row offset std::unique_ptr pk2offset_; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 19962018a6..b6cac4f937 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include "Utils.h" #include "Types.h" @@ -349,9 +348,19 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { offset += row_count; } + TimestampIndex index; + auto min_slice_length = num_rows < 4096 ? 1 : 4096; + auto meta = GenerateFakeSlices( + timestamps.data(), num_rows, min_slice_length); + index.set_length_meta(std::move(meta)); + // todo ::opt to avoid copy timestamps from field data + index.build_with(timestamps.data(), num_rows); + + // use special index std::unique_lock lck(mutex_); AssertInfo(insert_record_.timestamps_.empty(), "already exists"); insert_record_.timestamps_.fill_chunk_data(field_data); + insert_record_.timestamp_index_ = std::move(index); AssertInfo(insert_record_.timestamps_.num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); stats_.mem_size += sizeof(Timestamp) * data.row_count; @@ -1519,6 +1528,12 @@ SegmentSealedImpl::debug() const { void SegmentSealedImpl::LoadSegmentMeta( const proto::segcore::LoadSegmentMeta& segment_meta) { + std::unique_lock lck(mutex_); + std::vector slice_lengths; + for (auto& info : segment_meta.metas()) { + slice_lengths.push_back(info.row_count()); + } + insert_record_.timestamp_index_.set_length_meta(std::move(slice_lengths)); PanicInfo(NotImplemented, "unimplemented"); } @@ -1530,17 +1545,33 @@ SegmentSealedImpl::get_active_count(Timestamp ts) const { void SegmentSealedImpl::mask_with_timestamps(BitsetType& bitset_chunk, - Timestamp ts) const { - auto row_count = this->get_row_count(); - auto& ts_vec = this->insert_record_.timestamps_; - auto iter = std::upper_bound( - boost::make_counting_iterator(static_cast(0)), - boost::make_counting_iterator(row_count), - ts, - [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; }); - for (size_t i = *iter; i < row_count; ++i) { - bitset_chunk.set(i); + Timestamp timestamp) const { + // TODO change the + AssertInfo(insert_record_.timestamps_.num_chunk() == 1, + "num chunk not equal to 1 for sealed segment"); + const auto& timestamps_data = insert_record_.timestamps_.get_chunk(0); + AssertInfo(timestamps_data.size() == get_row_count(), + fmt::format("Timestamp size not equal to row count: {}, {}", + timestamps_data.size(), + get_row_count())); + auto range = insert_record_.timestamp_index_.get_active_range(timestamp); + + // range == (size_, size_) and size_ is this->timestamps_.size(). + // it means these data are all useful, we don't need to update bitset_chunk. + // It can be thought of as an OR operation with another bitmask that is all 0s, but it is not necessary to do so. + if (range.first == range.second && range.first == timestamps_data.size()) { + // just skip + return; } + // range == (0, 0). it means these data can not be used, directly set bitset_chunk to all 1s. + // It can be thought of as an OR operation with another bitmask that is all 1s. + if (range.first == range.second && range.first == 0) { + bitset_chunk.set(); + return; + } + auto mask = TimestampIndex::GenerateBitset( + timestamp, range, timestamps_data.data(), timestamps_data.size()); + bitset_chunk |= mask; } bool