From 5aa0ddf77b99d8856d04275c92d4947d85d59906 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 18 Apr 2023 11:24:30 +0800 Subject: [PATCH] Refine field storage of sealed segment (#23464) Signed-off-by: yah01 --- .../VariableField.h => common/Column.h} | 123 ++++++++++++------ internal/core/src/common/Span.h | 5 + .../core/src/segcore/SegmentSealedImpl.cpp | 58 +++++---- internal/core/src/segcore/SegmentSealedImpl.h | 21 +-- 4 files changed, 131 insertions(+), 76 deletions(-) rename internal/core/src/{segcore/VariableField.h => common/Column.h} (55%) diff --git a/internal/core/src/segcore/VariableField.h b/internal/core/src/common/Column.h similarity index 55% rename from internal/core/src/segcore/VariableField.h rename to internal/core/src/common/Column.h index 380e616ff6..6a5fd23a10 100644 --- a/internal/core/src/segcore/VariableField.h +++ b/internal/core/src/common/Column.h @@ -13,7 +13,9 @@ #include +#include #include +#include #include #include #include @@ -32,13 +34,78 @@ struct Entry { uint32_t length; }; -// Used for string/varchar field only, -// TODO(yah01): make this generic -class VariableField { +class ColumnBase { public: - explicit VariableField(int64_t segment_id, - const FieldMeta& field_meta, - const LoadFieldDataInfo& info) { + ColumnBase() = default; + virtual ~ColumnBase() { + if (data_ != nullptr && data_ != MAP_FAILED) { + if (munmap(data_, size_)) { + AssertInfo(true, + fmt::format("failed to unmap variable field, err={}", + strerror(errno))); + } + } + } + + ColumnBase(ColumnBase&& column) noexcept + : data_(column.data_), size_(column.size_) { + column.data_ = nullptr; + column.size_ = 0; + } + + const char* + data() const { + return data_; + } + + [[nodiscard]] size_t + size() const { + return size_; + } + + virtual SpanBase + span() const = 0; + + protected: + char* data_{nullptr}; + uint64_t size_{0}; +}; + +class FixedColumn : public ColumnBase { + public: + FixedColumn(int64_t segment_id, + const FieldMeta& field_meta, + const LoadFieldDataInfo& info) { + data_ = static_cast(CreateMap(segment_id, field_meta, info)); + size_ = field_meta.get_sizeof() * info.row_count; + row_count_ = info.row_count; + } + + FixedColumn(FixedColumn&& column) noexcept + : ColumnBase(std::move(column)), row_count_(column.row_count_) { + column.row_count_ = 0; + } + + ~FixedColumn() override = default; + + SpanBase + span() const override { + return SpanBase(data_, row_count_, size_ / row_count_); + } + + private: + int64_t row_count_{}; +}; + +template +class VariableColumn : public ColumnBase { + public: + using ViewType = + std::conditional_t, std::string_view, T>; + + VariableColumn(int64_t segment_id, + const FieldMeta& field_meta, + const LoadFieldDataInfo& info) { auto begin = info.field_data->scalars().string_data().data().begin(); auto end = info.field_data->scalars().string_data().data().end(); @@ -53,44 +120,28 @@ class VariableField { construct_views(); } - VariableField(VariableField&& field) noexcept - : indices_(std::move(field.indices_)), - size_(field.size_), - data_(field.data_), - views_(std::move(field.views_)) { + VariableColumn(VariableColumn&& field) noexcept + : indices_(std::move(field.indices_)), views_(std::move(field.views_)) { + data_ = field.data(); + size_ = field.size(); field.data_ = nullptr; } - ~VariableField() { - if (data_ != MAP_FAILED && data_ != nullptr) { - if (munmap(data_, size_)) { - AssertInfo(true, - fmt::format("failed to unmap variable field, err={}", - strerror(errno))); - } - } + ~VariableColumn() override = default; + + SpanBase + span() const override { + return SpanBase(views_.data(), views_.size(), sizeof(ViewType)); } - char* - data() { - return data_; - } - - [[nodiscard]] const std::vector& + [[nodiscard]] const std::vector& views() const { return views_; } - [[nodiscard]] size_t - size() const { - return size_; - } - - Span + ViewType operator[](const int i) const { - uint64_t next = (i + 1 == indices_.size()) ? size_ : indices_[i + 1]; - uint64_t offset = indices_[i]; - return Span(data_ + offset, uint32_t(next - offset)); + return views_[i]; } protected: @@ -106,10 +157,8 @@ class VariableField { private: std::vector indices_{}; - uint64_t size_{0}; - char* data_{nullptr}; // Compatible with current Span type - std::vector views_{}; + std::vector views_{}; }; } // namespace milvus::segcore diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index 2b2ebc3ee7..4ab50fb99c 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -18,6 +18,7 @@ #include #include +#include #include #include "Types.h" @@ -68,6 +69,10 @@ class Span< : data_(data), row_count_(row_count) { } + explicit Span(std::string_view data) { + Span(data.data(), data.size()); + } + operator SpanBase() const { return SpanBase(data_, row_count_, sizeof(T)); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 6dc6a6d286..dc4eb72eb9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -15,10 +15,15 @@ #include #include +#include +#include +#include #include "Utils.h" +#include "common/Column.h" #include "common/Consts.h" #include "common/FieldMeta.h" +#include "common/Types.h" #include "query/ScalarIndex.h" #include "query/SearchBruteForce.h" #include "query/SearchOnSealed.h" @@ -225,19 +230,27 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { AssertInfo(!get_bit(index_ready_bitset_, field_id), "field data can't be loaded when indexing exists"); - void* field_data = nullptr; size_t size = 0; if (datatype_is_variable(data_type)) { - VariableField field(get_segment_id(), field_meta, info); - size = field.size(); - field_data = reinterpret_cast(field.data()); + std::unique_ptr column{}; + switch (data_type) { + case milvus::DataType::STRING: + case milvus::DataType::VARCHAR: { + column = std::make_unique>( + get_segment_id(), field_meta, info); + break; + } + default: { + } + } + size = column->size(); std::unique_lock lck(mutex_); - variable_fields_.emplace(field_id, std::move(field)); + variable_fields_.emplace(field_id, std::move(column)); } else { - field_data = CreateMap(get_segment_id(), field_meta, info); - size = field_meta.get_sizeof() * info.row_count; + auto column = FixedColumn(get_segment_id(), field_meta, info); + size = column.size(); std::unique_lock lck(mutex_); - fixed_fields_[field_id] = field_data; + fixed_fields_.emplace(field_id, std::move(column)); } // set pks to offset @@ -333,15 +346,13 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { auto& field_meta = schema_->operator[](field_id); auto element_sizeof = field_meta.get_sizeof(); if (auto it = fixed_fields_.find(field_id); it != fixed_fields_.end()) { - auto field_data = it->second; - return SpanBase(field_data, get_row_count(), element_sizeof); + auto& field_data = it->second; + return field_data.span(); } if (auto it = variable_fields_.find(field_id); it != variable_fields_.end()) { auto& field = it->second; - return SpanBase(field.views().data(), - field.views().size(), - sizeof(std::string_view)); + return field->span(); } auto field_data = insert_record_.get_field_data_base(field_id); AssertInfo(field_data->num_chunk() == 1, @@ -432,9 +443,9 @@ SegmentSealedImpl::vector_search(SearchInfo& search_info, "Field Data is not loaded: " + std::to_string(field_id.get())); AssertInfo(row_count_opt_.has_value(), "Can't get row count value"); auto row_count = row_count_opt_.value(); - auto vec_data = fixed_fields_.at(field_id); + auto& vec_data = fixed_fields_.at(field_id); query::SearchOnSealed(*schema_, - vec_data, + vec_data.data(), search_info, query_data, query_count, @@ -571,16 +582,16 @@ SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, template void -SegmentSealedImpl::bulk_subscript_impl(const VariableField& field, +SegmentSealedImpl::bulk_subscript_impl(const ColumnBase* column, const int64_t* seg_offsets, int64_t count, void* dst_raw) { + auto field = reinterpret_cast*>(column); auto dst = reinterpret_cast(dst_raw); for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; if (offset != INVALID_SEG_OFFSET) { - auto entry = field[offset]; - dst[i] = std::move(T(entry.data(), entry.row_count())); + dst[i] = std::move(T((*field)[offset])); } } } @@ -647,10 +658,11 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, case DataType::VARCHAR: case DataType::STRING: { FixedVector output(count); - bulk_subscript_impl(variable_fields_.at(field_id), - seg_offsets, - count, - output.data()); + bulk_subscript_impl( + variable_fields_.at(field_id).get(), + seg_offsets, + count, + output.data()); return CreateScalarDataArrayFrom( output.data(), count, field_meta); } @@ -662,7 +674,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, } } - auto src_vec = fixed_fields_.at(field_id); + auto src_vec = fixed_fields_.at(field_id).data(); switch (field_meta.get_data_type()) { case DataType::BOOL: { FixedVector output(count); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 883f6ec7a5..965c7f053f 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -28,7 +28,7 @@ #include "SealedIndexingRecord.h" #include "SegmentSealed.h" #include "TimestampIndex.h" -#include "VariableField.h" +#include "common/Column.h" #include "index/ScalarIndex.h" #include "sys/mman.h" @@ -37,18 +37,7 @@ namespace milvus::segcore { class SegmentSealedImpl : public SegmentSealed { public: explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id); - ~SegmentSealedImpl() { - for (auto& [field_id, data] : fixed_fields_) { - auto field_meta = schema_->operator[](field_id); - auto data_type = field_meta.get_data_type(); - if (munmap(data, field_meta.get_sizeof() * get_row_count())) { - AssertInfo(true, - "failed to unmap field " + - std::to_string(field_id.get()) + - " err=" + strerror(errno)); - } - } - } + ~SegmentSealedImpl() override = default; void LoadIndex(const LoadIndexInfo& info) override; void @@ -151,7 +140,7 @@ class SegmentSealedImpl : public SegmentSealed { template static void - bulk_subscript_impl(const VariableField& field, + bulk_subscript_impl(const ColumnBase* field, const int64_t* seg_offsets, int64_t count, void* dst_raw); @@ -240,8 +229,8 @@ class SegmentSealedImpl : public SegmentSealed { SchemaPtr schema_; int64_t id_; - std::unordered_map fixed_fields_; - std::unordered_map variable_fields_; + std::unordered_map fixed_fields_; + std::unordered_map> variable_fields_; }; inline SegmentSealedPtr