diff --git a/internal/core/src/query/ScalarIndex.h b/internal/core/src/query/ScalarIndex.h
new file mode 100644
index 0000000000..ae12080171
--- /dev/null
+++ b/internal/core/src/query/ScalarIndex.h
@@ -0,0 +1,51 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#pragma once
+#include "knowhere/index/structured_index_simple/StructuredIndexSort.h"
+#include "common/Span.h"
+#include "common/FieldMeta.h"
+#include <memory>
+
+namespace milvus::query {
+
+template <typename T>
+inline std::unique_ptr<knowhere::scalar::StructuredIndexSort<T>>
+generate_scalar_index(Span<T> data) {
+    auto indexing = std::make_unique<knowhere::scalar::StructuredIndexSort<T>>();
+    indexing->Build(data.row_count(), data.data());
+    return indexing;
+}
+
+inline std::unique_ptr<knowhere::Index>
+generate_scalar_index(SpanBase data, DataType data_type) {
+    Assert(!datatype_is_vector(data_type));
+    switch (data_type) {
+        case DataType::BOOL:
+            return generate_scalar_index(Span<bool>(data));
+        case DataType::INT8:
+            return generate_scalar_index(Span<int8_t>(data));
+        case DataType::INT16:
+            return generate_scalar_index(Span<int16_t>(data));
+        case DataType::INT32:
+            return generate_scalar_index(Span<int32_t>(data));
+        case DataType::INT64:
+            return generate_scalar_index(Span<int64_t>(data));
+        case DataType::FLOAT:
+            return generate_scalar_index(Span<float>(data));
+        case DataType::DOUBLE:
+            return generate_scalar_index(Span<double>(data));
+        default:
+            PanicInfo("unsupported type");
+    }
+}
+
+}  // namespace milvus::query
diff --git a/internal/core/src/query/SearchBruteForce.cpp b/internal/core/src/query/SearchBruteForce.cpp
index 65bb5bb900..6de688f942 100644
--- a/internal/core/src/query/SearchBruteForce.cpp
+++ b/internal/core/src/query/SearchBruteForce.cpp
@@ -99,8 +99,8 @@ BinarySearchBruteForceFast(MetricType metric_type,
 }
 
 SubQueryResult
-FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
-                      const float* chunk_data,
+FloatSearchBruteForce(const dataset::QueryDataset& query_dataset,
+                      const void* chunk_data_raw,
                       int64_t size_per_chunk,
                       const faiss::BitsetView& bitset) {
     auto metric_type = query_dataset.metric_type;
@@ -108,25 +108,29 @@ FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
     auto topk = query_dataset.topk;
     auto dim = query_dataset.dim;
     SubQueryResult sub_qr(num_queries, topk, metric_type);
+    auto query_data = reinterpret_cast<const float*>(query_dataset.query_data);
+    auto chunk_data = reinterpret_cast<const float*>(chunk_data_raw);
     if (metric_type == MetricType::METRIC_L2) {
         faiss::float_maxheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()};
-        faiss::knn_L2sqr(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
+        faiss::knn_L2sqr(query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
         return sub_qr;
     } else {
         faiss::float_minheap_array_t buf{(size_t)num_queries, (size_t)topk, sub_qr.get_labels(), sub_qr.get_values()};
-        faiss::knn_inner_product(query_dataset.query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
+        faiss::knn_inner_product(query_data, chunk_data, dim, num_queries, size_per_chunk, &buf, bitset);
         return sub_qr;
     }
 }
 
 SubQueryResult
-BinarySearchBruteForce(const dataset::BinaryQueryDataset& query_dataset,
-                       const uint8_t* binary_chunk,
+BinarySearchBruteForce(const dataset::QueryDataset& query_dataset,
+                       const void* chunk_data_raw,
                        int64_t size_per_chunk,
                        const faiss::BitsetView& bitset) {
     // TODO: refactor the internal function
-    return BinarySearchBruteForceFast(query_dataset.metric_type, query_dataset.dim, binary_chunk, size_per_chunk,
-                                      query_dataset.topk, query_dataset.num_queries, query_dataset.query_data, bitset);
+    auto query_data = reinterpret_cast<const uint8_t*>(query_dataset.query_data);
+    auto chunk_data = reinterpret_cast<const uint8_t*>(chunk_data_raw);
+    return BinarySearchBruteForceFast(query_dataset.metric_type, query_dataset.dim, chunk_data, size_per_chunk,
+                                      query_dataset.topk, query_dataset.num_queries, query_data, bitset);
 }
 
 }  // namespace milvus::query
diff --git a/internal/core/src/query/SearchBruteForce.h b/internal/core/src/query/SearchBruteForce.h
index f544ed8540..d384a9cfab 100644
--- a/internal/core/src/query/SearchBruteForce.h
+++ b/internal/core/src/query/SearchBruteForce.h
@@ -19,14 +19,14 @@
 namespace milvus::query {
 
 SubQueryResult
-BinarySearchBruteForce(const dataset::BinaryQueryDataset& query_dataset,
-                       const uint8_t* binary_chunk,
+BinarySearchBruteForce(const dataset::QueryDataset& query_dataset,
+                       const void* chunk_data_raw,
                        int64_t size_per_chunk,
                        const faiss::BitsetView& bitset);
 
 SubQueryResult
-FloatSearchBruteForce(const dataset::FloatQueryDataset& query_dataset,
-                      const float* chunk_data,
+FloatSearchBruteForce(const dataset::QueryDataset& query_dataset,
+                      const void* chunk_data_raw,
                       int64_t size_per_chunk,
                       const faiss::BitsetView& bitset);
diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp
index 6c0b616274..0e15fc107b 100644
--- a/internal/core/src/query/SearchOnGrowing.cpp
+++ b/internal/core/src/query/SearchOnGrowing.cpp
@@ -69,7 +69,7 @@ FloatSearch(const segcore::SegmentGrowingImpl& segment,
     // std::vector<int64_t> final_uids(total_count, -1);
     // std::vector<float> final_dis(total_count, std::numeric_limits<float>::max());
     SubQueryResult final_qr(num_queries, topK, metric_type);
-    dataset::FloatQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
+    dataset::QueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
 
     auto max_indexed_id = indexing_record.get_finished_ack();
     const auto& field_indexing = indexing_record.get_vec_field_indexing(vecfield_offset);
@@ -158,7 +158,7 @@ BinarySearch(const segcore::SegmentGrowingImpl& segment,
     auto total_count = topK * num_queries;
 
     // step 3: small indexing search
-    query::dataset::BinaryQueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
+    query::dataset::QueryDataset query_dataset{metric_type, num_queries, topK, dim, query_data};
 
     auto vec_ptr = record.get_field_data<BinaryVector>(vecfield_offset);
diff --git a/internal/core/src/query/SearchOnIndex.cpp b/internal/core/src/query/SearchOnIndex.cpp
index db3a2a49c1..93ed0bcf11 100644
--- a/internal/core/src/query/SearchOnIndex.cpp
+++ b/internal/core/src/query/SearchOnIndex.cpp
@@ -12,7 +12,7 @@
 #include "SearchOnIndex.h"
 namespace milvus::query {
 SubQueryResult
-SearchOnIndex(const dataset::FloatQueryDataset& query_dataset,
+SearchOnIndex(const dataset::QueryDataset& query_dataset,
               const knowhere::VecIndex& indexing,
               const knowhere::Config& search_conf,
               const faiss::BitsetView& bitset) {
diff --git a/internal/core/src/query/SearchOnIndex.h b/internal/core/src/query/SearchOnIndex.h
index
d46c60c4ee..8fc1c5f5c8 100644 --- a/internal/core/src/query/SearchOnIndex.h +++ b/internal/core/src/query/SearchOnIndex.h @@ -19,7 +19,7 @@ namespace milvus::query { SubQueryResult -SearchOnIndex(const dataset::FloatQueryDataset& query_dataset, +SearchOnIndex(const dataset::QueryDataset& query_dataset, const knowhere::VecIndex& indexing, const knowhere::Config& search_conf, const faiss::BitsetView& bitset); diff --git a/internal/core/src/query/helper.h b/internal/core/src/query/helper.h index 6df3efe572..69dc250074 100644 --- a/internal/core/src/query/helper.h +++ b/internal/core/src/query/helper.h @@ -15,20 +15,12 @@ namespace milvus::query { namespace dataset { -struct FloatQueryDataset { +struct QueryDataset { MetricType metric_type; int64_t num_queries; int64_t topk; int64_t dim; - const float* query_data; -}; - -struct BinaryQueryDataset { - MetricType metric_type; - int64_t num_queries; - int64_t topk; - int64_t dim; - const uint8_t* query_data; + const void* query_data; }; } // namespace dataset diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index bb8a854533..2dd03b78d2 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -80,7 +80,7 @@ class VectorBase { grow_to_at_least(int64_t element_count) = 0; virtual void - set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) = 0; + set_data_raw(ssize_t element_offset, const void* source, ssize_t element_count) = 0; virtual SpanBase get_span_base(int64_t chunk_id) const = 0; @@ -142,7 +142,7 @@ class ConcurrentVectorImpl : public VectorBase { } void - set_data_raw(ssize_t element_offset, void* source, ssize_t element_count) override { + set_data_raw(ssize_t element_offset, const void* source, ssize_t element_count) override { set_data(element_offset, static_cast(source), element_count); } diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index fa8307c88e..02701ed938 100644 --- a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -32,6 +32,11 @@ struct RowBasedRawData { int64_t count; }; +struct ColumnBasedRawData { + std::vector> columns_; + int64_t count; +}; + int TestABI(); @@ -55,6 +60,13 @@ class SegmentGrowing : public SegmentInternalInterface { const Timestamp* timestamps, const RowBasedRawData& values) = 0; + virtual void + Insert(int64_t reserved_offset, + int64_t size, + const int64_t* row_ids, + const Timestamp* timestamps, + const ColumnBasedRawData& values) = 0; + virtual int64_t PreDelete(int64_t size) = 0; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index d118c8804c..e701e461d4 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -142,7 +142,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin, auto sizeof_infos = schema_->get_sizeof_infos(); std::vector offset_infos(schema_->size() + 1, 0); std::partial_sum(sizeof_infos.begin(), sizeof_infos.end(), offset_infos.begin() + 1); - std::vector> entities(schema_->size()); + std::vector> entities(schema_->size()); for (int fid = 0; fid < schema_->size(); ++fid) { auto len = sizeof_infos[fid]; @@ -165,23 +165,32 @@ SegmentGrowingImpl::Insert(int64_t reserved_begin, } } + do_insert(reserved_begin, size, uids.data(), timestamps.data(), entities); + return Status::OK(); +} + +void +SegmentGrowingImpl::do_insert(int64_t 
reserved_begin, + int64_t size, + const idx_t* row_ids, + const Timestamp* timestamps, + const std::vector>& columns_data) { // step 4: fill into Segment.ConcurrentVector - record_.timestamps_.set_data(reserved_begin, timestamps.data(), size); - record_.uids_.set_data(reserved_begin, uids.data(), size); + record_.timestamps_.set_data(reserved_begin, timestamps, size); + record_.uids_.set_data(reserved_begin, row_ids, size); for (int fid = 0; fid < schema_->size(); ++fid) { auto field_offset = FieldOffset(fid); - record_.get_field_data_base(field_offset)->set_data_raw(reserved_begin, entities[fid].data(), size); + record_.get_field_data_base(field_offset)->set_data_raw(reserved_begin, columns_data[fid].data(), size); } - for (int i = 0; i < uids.size(); ++i) { - auto uid = uids[i]; + for (int i = 0; i < size; ++i) { + auto row_id = row_ids[i]; // NOTE: this must be the last step, cannot be put above - uid2offset_.insert(std::make_pair(uid, reserved_begin + i)); + uid2offset_.insert(std::make_pair(row_id, reserved_begin + i)); } record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size); indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_); - return Status::OK(); } Status @@ -274,5 +283,167 @@ SegmentGrowingImpl::vector_search(int64_t vec_count, SearchOnGrowing(*this, vec_count, query_info, query_data, query_count, bitset, output); } } +void +SegmentGrowingImpl::bulk_subscript(FieldOffset field_offset, + const int64_t* seg_offsets, + int64_t count, + void* output) const { + // TODO: support more types + auto vec_ptr = record_.get_field_data_base(field_offset); + auto& field_meta = schema_->operator[](field_offset); + if (field_meta.is_vector()) { + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { + bulk_subscript_impl(field_meta.get_sizeof(), *vec_ptr, seg_offsets, count, output); + } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) { + bulk_subscript_impl(field_meta.get_sizeof(), *vec_ptr, seg_offsets, count, output); + } else { + PanicInfo("logical error"); + } + return; + } + + Assert(!field_meta.is_vector()); + switch (field_meta.get_data_type()) { + case DataType::BOOL: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, false, output); + break; + } + case DataType::INT8: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + case DataType::INT16: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + case DataType::INT32: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + case DataType::INT64: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + case DataType::FLOAT: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + case DataType::DOUBLE: { + bulk_subscript_impl(*vec_ptr, seg_offsets, count, 0, output); + break; + } + + default: { + PanicInfo("unsupported type"); + } + } +} + +template +void +SegmentGrowingImpl::bulk_subscript_impl(int64_t element_sizeof, + const VectorBase& vec_raw, + const int64_t* seg_offsets, + int64_t count, + void* output_raw) const { + static_assert(IsVector); + auto vec_ptr = dynamic_cast*>(&vec_raw); + Assert(vec_ptr); + auto& vec = *vec_ptr; + std::vector empty(element_sizeof, 0); + auto output_base = reinterpret_cast(output_raw); + for (int i = 0; i < count; ++i) { + auto dst = output_base + i * element_sizeof; + auto offset = seg_offsets[i]; + const uint8_t* src = offset == -1 ? 
empty.data() : (const uint8_t*)vec.get_element(offset); + memcpy(dst, src, element_sizeof); + } +} + +template +void +SegmentGrowingImpl::bulk_subscript_impl( + const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, T default_value, void* output_raw) const { + static_assert(IsScalar); + auto vec_ptr = dynamic_cast*>(&vec_raw); + Assert(vec_ptr); + auto& vec = *vec_ptr; + auto output = reinterpret_cast(output_raw); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + output[i] = offset == -1 ? default_value : vec[offset]; + } +} + +void +SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type, + const int64_t* seg_offsets, + int64_t count, + void* output) const { + switch (system_type) { + case SystemFieldType::Timestamp: + PanicInfo("timestamp unsupported"); + case SystemFieldType::RowId: + bulk_subscript_impl(this->record_.uids_, seg_offsets, count, -1, output); + break; + default: + PanicInfo("unknown subscript fields"); + } +} + +// copied from stack overflow +template +std::vector +sort_indexes(const T* src, int64_t size) { + // initialize original index locations + std::vector idx(size); + iota(idx.begin(), idx.end(), 0); + + // sort indexes based on comparing values in v + // using std::stable_sort instead of std::sort + // to avoid unnecessary index re-orderings + // when v contains elements of equal values + std::stable_sort(idx.begin(), idx.end(), [src](size_t i1, size_t i2) { return src[i1] < src[i2]; }); + + return idx; +} + +void +SegmentGrowingImpl::Insert(int64_t reserved_offset, + int64_t size, + const int64_t* row_ids_raw, + const Timestamp* timestamps_raw, + const ColumnBasedRawData& values) { + auto indexes = sort_indexes(timestamps_raw, size); + std::vector timestamps(size); + std::vector row_ids(size); + Assert(values.count == size); + for (int64_t i = 0; i < size; ++i) { + auto offset = indexes[i]; + timestamps[i] = timestamps_raw[offset]; + row_ids[i] = row_ids_raw[i]; + } + std::vector> columns_data; + + for (int field_offset = 0; field_offset < schema_->size(); ++field_offset) { + auto& field_meta = schema_->operator[](FieldOffset(field_offset)); + aligned_vector column; + auto element_sizeof = field_meta.get_sizeof(); + auto& src_vec = values.columns_[field_offset]; + Assert(src_vec.size() == element_sizeof * size); + for (int64_t i = 0; i < size; ++i) { + auto offset = indexes[i]; + auto beg = src_vec.data() + offset * element_sizeof; + column.insert(column.end(), beg, beg + element_sizeof); + } + columns_data.emplace_back(std::move(column)); + } + do_insert(reserved_offset, size, row_ids.data(), timestamps.data(), columns_data); +} } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 54c3bb0093..bf75d6bb08 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -31,6 +31,7 @@ #include "InsertRecord.h" #include #include +#include namespace milvus::segcore { @@ -46,6 +47,13 @@ class SegmentGrowingImpl : public SegmentGrowing { const Timestamp* timestamps, const RowBasedRawData& values) override; + void + Insert(int64_t reserved_offset, + int64_t size, + const int64_t* row_ids, + const Timestamp* timestamps, + const ColumnBasedRawData& values) override; + int64_t PreDelete(int64_t size) override; @@ -119,44 +127,25 @@ class SegmentGrowingImpl : public SegmentGrowing { return 0; } + // for scalar vectors template void - bulk_subscript_impl(const VectorBase& vec_raw, const 
int64_t* seg_offsets, int64_t count, void* output_raw) const { - static_assert(IsScalar); - auto vec_ptr = dynamic_cast*>(&vec_raw); - Assert(vec_ptr); - auto& vec = *vec_ptr; - auto output = reinterpret_cast(output_raw); - for (int64_t i = 0; i < count; ++i) { - auto offset = seg_offsets[i]; - output[i] = offset == -1 ? -1 : vec[offset]; - } - } + bulk_subscript_impl( + const VectorBase& vec_raw, const int64_t* seg_offsets, int64_t count, T default_value, void* output_raw) const; + + template + void + bulk_subscript_impl(int64_t element_sizeof, + const VectorBase& vec_raw, + const int64_t* seg_offsets, + int64_t count, + void* output_raw) const; void - bulk_subscript(SystemFieldType system_type, - const int64_t* seg_offsets, - int64_t count, - void* output) const override { - switch (system_type) { - case SystemFieldType::Timestamp: - PanicInfo("timestamp unsupported"); - case SystemFieldType::RowId: - bulk_subscript_impl(this->record_.uids_, seg_offsets, count, output); - break; - default: - PanicInfo("unknown subscript fields"); - } - } + bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const override; void - bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { - // TODO: support more types - auto vec_ptr = record_.get_field_data_base(field_offset); - auto data_type = schema_->operator[](field_offset).get_data_type(); - Assert(data_type == DataType::INT64); - bulk_subscript_impl(*vec_ptr, seg_offsets, count, output); - } + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override; Status LoadIndexing(const LoadIndexInfo& info) override; @@ -196,6 +185,14 @@ class SegmentGrowingImpl : public SegmentGrowing { Assert(plan); } + private: + void + do_insert(int64_t reserved_begin, + int64_t size, + const idx_t* row_ids, + const Timestamp* timestamps, + const std::vector>& columns_data); + private: int64_t size_per_chunk_; SchemaPtr schema_; diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 313d6f4f1e..7c798ab89f 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -23,22 +23,51 @@ SegmentInternalInterface::FillTargetEntry(const query::Plan* plan, QueryResult& Assert(results.result_offsets_.size() == size); Assert(results.row_data_.size() == 0); - std::vector target(size); - if (plan->schema_.get_is_auto_id()) { - // use row_id - bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, target.data()); - } else { - auto key_offset_opt = get_schema().get_primary_key_offset(); - Assert(key_offset_opt.has_value()); - auto key_offset = key_offset_opt.value(); - bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, target.data()); + // std::vector row_ids(size); + std::vector element_sizeofs; + std::vector> blobs; + + // fill row_ids + { + aligned_vector blob(size * sizeof(int64_t)); + if (plan->schema_.get_is_auto_id()) { + bulk_subscript(SystemFieldType::RowId, results.internal_seg_offsets_.data(), size, blob.data()); + } else { + auto key_offset_opt = get_schema().get_primary_key_offset(); + Assert(key_offset_opt.has_value()); + auto key_offset = key_offset_opt.value(); + Assert(get_schema()[key_offset].get_data_type() == DataType::INT64); + bulk_subscript(key_offset, results.internal_seg_offsets_.data(), size, blob.data()); + } + 
blobs.emplace_back(std::move(blob)); + element_sizeofs.push_back(sizeof(int64_t)); } + // fill other entries + for (auto field_offset : plan->target_entries_) { + auto& field_meta = get_schema()[field_offset]; + auto element_sizeof = field_meta.get_sizeof(); + aligned_vector blob(size * element_sizeof); + bulk_subscript(field_offset, results.internal_seg_offsets_.data(), size, blob.data()); + blobs.emplace_back(std::move(blob)); + element_sizeofs.push_back(element_sizeof); + } + + auto target_sizeof = std::accumulate(element_sizeofs.begin(), element_sizeofs.end(), 0); + for (int64_t i = 0; i < size; ++i) { - auto row_id = target[i]; - std::vector blob(sizeof(row_id)); - memcpy(blob.data(), &row_id, sizeof(row_id)); - results.row_data_.emplace_back(std::move(blob)); + int64_t element_offset = 0; + std::vector target(target_sizeof); + for (int loc = 0; loc < blobs.size(); ++loc) { + auto element_sizeof = element_sizeofs[loc]; + auto blob_ptr = blobs[loc].data(); + auto src = blob_ptr + element_sizeof * i; + auto dst = target.data() + element_offset; + memcpy(dst, src, element_sizeof); + element_offset += element_sizeof; + } + assert(element_offset == target_sizeof); + results.row_data_.emplace_back(std::move(target)); } } diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 6d1e5d7ce9..b532fe9cde 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -8,7 +8,7 @@ // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License - +#pragma once #include #include "SegmentInterface.h" @@ -26,6 +26,10 @@ class SegmentSealed : public SegmentInternalInterface { DropIndex(const FieldId field_id) = 0; virtual void DropFieldData(const FieldId field_id) = 0; + virtual bool + HasIndex(FieldId field_id) const = 0; + virtual bool + HasFieldData(FieldId field_id) const = 0; }; using SegmentSealedPtr = std::unique_ptr; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 273aeab3e5..7505e5410d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -11,7 +11,20 @@ #include "segcore/SegmentSealedImpl.h" #include "query/SearchOnSealed.h" +#include "query/ScalarIndex.h" +#include "query/SearchBruteForce.h" namespace milvus::segcore { + +static inline void +set_bit(boost::dynamic_bitset<>& bitset, FieldOffset field_offset, bool flag = true) { + bitset[field_offset.get()] = flag; +} + +static inline bool +get_bit(const boost::dynamic_bitset<>& bitset, FieldOffset field_offset) { + return bitset[field_offset.get()]; +} + void SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { // NOTE: lock only when data is ready to avoid starvation @@ -24,14 +37,17 @@ SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { Assert(row_count > 0); std::unique_lock lck(mutex_); + Assert(!get_bit(vecindex_ready_bitset_, field_offset)); if (row_count_opt_.has_value()) { AssertInfo(row_count_opt_.value() == row_count, "load data has different row count from other columns"); } else { row_count_opt_ = row_count; } - Assert(!vec_indexings_.is_ready(field_offset)); - vec_indexings_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index); 
- set_field_ready(field_offset, true); + Assert(!vecindexs_.is_ready(field_offset)); + vecindexs_.append_field_indexing(field_offset, GetMetricType(metric_type_str), info.index); + + set_bit(vecindex_ready_bitset_, field_offset, true); + lck.unlock(); } void @@ -61,26 +77,40 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { // prepare data auto field_offset = schema_->get_offset(field_id); auto& field_meta = schema_->operator[](field_offset); - Assert(!field_meta.is_vector()); + // Assert(!field_meta.is_vector()); auto element_sizeof = field_meta.get_sizeof(); + auto span = SpanBase(info.blob, info.row_count, element_sizeof); auto length_in_bytes = element_sizeof * info.row_count; aligned_vector vec_data(length_in_bytes); memcpy(vec_data.data(), info.blob, length_in_bytes); + // generate scalar index + std::unique_ptr index; + if (!field_meta.is_vector()) { + index = query::generate_scalar_index(span, field_meta.get_data_type()); + } + // write data under lock std::unique_lock lck(mutex_); update_row_count(info.row_count); - AssertInfo(field_datas_[field_offset.get()].empty(), "already exists"); - field_datas_[field_offset.get()] = std::move(vec_data); + AssertInfo(field_datas_[field_offset.get()].empty(), "field data already exists"); - set_field_ready(field_offset, true); + if (field_meta.is_vector()) { + AssertInfo(!vecindexs_.is_ready(field_offset), "field data can't be loaded when indexing exists"); + field_datas_[field_offset.get()] = std::move(vec_data); + } else { + AssertInfo(!scalar_indexings_[field_offset.get()], "scalar indexing not cleared"); + field_datas_[field_offset.get()] = std::move(vec_data); + scalar_indexings_[field_offset.get()] = std::move(index); + } + + set_bit(field_data_ready_bitset_, field_offset, true); } } int64_t SegmentSealedImpl::num_chunk_index(FieldOffset field_offset) const { - // TODO: support scalar index - return 0; + return 1; } int64_t @@ -96,7 +126,7 @@ SegmentSealedImpl::size_per_chunk() const { SpanBase SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) const { std::shared_lock lck(mutex_); - Assert(is_field_ready(field_offset)); + Assert(get_bit(field_data_ready_bitset_, field_offset)); auto& field_meta = schema_->operator[](field_offset); auto element_sizeof = field_meta.get_sizeof(); SpanBase base(field_datas_[field_offset.get()].data(), row_count_opt_.value(), element_sizeof); @@ -106,7 +136,9 @@ SegmentSealedImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id) c const knowhere::Index* SegmentSealedImpl::chunk_index_impl(FieldOffset field_offset, int64_t chunk_id) const { // TODO: support scalar index - PanicInfo("unimplemented"); + auto ptr = scalar_indexings_[field_offset.get()].get(); + Assert(ptr); + return ptr; } int64_t @@ -137,10 +169,44 @@ SegmentSealedImpl::vector_search(int64_t vec_count, QueryResult& output) const { auto field_offset = query_info.field_offset_; auto& field_meta = schema_->operator[](field_offset); + Assert(field_meta.is_vector()); - Assert(vec_indexings_.is_ready(field_offset)); - query::SearchOnSealed(*schema_, vec_indexings_, query_info, query_data, query_count, bitset, output); + if (get_bit(vecindex_ready_bitset_, field_offset)) { + Assert(vecindexs_.is_ready(field_offset)); + query::SearchOnSealed(*schema_, vecindexs_, query_info, query_data, query_count, bitset, output); + } else if (get_bit(field_data_ready_bitset_, field_offset)) { + query::dataset::QueryDataset dataset; + dataset.query_data = query_data; + dataset.num_queries = query_count; + 
dataset.metric_type = field_meta.get_metric_type(); + dataset.topk = query_info.topK_; + dataset.dim = field_meta.get_dim(); + + Assert(get_bit(field_data_ready_bitset_, field_offset)); + Assert(row_count_opt_.has_value()); + auto row_count = row_count_opt_.value(); + auto chunk_data = field_datas_[field_offset.get()].data(); + + auto sub_qr = [&] { + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { + return query::FloatSearchBruteForce(dataset, chunk_data, row_count, bitset); + } else { + return query::BinarySearchBruteForce(dataset, chunk_data, row_count, bitset); + } + }(); + + QueryResult results; + results.result_distances_ = std::move(sub_qr.mutable_values()); + results.internal_seg_offsets_ = std::move(sub_qr.mutable_labels()); + results.topK_ = dataset.topk; + results.num_queries_ = dataset.num_queries; + + output = std::move(results); + } else { + PanicInfo("Field Data is not loaded"); + } } + void SegmentSealedImpl::DropFieldData(const FieldId field_id) { if (SystemProperty::Instance().IsSystem(field_id)) { @@ -156,10 +222,9 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) { } else { auto field_offset = schema_->get_offset(field_id); auto& field_meta = schema_->operator[](field_offset); - Assert(!field_meta.is_vector()); std::unique_lock lck(mutex_); - set_field_ready(field_offset, false); + set_bit(field_data_ready_bitset_, field_offset, false); auto vec = std::move(field_datas_[field_offset.get()]); lck.unlock(); @@ -175,7 +240,146 @@ SegmentSealedImpl::DropIndex(const FieldId field_id) { Assert(field_meta.is_vector()); std::unique_lock lck(mutex_); - vec_indexings_.drop_field_indexing(field_offset); + vecindexs_.drop_field_indexing(field_offset); + set_bit(vecindex_ready_bitset_, field_offset, false); +} + +void +SegmentSealedImpl::check_search(const query::Plan* plan) const { + Assert(plan); + Assert(plan->extra_info_opt_.has_value()); + + if (!is_system_field_ready()) { + PanicInfo("System Field RowID is not loaded"); + } + + auto& request_fields = plan->extra_info_opt_.value().involved_fields_; + auto field_ready_bitset = field_data_ready_bitset_ | vecindex_ready_bitset_; + Assert(request_fields.size() == field_ready_bitset.size()); + auto absent_fields = request_fields - field_ready_bitset; + + if (absent_fields.any()) { + auto field_offset = FieldOffset(absent_fields.find_first()); + auto& field_meta = schema_->operator[](field_offset); + PanicInfo("User Field(" + field_meta.get_name().get() + ") is not loaded"); + } +} + +SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema) + : schema_(schema), + field_datas_(schema->size()), + field_data_ready_bitset_(schema->size()), + vecindex_ready_bitset_(schema->size()), + scalar_indexings_(schema->size()) { +} +void +SegmentSealedImpl::bulk_subscript(SystemFieldType system_type, + const int64_t* seg_offsets, + int64_t count, + void* output) const { + Assert(is_system_field_ready()); + Assert(system_type == SystemFieldType::RowId); + bulk_subscript_impl(row_ids_.data(), seg_offsets, count, output); +} +template +void +SegmentSealedImpl::bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) { + static_assert(IsScalar); + auto src = reinterpret_cast(src_raw); + auto dst = reinterpret_cast(dst_raw); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + dst[i] = offset == -1 ? 
-1 : src[offset]; + } +} + +// for vector +void +SegmentSealedImpl::bulk_subscript_impl( + int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) { + auto src_vec = reinterpret_cast(src_raw); + auto dst_vec = reinterpret_cast(dst_raw); + std::vector none(element_sizeof, 0); + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + auto dst = dst_vec + i * element_sizeof; + const char* src; + if (offset != 0) { + src = src_vec + element_sizeof * offset; + } else { + src = none.data(); + } + memcpy(dst, src, element_sizeof); + } +} + +void +SegmentSealedImpl::bulk_subscript(FieldOffset field_offset, + const int64_t* seg_offsets, + int64_t count, + void* output) const { + Assert(get_bit(field_data_ready_bitset_, field_offset)); + auto& field_meta = schema_->operator[](field_offset); + auto src_vec = field_datas_[field_offset.get()].data(); + switch (field_meta.get_data_type()) { + case DataType::BOOL: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::INT8: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::INT16: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::INT32: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::INT64: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::FLOAT: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + case DataType::DOUBLE: { + bulk_subscript_impl(src_vec, seg_offsets, count, output); + break; + } + + case DataType::VECTOR_FLOAT: + case DataType::VECTOR_BINARY: { + bulk_subscript_impl(field_meta.get_sizeof(), src_vec, seg_offsets, count, output); + break; + } + + default: { + PanicInfo("unsupported"); + } + } +} + +bool +SegmentSealedImpl::HasIndex(FieldId field_id) const { + std::shared_lock lck(mutex_); + Assert(!SystemProperty::Instance().IsSystem(field_id)); + auto field_offset = schema_->get_offset(field_id); + return get_bit(vecindex_ready_bitset_, field_offset); +} + +bool +SegmentSealedImpl::HasFieldData(FieldId field_id) const { + std::shared_lock lck(mutex_); + if (SystemProperty::Instance().IsSystem(field_id)) { + return is_system_field_ready(); + } else { + auto field_offset = schema_->get_offset(field_id); + return get_bit(field_data_ready_bitset_, field_offset); + } } SegmentSealedPtr diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 89f2ba4674..d8ac6999a2 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -14,13 +14,12 @@ #include "SealedIndexingRecord.h" #include #include +#include namespace milvus::segcore { class SegmentSealedImpl : public SegmentSealed { public: - explicit SegmentSealedImpl(SchemaPtr schema) - : schema_(schema), field_datas_(schema->size()), field_ready_bitset_(schema->size()) { - } + explicit SegmentSealedImpl(SchemaPtr schema); void LoadIndex(const LoadIndexInfo& info) override; void @@ -30,6 +29,11 @@ class SegmentSealedImpl : public SegmentSealed { void DropFieldData(const FieldId field_id) override; + bool + HasIndex(FieldId field_id) const override; + bool + HasFieldData(FieldId field_id) const override; + public: int64_t GetMemoryUsageInBytes() const override; @@ -62,56 +66,24 @@ class SegmentSealedImpl : public SegmentSealed { // Calculate: output[i] = Vec[seg_offset[i]], // where Vec is determined from 
field_offset void - bulk_subscript(SystemFieldType system_type, - const int64_t* seg_offsets, - int64_t count, - void* output) const override { - Assert(is_system_field_ready()); - Assert(system_type == SystemFieldType::RowId); - bulk_subscript_impl(row_ids_.data(), seg_offsets, count, output); - } + bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, int64_t count, void* output) const override; // Calculate: output[i] = Vec[seg_offset[i]] // where Vec is determined from field_offset void - bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override { - Assert(is_field_ready(field_offset)); - auto& field_meta = schema_->operator[](field_offset); - Assert(field_meta.get_data_type() == DataType::INT64); - bulk_subscript_impl(field_datas_[field_offset.get()].data(), seg_offsets, count, output); - } + bulk_subscript(FieldOffset field_offset, const int64_t* seg_offsets, int64_t count, void* output) const override; void - check_search(const query::Plan* plan) const override { - Assert(plan); - Assert(plan->extra_info_opt_.has_value()); - - if (!is_system_field_ready()) { - PanicInfo("System Field RowID is not loaded"); - } - - auto& request_fields = plan->extra_info_opt_.value().involved_fields_; - Assert(request_fields.size() == field_ready_bitset_.size()); - auto absent_fields = request_fields - field_ready_bitset_; - if (absent_fields.any()) { - auto field_offset = FieldOffset(absent_fields.find_first()); - auto& field_meta = schema_->operator[](field_offset); - PanicInfo("User Field(" + field_meta.get_name().get() + ") is not loaded"); - } - } + check_search(const query::Plan* plan) const override; private: template static void - bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw) { - static_assert(IsScalar); - auto src = reinterpret_cast(src_raw); - auto dst = reinterpret_cast(dst_raw); - for (int64_t i = 0; i < count; ++i) { - auto offset = seg_offsets[i]; - dst[i] = offset == -1 ? 
-1 : src[offset]; - } - } + bulk_subscript_impl(const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw); + + static void + bulk_subscript_impl( + int64_t element_sizeof, const void* src_raw, const int64_t* seg_offsets, int64_t count, void* dst_raw); void update_row_count(int64_t row_count) { @@ -135,25 +107,16 @@ class SegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 1; } - bool - is_field_ready(FieldOffset field_offset) const { - return field_ready_bitset_.test(field_offset.get()); - } - - void - set_field_ready(FieldOffset field_offset, bool flag = true) { - field_ready_bitset_[field_offset.get()] = flag; - } - private: // segment loading state - boost::dynamic_bitset<> field_ready_bitset_; + boost::dynamic_bitset<> field_data_ready_bitset_; + boost::dynamic_bitset<> vecindex_ready_bitset_; std::atomic system_ready_count_ = 0; // segment datas // TODO: generate index for scalar std::optional row_count_opt_; - std::map scalar_indexings_; - SealedIndexingRecord vec_indexings_; + std::vector> scalar_indexings_; + SealedIndexingRecord vecindexs_; std::vector> field_datas_; aligned_vector row_ids_; SchemaPtr schema_; diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index beedf9113f..2830c39e08 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -247,7 +247,7 @@ TEST(Indexing, BinaryBruteForce) { auto dataset = DataGen(schema, N, 10); auto bin_vec = dataset.get_col(0); auto query_data = 1024 * dim / 8 + bin_vec.data(); - query::dataset::BinaryQueryDataset query_dataset{ + query::dataset::QueryDataset query_dataset{ faiss::MetricType::METRIC_Jaccard, // num_queries, // topk, // diff --git a/internal/core/unittest/test_naive.cpp b/internal/core/unittest/test_naive.cpp index 48f8d797a3..80f3f2c3e2 100644 --- a/internal/core/unittest/test_naive.cpp +++ b/internal/core/unittest/test_naive.cpp @@ -10,6 +10,7 @@ // or implied. 
See the License for the specific language governing permissions and limitations under the License #include +#include "query/ScalarIndex.h" TEST(TestNaive, Naive) { EXPECT_TRUE(true); diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index 3c1a919932..b80f92dbad 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -129,6 +129,100 @@ TEST(Query, ParsePlaceholderGroup) { auto placeholder = ParsePlaceholderGroup(plan.get(), blob); } +TEST(Query, ExecWithPredicateLoader) { + using namespace milvus::query; + using namespace milvus::segcore; + auto schema = std::make_shared(); + schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, 16, MetricType::METRIC_L2); + schema->AddDebugField("age", DataType::FLOAT); + std::string dsl = R"({ + "bool": { + "must": [ + { + "range": { + "age": { + "GE": -1, + "LT": 1 + } + } + }, + { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 5 + } + } + } + ] + } + })"; + int64_t N = 1000 * 1000; + auto dataset = DataGen(schema, N); + auto segment = CreateGrowingSegment(schema); + segment->PreInsert(N); + ColumnBasedRawData raw_data; + raw_data.columns_ = dataset.cols_; + raw_data.count = N; + segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), raw_data); + + auto plan = CreatePlan(*schema, dsl); + auto num_queries = 5; + auto ph_group_raw = CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + Timestamp time = 1000000; + std::vector ph_group_arr = {ph_group.get()}; + auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1); + int topk = 5; + + Json json = QueryResultToJson(qr); + auto ref = json::parse(R"( +[ + [ + [ + "980486->3.149221", + "318367->3.661235", + "302798->4.553688", + "321424->4.757450", + "565529->5.083780" + ], + [ + "233390->7.931535", + "238958->8.109344", + "230645->8.439169", + "901939->8.658772", + "380328->8.731251" + ], + [ + "897246->3.749835", + "750683->3.897577", + "857598->4.230977", + "299009->4.379639", + "440010->4.454046" + ], + [ + "840855->4.782170", + "709627->5.063170", + "72322->5.166143", + "107142->5.180207", + "948403->5.247065" + ], + [ + "810401->3.926393", + "46575->4.054171", + "201740->4.274491", + "669040->4.399628", + "231500->4.831223" + ] + ] +])"); + ASSERT_EQ(json.dump(2), ref.dump(2)); +} + TEST(Query, ExecWithPredicate) { using namespace milvus::query; using namespace milvus::segcore; @@ -449,6 +543,7 @@ TEST(Query, FillSegment) { proto.set_name("col"); proto.set_description("asdfhsalkgfhsadg"); proto.set_autoid(false); + auto dim = 16; { auto field = proto.add_fields(); @@ -474,12 +569,49 @@ TEST(Query, FillSegment) { field->set_data_type(pb::schema::DataType::INT64); } + { + auto field = proto.add_fields(); + field->set_name("the_value"); + field->set_fieldid(102); + field->set_is_primary_key(false); + field->set_description("asdgfsagf"); + field->set_data_type(pb::schema::DataType::INT32); + } + auto schema = Schema::ParseFrom(proto); - auto segment = CreateGrowingSegment(schema); + + // dispatch here int N = 100000; auto dataset = DataGen(schema, N); - segment->PreInsert(N); - segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_); + const auto std_vec = dataset.get_col(1); + const auto std_vfloat_vec = dataset.get_col(0); + const auto std_i32_vec = dataset.get_col(2); + + std::vector> segments; + 
segments.emplace_back([&] { + auto segment = CreateGrowingSegment(schema); + segment->PreInsert(N); + segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_); + return segment; + }()); + segments.emplace_back([&] { + auto segment = CreateSealedSegment(schema); + SealedLoader(dataset, *segment); + // auto indexing = GenIndexing(N, dim, std_vfloat_vec.data()); + + // LoadIndexInfo info; + // auto field_offset = schema->get_offset(FieldName("fakevec")); + // auto& meta = schema->operator[](field_offset); + + // info.field_id = meta.get_id().get(); + // info.field_name = meta.get_name().get(); + // info.index_params["metric_type"] = "L2"; + // info.index = indexing; + + // segment->LoadIndex(info); + return segment; + }()); + std::string dsl = R"({ "bool": { "must": [ @@ -503,27 +635,44 @@ TEST(Query, FillSegment) { auto ph = ParsePlaceholderGroup(plan.get(), ph_proto.SerializeAsString()); std::vector groups = {ph.get()}; std::vector timestamps = {N * 2UL}; - QueryResult result; - result = segment->Search(plan.get(), groups.data(), timestamps.data(), 1); - auto topk = 5; auto num_queries = 10; - result.result_offsets_.resize(topk * num_queries); - segment->FillTargetEntry(plan.get(), result); + for (auto& segment : segments) { + plan->target_entries_.clear(); + plan->target_entries_.push_back(schema->get_offset(FieldName("fakevec"))); + plan->target_entries_.push_back(schema->get_offset(FieldName("the_value"))); + QueryResult result = segment->Search(plan.get(), groups.data(), timestamps.data(), 1); + // std::cout << QueryResultToJson(result).dump(2); + result.result_offsets_.resize(topk * num_queries); + segment->FillTargetEntry(plan.get(), result); - auto ans = result.row_data_; - ASSERT_EQ(ans.size(), topk * num_queries); - int64_t std_index = 0; - auto std_vec = dataset.get_col(1); - for (auto& vec : ans) { - ASSERT_EQ(vec.size(), sizeof(int64_t)); - int64_t val; - memcpy(&val, vec.data(), sizeof(int64_t)); - auto internal_offset = result.internal_seg_offsets_[std_index]; - auto std_val = std_vec[internal_offset]; - ASSERT_EQ(val, std_val) << "io:" << internal_offset; - ++std_index; + auto ans = result.row_data_; + ASSERT_EQ(ans.size(), topk * num_queries); + int64_t std_index = 0; + + for (auto& vec : ans) { + ASSERT_EQ(vec.size(), sizeof(int64_t) + sizeof(float) * dim + sizeof(int32_t)); + int64_t val; + memcpy(&val, vec.data(), sizeof(int64_t)); + + auto internal_offset = result.internal_seg_offsets_[std_index]; + auto std_val = std_vec[internal_offset]; + auto std_i32 = std_i32_vec[internal_offset]; + std::vector std_vfloat(dim); + std::copy_n(std_vfloat_vec.begin() + dim * internal_offset, dim, std_vfloat.begin()); + + ASSERT_EQ(val, std_val) << "io:" << internal_offset; + if (val != -1) { + std::vector vfloat(dim); + int i32; + memcpy(vfloat.data(), vec.data() + sizeof(int64_t), dim * sizeof(float)); + memcpy(&i32, vec.data() + sizeof(int64_t) + dim * sizeof(float), sizeof(int32_t)); + ASSERT_EQ(vfloat, std_vfloat) << std_index; + ASSERT_EQ(i32, std_i32) << std_index; + } + ++std_index; + } } } diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 08ad60375e..5cce2286ba 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -214,32 +214,6 @@ TEST(Sealed, with_predicate) { } } -void -SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) { - // TODO - auto row_count = dataset.row_ids_.size(); - { - LoadFieldDataInfo info; - info.blob = 
dataset.row_ids_.data(); - info.row_count = dataset.row_ids_.size(); - info.field_id = 0; // field id for RowId - seg.LoadFieldData(info); - } - int field_offset = 0; - for (auto& meta : seg.get_schema().get_fields()) { - if (meta.is_vector()) { - ++field_offset; - continue; - } - LoadFieldDataInfo info; - info.field_id = meta.get_id().get(); - info.row_count = row_count; - info.blob = dataset.cols_[field_offset].data(); - seg.LoadFieldData(info); - ++field_offset; - } -} - TEST(Sealed, LoadFieldData) { auto dim = 16; auto topK = 5; @@ -255,16 +229,7 @@ TEST(Sealed, LoadFieldData) { auto fakevec = dataset.get_col(0); - auto conf = knowhere::Config{{knowhere::meta::DIM, dim}, - {knowhere::meta::TOPK, topK}, - {knowhere::IndexParams::nlist, 100}, - {knowhere::IndexParams::nprobe, 10}, - {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, - {knowhere::meta::DEVICEID, 0}}; - auto database = knowhere::GenDataset(N, dim, fakevec.data()); - auto indexing = std::make_shared(); - indexing->Train(database, conf); - indexing->AddWithoutIds(database, conf); + auto indexing = GenIndexing(N, dim, fakevec.data()); auto segment = CreateSealedSegment(schema); std::string dsl = R"({ @@ -305,7 +270,9 @@ TEST(Sealed, LoadFieldData) { SealedLoader(dataset, *segment); segment->DropFieldData(nothing_id); + segment->Search(plan.get(), ph_group_arr.data(), &time, 1); + segment->DropFieldData(fakevec_id); ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); LoadIndexInfo vec_info; @@ -336,4 +303,46 @@ TEST(Sealed, LoadFieldData) { ASSERT_EQ(json.dump(-2), json2.dump(-2)); segment->DropFieldData(double_id); ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group_arr.data(), &time, 1)); + auto std_json = Json::parse(R"( +[ + [ + [ + "980486->3.149221", + "579754->3.634295", + "318367->3.661235", + "265835->4.333358", + "302798->4.553688" + ], + [ + "233390->7.931535", + "238958->8.109344", + "230645->8.439169", + "901939->8.658772", + "380328->8.731251" + ], + [ + "897246->3.749835", + "750683->3.897577", + "857598->4.230977", + "299009->4.379639", + "440010->4.454046" + ], + [ + "37641->3.783446", + "22628->4.719435", + "840855->4.782170", + "709627->5.063170", + "635836->5.156095" + ], + [ + "810401->3.926393", + "46575->4.054171", + "201740->4.274491", + "669040->4.399628", + "231500->4.831223" + ] + ] +] + )"); + ASSERT_EQ(std_json.dump(-2), json.dump(-2)); } diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 0904f6fe91..e20cf411e1 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -15,15 +15,22 @@ #include #include #include "segcore/SegmentGrowing.h" +#include "segcore/SegmentSealed.h" #include "Constants.h" #include + +#include +#include +#include +#include +#include using boost::algorithm::starts_with; namespace milvus::segcore { struct GeneratedData { std::vector rows_; - std::vector> cols_; + std::vector> cols_; std::vector row_ids_; std::vector timestamps_; RowBasedRawData raw_; @@ -51,6 +58,7 @@ struct GeneratedData { void generate_rows(int N, SchemaPtr schema); }; + inline void GeneratedData::generate_rows(int N, SchemaPtr schema) { std::vector offset_infos(schema->size() + 1, 0); @@ -78,7 +86,7 @@ GeneratedData::generate_rows(int N, SchemaPtr schema) { inline GeneratedData DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) { using std::vector; - std::vector> cols; + std::vector> cols; std::default_random_engine er(seed); std::normal_distribution<> distr(0, 
1); int offset = 0; @@ -86,7 +94,7 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) { auto insert_cols = [&cols](auto& data) { using T = std::remove_reference_t; auto len = sizeof(typename T::value_type) * data.size(); - auto ptr = vector(len); + auto ptr = aligned_vector(len); memcpy(ptr.data(), data.data(), len); cols.emplace_back(std::move(ptr)); }; @@ -280,4 +288,40 @@ QueryResultToJson(const QueryResult& qr) { return json{results}; }; +inline void +SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) { + // TODO + auto row_count = dataset.row_ids_.size(); + { + LoadFieldDataInfo info; + info.blob = dataset.row_ids_.data(); + info.row_count = dataset.row_ids_.size(); + info.field_id = 0; // field id for RowId + seg.LoadFieldData(info); + } + int field_offset = 0; + for (auto& meta : seg.get_schema().get_fields()) { + LoadFieldDataInfo info; + info.field_id = meta.get_id().get(); + info.row_count = row_count; + info.blob = dataset.cols_[field_offset].data(); + seg.LoadFieldData(info); + ++field_offset; + } +} + +inline knowhere::VecIndexPtr +GenIndexing(int64_t N, int64_t dim, const float* vec) { + auto conf = knowhere::Config{{knowhere::meta::DIM, dim}, + {knowhere::IndexParams::nlist, 100}, + {knowhere::IndexParams::nprobe, 10}, + {knowhere::Metric::TYPE, milvus::knowhere::Metric::L2}, + {knowhere::meta::DEVICEID, 0}}; + auto database = knowhere::GenDataset(N, dim, vec); + auto indexing = std::make_shared(); + indexing->Train(database, conf); + indexing->AddWithoutIds(database, conf); + return indexing; +} + } // namespace milvus::segcore diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 1aa3229954..9e5ada5607 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -490,9 +490,9 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { wg.Add(1) go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex) } + ms.consumerLock.Unlock() wg.Wait() timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex) - ms.consumerLock.Unlock() if !ok || timeStamp <= ms.lastTimeStamp { //log.Printf("All timeTick's timestamps are inconsistent") continue @@ -501,6 +501,9 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { msgPositions := make([]*internalpb2.MsgPosition, 0) ms.unsolvedMutex.Lock() for consumer, msgs := range ms.unsolvedBuf { + if len(msgs) == 0 { + continue + } tempBuffer := make([]TsMsg, 0) var timeTickMsg TsMsg for _, v := range msgs { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 767a9252e6..d791efc181 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -590,11 +590,33 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er } dmChannels := resp.Values - channels2NodeID := qs.shuffleChannelsToQueryNode(dmChannels) + watchedChannels2NodeID := make(map[string]UniqueID) + unWatchedChannels := make([]string, 0) + for _, channel := range dmChannels { + findChannel := false + for nodeID, node := range qs.queryNodes { + watchedChannels := node.dmChannelNames + for _, watchedChannel := range watchedChannels { + if channel == watchedChannel { + findChannel = true + watchedChannels2NodeID[channel] = nodeID + break + } + } + } + if !findChannel { + unWatchedChannels = append(unWatchedChannels, channel) + } + } + channels2NodeID := qs.shuffleChannelsToQueryNode(unWatchedChannels) err = 
qs.replica.addDmChannels(dbID, collection.id, channels2NodeID)
 	if err != nil {
 		return err
 	}
+	err = qs.replica.addDmChannels(dbID, collection.id, watchedChannels2NodeID)
+	if err != nil {
+		return err
+	}
 	node2channels := make(map[UniqueID][]string)
 	for channel, nodeID := range channels2NodeID {
 		if _, ok := node2channels[nodeID]; ok {
@@ -625,6 +647,9 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er
 func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) map[string]UniqueID {
 	maxNumDMChannel := 0
 	res := make(map[string]UniqueID)
+	if len(dmChannels) == 0 {
+		return res
+	}
 	node2lens := make(map[UniqueID]int)
 	for id, node := range qs.queryNodes {
 		node2lens[id] = len(node.dmChannelNames)