From 5e06dc1732ad65f0fc88b85dbda8191a3e637a34 Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Wed, 20 Jan 2021 17:33:31 +0800 Subject: [PATCH] Enable segment sealed, add support for loadXXX Signed-off-by: FluorineDog --- internal/core/src/common/LoadInfo.h | 2 +- internal/core/src/common/Span.h | 5 ++ internal/core/src/common/SystemProperty.cpp | 18 +++-- internal/core/src/common/SystemProperty.h | 6 ++ internal/core/src/segcore/CMakeLists.txt | 1 + .../core/src/segcore/SegmentGrowingImpl.cpp | 36 ---------- .../core/src/segcore/SegmentGrowingImpl.h | 42 ++++++++++- .../core/src/segcore/SegmentInterface.cpp | 42 +++++++++++ internal/core/src/segcore/SegmentInterface.h | 12 +++- .../core/src/segcore/SegmentSealedImpl.cpp | 71 ++++++++++++------- internal/core/src/segcore/SegmentSealedImpl.h | 50 +++++++++++-- internal/core/unittest/test_sealed.cpp | 65 ++++++++++++----- internal/core/unittest/test_utils/DataGen.h | 10 ++- 13 files changed, 268 insertions(+), 92 deletions(-) create mode 100644 internal/core/src/segcore/SegmentInterface.cpp diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index 7b5b0af166..ec6655af46 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -26,6 +26,6 @@ struct LoadIndexInfo { // NOTE: Refer to common/SystemProperty.cpp for details struct LoadFieldDataInfo { int64_t field_id; - void* blob = nullptr; + const void* blob = nullptr; int64_t row_count = -1; }; diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index 51398808c0..f0d34779d3 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -78,6 +78,11 @@ class Span>> { return data_; } + const T& + operator[](int64_t offset) const { + return data_[offset]; + } + int64_t row_count() const { return row_count_; diff --git a/internal/core/src/common/SystemProperty.cpp b/internal/core/src/common/SystemProperty.cpp index 04004acb2c..e6fc4e29f4 100644 --- a/internal/core/src/common/SystemProperty.cpp +++ b/internal/core/src/common/SystemProperty.cpp @@ -17,10 +17,10 @@ class SystemPropertyImpl : public SystemProperty { public: [[nodiscard]] bool SystemFieldVerify(const FieldName& field_name, FieldId field_id) const override { - if (!name_to_types_.count(field_name)) { + if (!IsSystem(field_name)) { return false; } - if (!id_to_types_.count(field_id)) { + if (!IsSystem(field_id)) { return false; } auto left_id = name_to_types_.at(field_name); @@ -30,16 +30,26 @@ class SystemPropertyImpl : public SystemProperty { SystemFieldType GetSystemFieldType(FieldName field_name) const override { - Assert(name_to_types_.count(field_name)); + Assert(IsSystem(field_name)); return name_to_types_.at(field_name); } SystemFieldType GetSystemFieldType(FieldId field_id) const override { - Assert(id_to_types_.count(field_id)); + Assert(IsSystem(field_id)); return id_to_types_.at(field_id); } + bool + IsSystem(FieldId field_id) const override { + return id_to_types_.count(field_id); + } + + bool + IsSystem(FieldName field_name) const override { + return name_to_types_.count(field_name); + } + friend const SystemProperty& SystemProperty::Instance(); diff --git a/internal/core/src/common/SystemProperty.h b/internal/core/src/common/SystemProperty.h index 8bd36daa69..4dd8d7b535 100644 --- a/internal/core/src/common/SystemProperty.h +++ b/internal/core/src/common/SystemProperty.h @@ -34,6 +34,12 @@ class SystemProperty { virtual SystemFieldType GetSystemFieldType(FieldName field_name) const = 0; + + virtual bool + IsSystem(FieldId field_id) const = 0; + + virtual bool + IsSystem(FieldName field_name) const = 0; }; } // namespace milvus diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index e51eb17b54..506cfb9b80 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -13,6 +13,7 @@ set(SEGCORE_FILES reduce_c.cpp load_index_c.cpp SealedIndexingRecord.cpp + SegmentInterface.cpp ) add_library(milvus_segcore SHARED ${SEGCORE_FILES} diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 51f692cd2f..3044de2142 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -248,42 +248,6 @@ SegmentGrowingImpl::Search(const query::Plan* plan, return results; } -void -SegmentGrowingImpl::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { - AssertInfo(plan, "empty plan"); - auto size = results.result_distances_.size(); - Assert(results.internal_seg_offsets_.size() == size); - Assert(results.result_offsets_.size() == size); - Assert(results.row_data_.size() == 0); - - if (plan->schema_.get_is_auto_id()) { - auto& uids = record_.uids_; - for (int64_t i = 0; i < size; ++i) { - auto seg_offset = results.internal_seg_offsets_[i]; - auto row_id = seg_offset == -1 ? -1 : uids[seg_offset]; - - std::vector blob(sizeof(row_id)); - memcpy(blob.data(), &row_id, sizeof(row_id)); - results.row_data_.emplace_back(std::move(blob)); - } - } else { - auto key_offset_opt = schema_->get_primary_key_offset(); - Assert(key_offset_opt.has_value()); - auto key_offset = key_offset_opt.value(); - auto& field_meta = schema_->operator[](key_offset); - Assert(field_meta.get_data_type() == DataType::INT64); - auto uids = record_.get_entity(key_offset); - for (int64_t i = 0; i < size; ++i) { - auto seg_offset = results.internal_seg_offsets_[i]; - auto row_id = seg_offset == -1 ? -1 : uids->operator[](seg_offset); - - std::vector blob(sizeof(row_id)); - memcpy(blob.data(), &row_id, sizeof(row_id)); - results.row_data_.emplace_back(std::move(blob)); - } - } -} - Status SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) { auto field_offset = schema_->get_offset(FieldName(info.field_name)); diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index d7dec5f523..e779cac177 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -125,6 +125,45 @@ class SegmentGrowingImpl : public SegmentGrowing { return 0; } + template + void + bulk_subscript_impl(const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, void* output_raw) const { + static_assert(IsScalar); + auto vec_ptr = dynamic_cast*>(&vec_raw); + Assert(vec_ptr); + auto& vec = *vec_ptr; + auto output = reinterpret_cast(output_raw); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + output[i] = offset == -1 ? -1 : vec[offset]; + } + } + + void + bulk_subscript(SystemFieldType system_type, + const int64_t* seg_offsets, + int64_t count, + void* output) const override { + switch (system_type) { + case SystemFieldType::Timestamp: + PanicInfo("timestamp unsupported"); + case SystemFieldType::RowId: + bulk_subscript_impl(this->record_.uids_, seg_offsets, count, output); + break; + default: + PanicInfo("unknown subscript fields"); + } + } + + void + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { + // TODO: support more types + auto vec_ptr = record_.get_base_entity(field_offset); + auto data_type = schema_->operator[](field_offset).get_data_type(); + Assert(data_type == DataType::INT64); + bulk_subscript_impl(*vec_ptr, seg_offsets, count, output); + } + int64_t num_chunk_data() const override; @@ -146,9 +185,6 @@ class SegmentGrowingImpl : public SegmentGrowing { std::shared_ptr get_deleted_bitmap(int64_t del_barrier, Timestamp query_timestamp, int64_t insert_barrier, bool force = false); - void - FillTargetEntry(const query::Plan* Plan, QueryResult& results) const override; - protected: SpanBase chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const override; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp new file mode 100644 index 0000000000..e6d4303b5f --- /dev/null +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -0,0 +1,42 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "segcore/SegmentInterface.h" +namespace milvus::segcore { +class Naive; + +void +SegmentInterface::FillTargetEntry(const query::Plan* plan, QueryResult& results) const { + AssertInfo(plan, "empty plan"); + auto size = results.result_distances_.size(); + Assert(results.internal_seg_offsets_.size() == size); + Assert(results.result_offsets_.size() == size); + Assert(results.row_data_.size() == 0); + + std::vector target(size); + if (plan->schema_.get_is_auto_id()) { + // use row_id + bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, target.data()); + } else { + auto key_offset_opt = get_schema().get_primary_key_offset(); + Assert(key_offset_opt.has_value()); + auto key_offset = key_offset_opt.value(); + bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, target.data()); + } + + for (int64_t i = 0; i < size; ++i) { + auto row_id = target[i]; + std::vector blob(sizeof(row_id)); + memcpy(blob.data(), &row_id, sizeof(row_id)); + results.row_data_.emplace_back(std::move(blob)); + } +} +} // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 39a5327483..08f18b4b45 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -16,13 +16,14 @@ #include "common/Span.h" #include "IndexingEntry.h" #include +#include "common/SystemProperty.h" namespace milvus::segcore { class SegmentInterface { public: - virtual void - FillTargetEntry(const query::Plan* Plan, QueryResult& results) const = 0; + void + FillTargetEntry(const query::Plan* plan, QueryResult& results) const; virtual QueryResult Search(const query::Plan* Plan, @@ -40,6 +41,13 @@ class SegmentInterface { get_schema() const = 0; virtual ~SegmentInterface() = default; + + protected: + virtual void + bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const = 0; + + virtual void + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const = 0; }; // internal API for DSL calculation diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 35fd8fa302..caa1670763 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -37,22 +37,42 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { // TODO Assert(info.row_count > 0); auto field_id = FieldId(info.field_id); - auto field_offset = schema_->get_offset(field_id); - auto& field_meta = schema_->operator[](field_offset); - Assert(!field_meta.is_vector()); - auto element_sizeof = field_meta.get_sizeof(); - auto length_in_bytes = element_sizeof * info.row_count; - aligned_vector vecdata(length_in_bytes); - memcpy(vecdata.data(), info.blob, length_in_bytes); - std::unique_lock lck(mutex_); - if (row_count_opt_.has_value()) { - AssertInfo(row_count_opt_.value() == info.row_count, "load data has different row count from other columns"); + Assert(info.blob); + Assert(info.row_count > 0); + if (SystemProperty::Instance().IsSystem(field_id)) { + auto system_field_type = SystemProperty::Instance().GetSystemFieldType(field_id); + Assert(system_field_type == SystemFieldType::RowId); + auto src_ptr = reinterpret_cast(info.blob); + + // prepare data + aligned_vector vec_data(info.row_count); + std::copy_n(src_ptr, info.row_count, vec_data.data()); + + // write data under lock + std::unique_lock lck(mutex_); + update_row_count(info.row_count); + AssertInfo(row_ids_.empty(), "already exists"); + row_ids_ = std::move(vec_data); + + ++ready_count_; } else { - row_count_opt_ = info.row_count; + // prepare data + auto field_offset = schema_->get_offset(field_id); + auto& field_meta = schema_->operator[](field_offset); + Assert(!field_meta.is_vector()); + auto element_sizeof = field_meta.get_sizeof(); + auto length_in_bytes = element_sizeof * info.row_count; + aligned_vector vec_data(length_in_bytes); + memcpy(vec_data.data(), info.blob, length_in_bytes); + + // write data under lock + std::unique_lock lck(mutex_); + update_row_count(info.row_count); + AssertInfo(columns_data_[field_offset.get()].empty(), "already exists"); + columns_data_[field_offset.get()] = std::move(vec_data); + + ++ready_count_; } - AssertInfo(columns_data_[field_offset.get()].empty(), "already exists"); - columns_data_[field_offset.get()] = std::move(vecdata); - ++ready_count_; } int64_t @@ -63,27 +83,28 @@ SegmentSealedImpl::num_chunk_index_safe(FieldOffset field_offset) const { int64_t SegmentSealedImpl::num_chunk_data() const { - PanicInfo("unimplemented"); + return 1; } int64_t SegmentSealedImpl::size_per_chunk() const { - PanicInfo("unimplemented"); + return get_row_count(); } SpanBase SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const { - PanicInfo("unimplemented"); + std::shared_lock lck(mutex_); + auto& field_meta = schema_->operator[](field_offset); + auto element_sizeof = field_meta.get_sizeof(); + Assert(is_all_ready()); + SpanBase base(columns_data_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof); + return base; } const knowhere::Index* SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { - PanicInfo("unimplemented"); -} - -void -SegmentSealedImpl::FillTargetEntry(const query::Plan* Plan, QueryResult& results) const { - PanicInfo("unimplemented"); + // TODO: support scalar index + return nullptr; } QueryResult @@ -96,7 +117,9 @@ SegmentSealedImpl::Search(const query::Plan* Plan, int64_t SegmentSealedImpl::GetMemoryUsageInBytes() const { - PanicInfo("unimplemented"); + // TODO: add estimate for index + auto row_count = row_count_opt_.value_or(0); + return schema_->get_total_sizeof() * row_count; } int64_t diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 15a288e1eb..ce7989f18a 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -26,9 +26,6 @@ class SegmentSealedImpl : public SegmentSealed { LoadFieldData(const LoadFieldDataInfo& info) override; public: - void - FillTargetEntry(const query::Plan* Plan, QueryResult& results) const override; - QueryResult Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], @@ -63,10 +60,55 @@ class SegmentSealedImpl : public SegmentSealed { const knowhere::Index* chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const override; + // Calculate: output[i] = Vec[seg_offset[i]], + // where Vec is determined from field_offset + void + bulk_subscript(SystemFieldType system_type, + const int64_t* seg_offsets, + int64_t count, + void* output) const override { + Assert(is_all_ready()); + Assert(system_type == SystemFieldType::RowId); + bulk_subscript_impl(row_ids_.data(), seg_offsets, count, output); + } + + // Calculate: output[i] = Vec[seg_offset[i]] + // where Vec is determined from field_offset + void + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { + Assert(is_all_ready()); + auto& field_meta = schema_->operator[](field_offset); + Assert(field_meta.get_data_type() == DataType::INT64); + bulk_subscript_impl(columns_data_[field_offset.get()].data(), seg_offsets, count, output); + } + private: + template + static void + bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) { + static_assert(IsScalar); + auto src = reinterpret_cast(src_raw); + auto dst = reinterpret_cast(dst_raw); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + dst[i] = offset == -1 ? -1 : src[offset]; + } + } + + void + update_row_count(int64_t row_count) { + if (row_count_opt_.has_value()) { + AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); + } else { + row_count_opt_ = row_count; + } + } + bool is_all_ready() const { - return ready_count_ == schema_->size(); + // TODO: optimize here + // NOTE: including row_ids + return ready_count_ == schema_->size() + 1; } mutable std::shared_mutex mutex_; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 1ab497b4ab..671deb9084 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -216,6 +216,32 @@ TEST(Sealed, with_predicate) { } } +void +SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) { + // TODO + auto row_count = dataset.row_ids_.size(); + { + LoadFieldDataInfo info; + info.blob = dataset.row_ids_.data(); + info.row_count = dataset.row_ids_.size(); + info.field_id = 0; // field id for RowId + seg.LoadFieldData(info); + } + int field_offset = 0; + for (auto& meta : seg.get_schema().get_fields()) { + if (meta.is_vector()) { + ++field_offset; + continue; + } + LoadFieldDataInfo info; + info.field_id = meta.get_id().get(); + info.row_count = row_count; + info.blob = dataset.cols_[field_offset].data(); + seg.LoadFieldData(info); + ++field_offset; + } +} + TEST(Sealed, LoadFieldData) { auto dim = 16; auto topK = 5; @@ -223,14 +249,13 @@ TEST(Sealed, LoadFieldData) { auto metric_type = MetricType::METRIC_L2; auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); - auto counter_id = schema->AddDebugField("counter", DataType::INT64); + schema->AddDebugField("counter", DataType::INT64); + schema->AddDebugField("double", DataType::DOUBLE); + auto dataset = DataGen(schema, N); auto fakevec = dataset.get_col(0); - auto counter = dataset.get_col(1); - auto indexing = std::make_shared(); - auto conf = knowhere::Config{{knowhere::meta::DIM, dim}, {knowhere::meta::TOPK, topK}, {knowhere::IndexParams::nlist, 100}, @@ -238,21 +263,27 @@ TEST(Sealed, LoadFieldData) { {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, {knowhere::meta::DEVICEID, 0}}; auto database = knowhere::GenDataset(N, dim, fakevec.data()); + auto indexing = std::make_shared(); indexing->Train(database, conf); indexing->AddWithoutIds(database, conf); auto segment = CreateSealedSegment(schema); - LoadFieldDataInfo field_info; - field_info.field_id = counter_id.get(); - field_info.row_count = N; - field_info.blob = counter.data(); - segment->LoadFieldData(field_info); - - LoadIndexInfo vec_info; - vec_info.field_id = fakevec_id.get(); - vec_info.field_name = "fakevec"; - vec_info.index = indexing; - vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; - segment->LoadIndex(vec_info); - int i = 1 + 1; + SealedLoader(dataset, *segment); + { + LoadIndexInfo vec_info; + vec_info.field_id = fakevec_id.get(); + vec_info.field_name = "fakevec"; + vec_info.index = indexing; + vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; + segment->LoadIndex(vec_info); + } + ASSERT_EQ(segment->num_chunk_data(), 1); + auto chunk_span1 = segment->chunk_data(FieldOffset(1), 0); + auto chunk_span2 = segment->chunk_data(FieldOffset(2), 0); + auto ref1 = dataset.get_col(1); + auto ref2 = dataset.get_col(2); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(chunk_span1[i], ref1[i]); + ASSERT_EQ(chunk_span2[i], ref2[i]); + } } \ No newline at end of file diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index dc89559d3f..08a4f91f30 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -29,7 +29,7 @@ struct GeneratedData { RowBasedRawData raw_; template auto - get_col(int index) { + get_col(int index) const { auto& target = cols_.at(index); std::vector ret(target.size() / sizeof(T)); memcpy(ret.data(), target.data(), target.size()); @@ -158,6 +158,14 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) { insert_cols(data); break; } + case engine::DataType::DOUBLE: { + vector data(N); + for (auto& x : data) { + x = distr(er); + } + insert_cols(data); + break; + } default: { throw std::runtime_error("unimplemented"); }