diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index f64b004525..e85e79f8c7 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -29,3 +29,9 @@ struct LoadFieldDataInfo { const void* blob = nullptr; int64_t row_count = -1; }; + +struct LoadDeletedRecordInfo { + const void* timestamps = nullptr; + const void* primary_keys = nullptr; + int64_t row_count = -1; +}; diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 6f6eace773..35912a5c63 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -47,6 +47,12 @@ typedef struct CLoadFieldDataInfo { int64_t row_count; } CLoadFieldDataInfo; +typedef struct CLoadDeletedRecordInfo { + void* timestamps; + void* primary_keys; + int64_t row_count; +} CLoadDeletedRecordInfo; + typedef struct CProtoResult { CStatus status; CProto proto; diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 7cb888a041..2c0b260312 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -60,6 +60,7 @@ struct DeletedRecord { AckResponder ack_responder_; ConcurrentVector timestamps_; ConcurrentVector uids_; + int64_t record_size_ = 0; private: std::shared_ptr lru_; diff --git a/internal/core/src/segcore/ScalarIndex.cpp b/internal/core/src/segcore/ScalarIndex.cpp index 4b4adc1815..830dc049b8 100644 --- a/internal/core/src/segcore/ScalarIndex.cpp +++ b/internal/core/src/segcore/ScalarIndex.cpp @@ -47,6 +47,33 @@ ScalarIndexVector::do_search_ids(const IdArray& ids) const { } return {std::move(res_ids), std::move(dst_offsets)}; } + +std::pair, std::vector> +ScalarIndexVector::do_search_ids(const std::vector& ids) const { + std::vector dst_offsets; + std::vector dst_ids; + + for (auto id : ids) { + using Pair = std::pair; + auto [iter_beg, iter_end] = + std::equal_range(mapping_.begin(), mapping_.end(), std::make_pair(id, SegOffset(0)), + [](const Pair& left, const Pair& right) { return left.first < right.first; }); + + if (iter_beg == iter_end) { + // no data + continue; + } + // TODO: for repeated key, decide the final offset with Timestamp + // no repeated key, simplified logic + AssertInfo(iter_beg + 1 == iter_end, "There are no repeated keys in more than one results"); + auto [entry_id, entry_offset] = *iter_beg; + + dst_ids.push_back(entry_id); + dst_offsets.push_back(entry_offset); + } + return {std::move(dst_ids), std::move(dst_offsets)}; +} + void ScalarIndexVector::append_data(const ScalarIndexVector::T* ids, int64_t count, SegOffset base) { for (int64_t i = 0; i < count; ++i) { diff --git a/internal/core/src/segcore/ScalarIndex.h b/internal/core/src/segcore/ScalarIndex.h index 25e2c53efb..4dee4bf2b8 100644 --- a/internal/core/src/segcore/ScalarIndex.h +++ b/internal/core/src/segcore/ScalarIndex.h @@ -25,6 +25,8 @@ class ScalarIndexBase { public: virtual std::pair, std::vector> do_search_ids(const IdArray& ids) const = 0; + virtual std::pair, std::vector> + do_search_ids(const std::vector& ids) const = 0; virtual ~ScalarIndexBase() = default; virtual std::string debug() const = 0; @@ -44,6 +46,9 @@ class ScalarIndexVector : public ScalarIndexBase { std::pair, std::vector> do_search_ids(const IdArray& ids) const override; + std::pair, std::vector> + do_search_ids(const std::vector& ids) const override; + std::string debug() const override { std::string dbg_str; diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index bd7dc6da7f..12a3b23a52 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -27,6 +27,8 @@ class SegmentSealed : public SegmentInternalInterface { virtual void LoadFieldData(const LoadFieldDataInfo& info) = 0; virtual void + LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0; + virtual void DropIndex(const FieldId field_id) = 0; virtual void DropFieldData(const FieldId field_id) = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 7f1d220207..f24b745f45 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -158,6 +158,21 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { } } +void +SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { + AssertInfo(info.row_count > 0, "The row count of deleted record is 0"); + AssertInfo(info.primary_keys, "Deleted primary keys is null"); + AssertInfo(info.timestamps, "Deleted timestamps is null"); + auto primary_keys = reinterpret_cast(info.primary_keys); + auto timestamps = reinterpret_cast(info.timestamps); + int64_t size = info.row_count; + + deleted_record_.uids_.set_data(0, primary_keys, size); + deleted_record_.timestamps_.set_data(0, timestamps, size); + deleted_record_.ack_responder_.AddSegment(0, size); + deleted_record_.record_size_ = size; +} + int64_t SegmentSealedImpl::num_chunk_index(FieldOffset field_offset) const { return 1; @@ -212,10 +227,74 @@ SegmentSealedImpl::get_schema() const { return *schema_; } +std::shared_ptr +SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier, + Timestamp query_timestamp, + int64_t insert_barrier, + bool force) const { + auto old = deleted_record_.get_lru_entry(); + + if (old->bitmap_ptr->count() == insert_barrier) { + if (old->del_barrier == del_barrier) { + return old; + } + } + + auto current = old->clone(insert_barrier); + current->del_barrier = del_barrier; + auto bitmap = current->bitmap_ptr; + // Sealed segment only has one chunk with chunk_id 0 + auto span = deleted_record_.uids_.get_span_base(0); + auto uids_ptr = reinterpret_cast(span.data()); + auto del_size = deleted_record_.record_size_; + std::vector ids(del_size); + std::copy_n(uids_ptr, del_size, ids.data()); + + auto [uids, seg_offsets] = primary_key_index_->do_search_ids(ids); + + if (del_barrier < old->del_barrier) { + for (auto del_index = del_barrier; del_index < old->del_barrier; ++del_index) { + int64_t the_offset = seg_offsets[del_index].get(); + AssertInfo(the_offset > 0, "Seg offset is invalid"); + if (deleted_record_.timestamps_[del_index] < query_timestamp) { + bitmap->clear(the_offset); + } + } + return current; + } else { + for (auto del_index = old->del_barrier; del_index < del_barrier; ++del_index) { + int64_t the_offset = seg_offsets[del_index].get(); + AssertInfo(the_offset > 0, "Seg offset is invalid"); + if (deleted_record_.timestamps_[del_index] < query_timestamp) { + bitmap->set(the_offset); + } + } + this->deleted_record_.insert_lru_entry(current); + } + return current; +} + BitsetView SegmentSealedImpl::get_filtered_bitmap(const BitsetView& bitset, int64_t ins_barrier, Timestamp timestamp) const { - // TODO(yukun) - return bitset; + auto del_barrier = get_barrier(get_deleted_record(), timestamp); + if (del_barrier == 0) { + return bitset; + } + auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); + if (bitmap_holder == nullptr) { + return bitset; + } + AssertInfo(bitmap_holder, "bitmap_holder is null"); + auto deleted_bitmap = bitmap_holder->bitmap_ptr; + if (bitset.size() == 0) { + return BitsetView(deleted_bitmap); + } + AssertInfo(deleted_bitmap->count() == bitset.size(), "Deleted bitmap count not equal to filtered bitmap count"); + + auto filtered_bitmap = std::make_shared(bitset.size(), bitset.data()); + + auto final_bitmap = (*deleted_bitmap.get()) | (*filtered_bitmap.get()); + return BitsetView(final_bitmap); } void @@ -463,6 +542,27 @@ SegmentSealedImpl::search_ids(const IdArray& id_array, Timestamp timestamp) cons return primary_key_index_->do_search_ids(id_array); } +void +SegmentSealedImpl::Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw) { + std::vector> ordering(row_count); + for (int i = 0; i < row_count; i++) { + ordering[i] = std::make_tuple(timestamps_raw[i], uids_raw[i]); + } + std::sort(ordering.begin(), ordering.end()); + std::vector src_uids(row_count); + std::vector src_timestamps(row_count); + + for (int i = 0; i < row_count; i++) { + auto [t, uid] = ordering[i]; + src_timestamps[i] = t; + src_uids[i] = uid; + } + deleted_record_.timestamps_.set_data(0, src_timestamps.data(), row_count); + deleted_record_.uids_.set_data(0, src_uids.data(), row_count); + deleted_record_.ack_responder_.AddSegment(0, row_count); + return; +} + std::vector SegmentSealedImpl::search_ids(const boost::dynamic_bitset<>& bitset, Timestamp timestamp) const { std::vector dst_offset; diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index e5aff043f4..19b6cf9397 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -10,9 +10,15 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #pragma once +#include +#include +#include + #include #include "segcore/SegmentSealed.h" +#include "ConcurrentVector.h" #include "SealedIndexingRecord.h" +#include "segcore/DeletedRecord.h" #include "ScalarIndex.h" #include #include @@ -30,6 +36,8 @@ class SegmentSealedImpl : public SegmentSealed { void LoadFieldData(const LoadFieldDataInfo& info) override; void + LoadDeletedRecord(const LoadDeletedRecordInfo& info) override; + void LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& segment_meta) override; void DropIndex(const FieldId field_id) override; @@ -50,6 +58,12 @@ class SegmentSealedImpl : public SegmentSealed { const Schema& get_schema() const override; + std::shared_ptr + get_deleted_bitmap(int64_t del_barrier, + Timestamp query_timestamp, + int64_t insert_barrier, + bool force = false) const; + public: int64_t num_chunk_index(FieldOffset field_offset) const override; @@ -126,12 +140,20 @@ class SegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 2; } + const DeletedRecord& + get_deleted_record() const { + return deleted_record_; + } + std::pair, std::vector> search_ids(const IdArray& id_array, Timestamp timestamp) const override; std::vector search_ids(const boost::dynamic_bitset<>& view, Timestamp timestamp) const override; + void + Delete(int64_t row_count, const int64_t* uids_raw, const Timestamp* timestamps_raw); + // virtual void // build_index_if_primary_key(FieldId field_id); @@ -151,6 +173,7 @@ class SegmentSealedImpl : public SegmentSealed { std::unique_ptr primary_key_index_; std::vector> fields_data_; + mutable DeletedRecord deleted_record_; SealedIndexingRecord vecindexs_; aligned_vector row_ids_; diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index c7fca9ff2f..20d9f0bd5b 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -184,6 +184,21 @@ LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_in } } +CStatus +LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) { + try { + auto segment_interface = reinterpret_cast(c_segment); + auto segment = dynamic_cast(segment_interface); + AssertInfo(segment != nullptr, "segment conversion failed"); + auto load_info = LoadDeletedRecordInfo{deleted_record_info.timestamps, deleted_record_info.primary_keys, + deleted_record_info.row_count}; + segment->LoadDeletedRecord(load_info); + return milvus::SuccessCStatus(); + } catch (std::exception& e) { + return milvus::FailureCStatus(UnexpectedError, e.what()); + } +} + CStatus UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) { try { diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 0f9cee605a..1412951d5c 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -313,4 +313,71 @@ TEST(Sealed, LoadFieldData) { ] ])"); ASSERT_EQ(std_json.dump(-2), json.dump(-2)); -} \ No newline at end of file +} + +TEST(Sealed, Delete) { + auto dim = 16; + auto topK = 5; + auto N = 10; + auto metric_type = MetricType::METRIC_L2; + auto schema = std::make_shared(); + auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto counter_id = schema->AddDebugField("counter", DataType::INT64); + auto double_id = schema->AddDebugField("double", DataType::DOUBLE); + auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); + + auto dataset = DataGen(schema, N); + + auto fakevec = dataset.get_col(0); + + auto segment = CreateSealedSegment(schema); + std::string dsl = R"({ + "bool": { + "must": [ + { + "range": { + "double": { + "GE": -1, + "LT": 1 + } + } + }, + { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 5, + "round_decimal": 3 + } + } + } + ] + } + })"; + + Timestamp time = 1000000; + auto plan = CreatePlan(*schema, dsl); + auto num_queries = 5; + auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + + ASSERT_ANY_THROW(segment->Search(plan.get(), *ph_group, time)); + + SealedLoader(dataset, *segment); + + int64_t row_count = 5; + std::vector pks{1, 2, 3, 4, 5}; + std::vector timestamps{10, 10, 10, 10, 10}; + + LoadDeletedRecordInfo info = {timestamps.data(), pks.data(), row_count}; + segment->LoadDeletedRecord(info); + + std::vector tmp_block{0, 0}; + auto view = BitsetView(tmp_block.data(), 10); + auto bitset = segment->get_filtered_bitmap(view, 10, 11); + ASSERT_EQ(bitset.size(), N); +} diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 47bd0d5d19..f160f2a13c 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -789,6 +789,20 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interfa return nil } +func (s *Segment) LoadDeletedRecord(primaryKeys []IntPrimaryKey) error { + s.segPtrMu.RLock() + defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock + if s.segmentPtr == nil { + return errors.New("null seg core pointer") + } + if s.segmentType != segmentTypeSealed { + errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID()) + return errors.New(errMsg) + } + + return nil +} + func (s *Segment) dropFieldData(fieldID int64) error { /* CStatus