diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index a1d46863aa..03cec48e5b 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -898,12 +898,14 @@ ChunkedSegmentSealedImpl::search_pk(const PkType& pk, std::vector> ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const { + const Timestamp* timestamps, + bool include_same_ts) const { std::vector> pk_offsets; // handle unsorted case if (!is_sorted_by_pk_) { for (size_t i = 0; i < pks.size(); i++) { - auto offsets = insert_record_.search_pk(pks[i], timestamps[i]); + auto offsets = insert_record_.search_pk( + pks[i], timestamps[i], include_same_ts); for (auto offset : offsets) { pk_offsets.emplace_back(offset, timestamps[i]); } @@ -917,6 +919,13 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, auto all_chunk_pins = pk_column->GetAllChunks(); + auto timestamp_hit = include_same_ts + ? [](const Timestamp& ts1, + const Timestamp& ts2) { return ts1 <= ts2; } + : [](const Timestamp& ts1, const Timestamp& ts2) { + return ts1 < ts2; + }; + switch (schema_->get_fields().at(pk_field_id).get_data_type()) { case DataType::INT64: { auto num_chunk = pk_column->num_chunks(); @@ -940,7 +949,8 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, pk_column->GetNumRowsUntilChunk(i); for (; it != src + chunk_row_num && *it == target; ++it) { auto offset = it - src + num_rows_until_chunk; - if (insert_record_.timestamps_[offset] <= timestamp) { + if (timestamp_hit(insert_record_.timestamps_[offset], + timestamp)) { pk_offsets.emplace_back(offset, timestamp); } } @@ -965,8 +975,9 @@ ChunkedSegmentSealedImpl::search_batch_pks(const std::vector& pks, string_chunk->operator[](offset) == target; ++offset) { auto segment_offset = offset + num_rows_until_chunk; - if (insert_record_.timestamps_[segment_offset] <= - timestamp) { + if (timestamp_hit( + insert_record_.timestamps_[segment_offset], + timestamp)) { pk_offsets.emplace_back(segment_offset, timestamp); } } @@ -1111,7 +1122,7 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl( deleted_record_( &insert_record_, [this](const std::vector& pks, const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps); + return this->search_batch_pks(pks, timestamps, false); }, segment_id) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index fd44ffb2c1..f411839d02 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -206,7 +206,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { std::vector> search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const; + const Timestamp* timestamps, + bool include_same_ts) const; public: int64_t diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index f51ff3cf4b..5f7433d5f5 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -321,12 +321,20 @@ struct InsertRecord { } std::vector - search_pk(const PkType& pk, Timestamp timestamp) const { + search_pk(const PkType& pk, + Timestamp timestamp, + bool include_same_ts = true) const { std::shared_lock lck(shared_mutex_); std::vector res_offsets; auto offset_iter = pk2offset_->find(pk); + auto timestamp_hit = + include_same_ts ? [](const Timestamp& ts1, + const Timestamp& ts2) { return ts1 <= ts2; } + : [](const Timestamp& ts1, const Timestamp& ts2) { + return ts1 < ts2; + }; for (auto offset : offset_iter) { - if (timestamps_[offset] <= timestamp) { + if (timestamp_hit(timestamps_[offset], timestamp)) { res_offsets.emplace_back(offset); } } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 1b02a5730d..7e25b189f1 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -667,11 +667,13 @@ SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const { std::vector> SegmentGrowingImpl::search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const { + const Timestamp* timestamps, + bool include_same_ts) const { std::vector> results; for (size_t i = 0; i < pks.size(); ++i) { auto timestamp = timestamps[i]; - auto offsets = insert_record_.search_pk(pks[i], timestamp); + auto offsets = + insert_record_.search_pk(pks[i], timestamp, include_same_ts); for (auto offset : offsets) { results.emplace_back(offset, timestamp); } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 6e3fbeb1b3..0fc2b1f475 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -177,7 +177,8 @@ class SegmentGrowingImpl : public SegmentGrowing { std::vector> search_batch_pks(const std::vector& pks, - const Timestamp* timestamps) const; + const Timestamp* timestamps, + bool include_same_ts) const; public: size_t @@ -297,7 +298,7 @@ class SegmentGrowingImpl : public SegmentGrowing { &insert_record_, [this](const std::vector& pks, const Timestamp* timestamps) { - return this->search_batch_pks(pks, timestamps); + return this->search_batch_pks(pks, timestamps, false); }, segment_id) { this->CreateTextIndexes();