From 911a8df17c2f7f8931ce19c2be122d6fd15ef90d Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Thu, 12 Jun 2025 14:38:35 +0800 Subject: [PATCH] feat: impl StructArray -- data storage support in segcore (#42406) Ref https://github.com/milvus-io/milvus/issues/42148 This PR mainly enables segcore to support array of vector (read and write, but not indexing). Now only float vector as the element type is supported. --------- Signed-off-by: SpadeA Signed-off-by: SpadeA-Tang --- client/go.mod | 2 +- client/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- internal/core/src/common/Array.h | 135 +++--- internal/core/src/common/Chunk.cpp | 3 - internal/core/src/common/Chunk.h | 66 ++- internal/core/src/common/ChunkWriter.cpp | 86 +++- internal/core/src/common/ChunkWriter.h | 25 + internal/core/src/common/FieldData.cpp | 19 +- internal/core/src/common/FieldData.h | 14 + internal/core/src/common/FieldDataInterface.h | 30 ++ internal/core/src/common/FieldMeta.cpp | 12 + internal/core/src/common/FieldMeta.h | 23 +- internal/core/src/common/Json.h | 3 +- internal/core/src/common/JsonUtils.h | 11 + internal/core/src/common/Schema.cpp | 15 +- internal/core/src/common/Schema.h | 18 + internal/core/src/common/Span.h | 1 + internal/core/src/common/TypeTraits.h | 66 +++ internal/core/src/common/Types.h | 29 +- internal/core/src/common/VectorArray.h | 352 ++++++++++++++ internal/core/src/common/VectorTrait.h | 33 -- .../core/src/exec/expression/ExistsExpr.cpp | 36 +- .../core/src/exec/expression/UnaryExpr.cpp | 405 ++++++++-------- internal/core/src/index/IndexFactory.h | 14 +- internal/core/src/indexbuilder/IndexFactory.h | 4 + internal/core/src/mmap/ChunkVector.h | 8 + internal/core/src/mmap/ChunkedColumn.h | 49 +- internal/core/src/mmap/ChunkedColumnGroup.h | 39 +- .../core/src/mmap/ChunkedColumnInterface.h | 19 +- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 45 +- .../src/segcore/ChunkedSegmentSealedImpl.h | 8 + .../core/src/segcore/ConcurrentVector.cpp | 8 + internal/core/src/segcore/ConcurrentVector.h | 22 +- internal/core/src/segcore/InsertRecord.h | 6 + .../core/src/segcore/SegmentGrowingImpl.cpp | 32 +- .../core/src/segcore/SegmentGrowingImpl.h | 9 + .../core/src/segcore/SegmentInterface.cpp | 3 +- internal/core/src/segcore/Utils.cpp | 59 ++- internal/core/src/segcore/Utils.h | 4 +- internal/core/src/segcore/reduce/Reduce.cpp | 2 + .../core/src/segcore/reduce/StreamReduce.cpp | 5 + internal/core/src/storage/Event.cpp | 18 + internal/core/src/storage/PayloadReader.cpp | 3 +- internal/core/src/storage/Util.cpp | 5 + internal/core/unittest/CMakeLists.txt | 1 + .../core/unittest/test_array_bitmap_index.cpp | 2 +- internal/core/unittest/test_array_expr.cpp | 72 +-- internal/core/unittest/test_chunk_vector.cpp | 10 +- internal/core/unittest/test_growing.cpp | 127 +++++ internal/core/unittest/test_sealed.cpp | 88 +++- internal/core/unittest/test_utils/DataGen.h | 454 +++++++++++------- .../unittest/test_utils/storage_test_utils.h | 15 +- internal/core/unittest/test_vector_array.cpp | 135 ++++++ .../rootcoord/create_collection_task_test.go | 2 +- pkg/go.mod | 2 +- pkg/go.sum | 2 + tests/go_client/go.mod | 2 +- tests/go_client/go.sum | 4 +- 60 files changed, 2082 insertions(+), 590 deletions(-) create mode 100644 internal/core/src/common/TypeTraits.h create mode 100644 internal/core/src/common/VectorArray.h create mode 100644 internal/core/unittest/test_vector_array.cpp diff --git a/client/go.mod b/client/go.mod index 18f5d051b4..919edfce32 100644 --- a/client/go.mod +++ b/client/go.mod @@ -6,7 +6,7 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/cockroachdb/errors v1.9.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/samber/lo v1.27.0 diff --git a/client/go.sum b/client/go.sum index 88a1d21f80..3dbaeb47ae 100644 --- a/client/go.sum +++ b/client/go.sum @@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd h1:nriBHIny3LiSU56kP2FiIVtxh/0EEFXBpZk4WirsR7s= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 h1:6MFC+EIDe+n1aSX0BZ4s1/XbbPaEzQCx6JBoM1NTTz0= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/go.mod b/go.mod index 4ae5c10b15..31eeed2308 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 // indirect github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 diff --git a/go.sum b/go.sum index 6cd6d427e6..8671374e81 100644 --- a/go.sum +++ b/go.sum @@ -737,8 +737,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6 h1:/m5rWCbnFL0Pg2KLS6Lw/lzQfdEb4qa2dLEqE4QjpkU= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 h1:6MFC+EIDe+n1aSX0BZ4s1/XbbPaEzQCx6JBoM1NTTz0= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/common/Array.h b/internal/core/src/common/Array.h index 199f5218bf..5ff655a795 100644 --- a/internal/core/src/common/Array.h +++ b/internal/core/src/common/Array.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -33,13 +34,7 @@ class Array { public: Array() = default; - ~Array() { - delete[] data_; - if (offsets_ptr_) { - // only deallocate offsets for string type array - delete[] offsets_ptr_; - } - } + ~Array() = default; Array(char* data, int len, @@ -47,78 +42,78 @@ class Array { DataType element_type, const uint32_t* offsets_ptr) : size_(size), length_(len), element_type_(element_type) { - data_ = new char[size]; - std::copy(data, data + size, data_); + data_ = std::make_unique(size); + std::copy(data, data + size, data_.get()); if (IsVariableDataType(element_type)) { AssertInfo(offsets_ptr != nullptr, "For variable type elements in array, offsets_ptr must " "be non-null"); - offsets_ptr_ = new uint32_t[len]; - std::copy(offsets_ptr, offsets_ptr + len, offsets_ptr_); + offsets_ptr_ = std::make_unique(len); + std::copy(offsets_ptr, offsets_ptr + len, offsets_ptr_.get()); } } - explicit Array(const ScalarArray& field_data) { + explicit Array(const ScalarFieldProto& field_data) { switch (field_data.data_case()) { - case ScalarArray::kBoolData: { + case ScalarFieldProto::kBoolData: { element_type_ = DataType::BOOL; length_ = field_data.bool_data().data().size(); - auto data = new bool[length_]; size_ = length_; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { - data[i] = field_data.bool_data().data(i); + reinterpret_cast(data_.get())[i] = + field_data.bool_data().data(i); } - data_ = reinterpret_cast(data); break; } - case ScalarArray::kIntData: { + case ScalarFieldProto::kIntData: { element_type_ = DataType::INT32; length_ = field_data.int_data().data().size(); size_ = length_ * sizeof(int32_t); - data_ = new char[size_]; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { - reinterpret_cast(data_)[i] = + reinterpret_cast(data_.get())[i] = field_data.int_data().data(i); } break; } - case ScalarArray::kLongData: { + case ScalarFieldProto::kLongData: { element_type_ = DataType::INT64; length_ = field_data.long_data().data().size(); size_ = length_ * sizeof(int64_t); - data_ = new char[size_]; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { - reinterpret_cast(data_)[i] = + reinterpret_cast(data_.get())[i] = field_data.long_data().data(i); } break; } - case ScalarArray::kFloatData: { + case ScalarFieldProto::kFloatData: { element_type_ = DataType::FLOAT; length_ = field_data.float_data().data().size(); size_ = length_ * sizeof(float); - data_ = new char[size_]; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { - reinterpret_cast(data_)[i] = + reinterpret_cast(data_.get())[i] = field_data.float_data().data(i); } break; } - case ScalarArray::kDoubleData: { + case ScalarFieldProto::kDoubleData: { element_type_ = DataType::DOUBLE; length_ = field_data.double_data().data().size(); size_ = length_ * sizeof(double); - data_ = new char[size_]; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { - reinterpret_cast(data_)[i] = + reinterpret_cast(data_.get())[i] = field_data.double_data().data(i); } break; } - case ScalarArray::kStringData: { + case ScalarFieldProto::kStringData: { element_type_ = DataType::STRING; length_ = field_data.string_data().data().size(); - offsets_ptr_ = new uint32_t[length_]; + offsets_ptr_ = std::make_unique(length_); for (int i = 0; i < length_; ++i) { offsets_ptr_[i] = size_; size_ += @@ -126,11 +121,11 @@ class Array { .data(i) .size(); //type risk here between uint32_t vs size_t } - data_ = new char[size_]; + data_ = std::make_unique(size_); for (int i = 0; i < length_; ++i) { std::copy_n(field_data.string_data().data(i).data(), field_data.string_data().data(i).size(), - data_ + offsets_ptr_[i]); + data_.get() + offsets_ptr_[i]); } break; } @@ -144,35 +139,43 @@ class Array { : length_{array.length_}, size_{array.size_}, element_type_{array.element_type_} { - data_ = new char[array.size_]; - std::copy(array.data_, array.data_ + array.size_, data_); + data_ = std::make_unique(array.size_); + std::copy( + array.data_.get(), array.data_.get() + array.size_, data_.get()); if (IsVariableDataType(array.element_type_)) { AssertInfo(array.get_offsets_data() != nullptr, "for array with variable length elements, offsets_ptr" "must not be nullptr"); - offsets_ptr_ = new uint32_t[length_]; - std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_); + offsets_ptr_ = std::make_unique(length_); + std::copy_n( + array.get_offsets_data(), array.length(), offsets_ptr_.get()); } } + friend void + swap(Array& array1, Array& array2) noexcept { + using std::swap; + swap(array1.data_, array2.data_); + swap(array1.length_, array2.length_); + swap(array1.size_, array2.size_); + swap(array1.element_type_, array2.element_type_); + swap(array1.offsets_ptr_, array2.offsets_ptr_); + } + Array& operator=(const Array& array) { - delete[] data_; - if (offsets_ptr_) { - delete[] offsets_ptr_; - } - length_ = array.length_; - size_ = array.size_; - element_type_ = array.element_type_; - data_ = new char[size_]; - std::copy(array.data_, array.data_ + size_, data_); - if (IsVariableDataType(element_type_)) { - AssertInfo(array.get_offsets_data() != nullptr, - "for array with variable length elements, offsets_ptr" - "must not be nullptr"); - offsets_ptr_ = new uint32_t[length_]; - std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_); - } + Array temp(array); + swap(*this, temp); + return *this; + } + + Array(Array&& other) noexcept : Array() { + swap(*this, other); + } + + Array& + operator=(Array&& other) noexcept { + swap(*this, other); return *this; } @@ -258,7 +261,7 @@ class Array { (index == length_ - 1) ? size_ - offsets_ptr_[length_ - 1] : offsets_ptr_[index + 1] - offsets_ptr_[index]; - return T(data_ + offsets_ptr_[index], element_length); + return T(data_.get() + offsets_ptr_[index], element_length); } if constexpr (std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || @@ -268,32 +271,32 @@ class Array { case DataType::INT16: case DataType::INT32: return static_cast( - reinterpret_cast(data_)[index]); + reinterpret_cast(data_.get())[index]); case DataType::INT64: return static_cast( - reinterpret_cast(data_)[index]); + reinterpret_cast(data_.get())[index]); case DataType::FLOAT: return static_cast( - reinterpret_cast(data_)[index]); + reinterpret_cast(data_.get())[index]); case DataType::DOUBLE: return static_cast( - reinterpret_cast(data_)[index]); + reinterpret_cast(data_.get())[index]); default: PanicInfo(Unsupported, "unsupported element type for array"); } } - return reinterpret_cast(data_)[index]; + return reinterpret_cast(data_.get())[index]; } uint32_t* get_offsets_data() const { - return offsets_ptr_; + return offsets_ptr_.get(); } - ScalarArray + ScalarFieldProto output_data() const { - ScalarArray data_array; + ScalarFieldProto data_array; switch (element_type_) { case DataType::BOOL: { for (int j = 0; j < length_; ++j) { @@ -364,7 +367,7 @@ class Array { const char* data() const { - return data_; + return data_.get(); } bool @@ -442,11 +445,11 @@ class Array { } private: - char* data_{nullptr}; + std::unique_ptr data_{nullptr}; int length_ = 0; int size_ = 0; DataType element_type_ = DataType::NONE; - uint32_t* offsets_ptr_{nullptr}; + std::unique_ptr offsets_ptr_{nullptr}; }; class ArrayView { @@ -528,9 +531,9 @@ class ArrayView { return reinterpret_cast(data_)[index]; } - ScalarArray + ScalarFieldProto output_data() const { - ScalarArray data_array; + ScalarFieldProto data_array; switch (element_type_) { case DataType::BOOL: { for (int j = 0; j < length_; ++j) { diff --git a/internal/core/src/common/Chunk.cpp b/internal/core/src/common/Chunk.cpp index 37c9416eb6..a543dd9661 100644 --- a/internal/core/src/common/Chunk.cpp +++ b/internal/core/src/common/Chunk.cpp @@ -11,9 +11,6 @@ #include #include -#include "common/Array.h" -#include "common/Span.h" -#include "common/Types.h" #include "common/Chunk.h" namespace milvus { diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h index 8b8d6ba1cc..163c483105 100644 --- a/internal/core/src/common/Chunk.h +++ b/internal/core/src/common/Chunk.h @@ -21,6 +21,7 @@ #include "arrow/array/array_base.h" #include "arrow/record_batch.h" #include "common/Array.h" +#include "common/VectorArray.h" #include "common/ChunkTarget.h" #include "common/EasyAssert.h" #include "common/FieldDataInterface.h" @@ -231,11 +232,11 @@ using JSONChunk = StringChunk; // create an ArrayChunk for these arrays. The data block might look like this: // // [null_bitmap][offsets_lens][array_data] -// [00000000] [24, 3, 36, 2, 44, 4, 60] [1, 2, 3, 4, 5, 6, 7, 8, 9] +// [00000000] [29, 3, 41, 2, 49, 4, 65] [1, 2, 3, 4, 5, 6, 7, 8, 9] // // For string arrays, the structure is more complex as each string element needs its own offset: // [null_bitmap][offsets_lens][array1_offsets][array1_data][array2_offsets][array2_data][array3_offsets][array3_data] -// [00000000] [24, 3, 48, 2, 64, 4, 96] [0, 5, 11, 16] ["hello", "world", "!"] [0, 3, 6] ["foo", "bar"] [0, 6, 12, 18, 24] ["apple", "orange", "banana", "grape"] +// [00000000] [29, 3, 53, 2, 69, 4, 101] [0, 5, 11, 16] ["hello", "world", "!"] [0, 3, 6] ["foo", "bar"] [0, 6, 12, 18, 24] ["apple", "orange", "banana", "grape"] // // Here, the null_bitmap is empty (indicating no nulls), the offsets_lens array contains pairs of (offset, length) // for each array, and the array_data contains the actual array elements. @@ -331,6 +332,67 @@ class ArrayChunk : public Chunk { uint32_t* offsets_lens_; }; +// A VectorArrayChunk is similar to an ArrayChunk but is specialized for storing arrays of vectors. +// Key differences and characteristics: +// - No Nullability: VectorArrayChunk does not support null values. Unlike ArrayChunk, it does not have a null bitmap. +// - Fixed Vector Dimensions: All vectors within a VectorArrayChunk have the same, fixed dimension, specified at creation. +// However, each row (array of vectors) can contain a variable number of these fixed-dimension vectors. +// +// Due to these characteristics, the data layout is simpler: +// [offsets_lens][all_vector_data_concatenated] +// +// Example: +// Suppose we have a data block containing arrays of vectors [[1, 2, 3], [4, 5, 6], [7, 8, 9]], [[10, 11, 12]], and [[13, 14, 15], [16, 17, 18]], and we want to +// create a VectorArrayChunk for these arrays. The data block might look like this: +// +// [offsets_lens][all_vector_data_concatenated] +// [28, 3, 36, 1, 76, 2, 100] [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18] +class VectorArrayChunk : public Chunk { + public: + VectorArrayChunk(int64_t dim, + int32_t row_nums, + char* data, + uint64_t size, + milvus::DataType element_type) + : Chunk(row_nums, data, size, false), + dim_(dim), + element_type_(element_type) { + offsets_lens_ = reinterpret_cast(data); + } + + VectorArrayView + View(int64_t idx) const { + int idx_off = 2 * idx; + auto offset = offsets_lens_[idx_off]; + auto len = offsets_lens_[idx_off + 1]; + auto next_offset = offsets_lens_[idx_off + 2]; + auto data_ptr = data_ + offset; + return VectorArrayView( + data_ptr, dim_, len, next_offset - offset, element_type_); + } + + std::vector + Views() const { + std::vector views; + views.reserve(row_nums_); + for (int64_t i = 0; i < row_nums_; i++) { + views.emplace_back(View(i)); + } + return views; + } + + const char* + ValueAt(int64_t idx) const override { + PanicInfo(ErrorCode::Unsupported, + "VectorArrayChunk::ValueAt is not supported"); + } + + private: + int64_t dim_; + uint32_t* offsets_lens_; + milvus::DataType element_type_; +}; + class SparseFloatVectorChunk : public Chunk { public: SparseFloatVectorChunk(int32_t row_nums, diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp index 00fc488bbb..9783679653 100644 --- a/internal/core/src/common/ChunkWriter.cpp +++ b/internal/core/src/common/ChunkWriter.cpp @@ -155,15 +155,15 @@ ArrayChunkWriter::write(const arrow::ArrayVector& array_vec) { auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); - ScalarArray scalar_array; + ScalarFieldProto scalar_array; scalar_array.ParseFromArray(str.data(), str.size()); auto arr = Array(scalar_array); size += arr.byte_size(); - arrays.push_back(std::move(arr)); if (is_string) { // element offsets size size += sizeof(uint32_t) * arr.length(); } + arrays.push_back(std::move(arr)); } row_nums_ += array->length(); if (nullable_) { @@ -229,6 +229,78 @@ ArrayChunkWriter::finish() { row_nums_, data, size, element_type_, nullable_); } +// 1. Deserialize VectorFieldProto (proto::schema::VectorField) from arrow::ArrayVector +// where VectorFieldProto is vector array and each element it self is a VectorFieldProto. +// 2. Transform this vector of VectorFieldProto to vector of our local representation of VectorArray. +// 3. the contents of these VectorArray are concatenated in some format in target_. +// See more details for the format in the comments of VectorArrayChunk. +void +VectorArrayChunkWriter::write(const arrow::ArrayVector& arrow_array_vec) { + auto size = 0; + std::vector vector_arrays; + vector_arrays.reserve(arrow_array_vec.size()); + + for (const auto& data : arrow_array_vec) { + auto array = std::dynamic_pointer_cast(data); + for (size_t i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + VectorFieldProto vector_field; + vector_field.ParseFromArray(str.data(), str.size()); + auto arr = VectorArray(vector_field); + size += arr.byte_size(); + vector_arrays.push_back(std::move(arr)); + } + row_nums_ += array->length(); + } + + // offsets + lens + size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + int offsets_num = row_nums_ + 1; + int len_num = row_nums_; + uint32_t offset_start_pos = + target_->tell() + sizeof(uint32_t) * (offsets_num + len_num); + std::vector offsets(offsets_num); + std::vector lens(len_num); + for (size_t i = 0; i < vector_arrays.size(); i++) { + auto& arr = vector_arrays[i]; + offsets[i] = offset_start_pos; + lens[i] = arr.length(); + offset_start_pos += arr.byte_size(); + } + if (offsets_num > 0) { + offsets[offsets_num - 1] = offset_start_pos; + } + + for (int i = 0; i < offsets.size(); i++) { + if (i == offsets.size() - 1) { + target_->write(&offsets[i], sizeof(uint32_t)); + break; + } + target_->write(&offsets[i], sizeof(uint32_t)); + target_->write(&lens[i], sizeof(uint32_t)); + } + + for (auto& arr : vector_arrays) { + target_->write(arr.data(), arr.byte_size()); + } +} + +std::unique_ptr +VectorArrayChunkWriter::finish() { + char padding[MMAP_ARRAY_PADDING]; + target_->write(padding, MMAP_ARRAY_PADDING); + + auto [data, size] = target_->get(); + return std::make_unique( + dim_, row_nums_, data, size, element_type_); +} + void SparseFloatVectorChunkWriter::write(const arrow::ArrayVector& array_vec) { auto size = 0; @@ -382,6 +454,11 @@ create_chunk(const FieldMeta& field_meta, w = std::make_shared(nullable); break; } + case milvus::DataType::VECTOR_ARRAY: { + w = std::make_shared( + dim, field_meta.get_element_type()); + break; + } default: PanicInfo(Unsupported, "Unsupported data type"); } @@ -486,6 +563,11 @@ create_chunk(const FieldMeta& field_meta, file, file_offset, nullable); break; } + case milvus::DataType::VECTOR_ARRAY: { + w = std::make_shared( + dim, field_meta.get_element_type(), file, file_offset); + break; + } default: PanicInfo(Unsupported, "Unsupported data type"); } diff --git a/internal/core/src/common/ChunkWriter.h b/internal/core/src/common/ChunkWriter.h index 8d767ea087..9d29baaac6 100644 --- a/internal/core/src/common/ChunkWriter.h +++ b/internal/core/src/common/ChunkWriter.h @@ -219,6 +219,31 @@ class ArrayChunkWriter : public ChunkWriterBase { const milvus::DataType element_type_; }; +class VectorArrayChunkWriter : public ChunkWriterBase { + public: + VectorArrayChunkWriter(int64_t dim, const milvus::DataType element_type) + : ChunkWriterBase(false), element_type_(element_type), dim_(dim) { + } + VectorArrayChunkWriter(int64_t dim, + const milvus::DataType element_type, + File& file, + size_t offset) + : ChunkWriterBase(file, offset, false), + element_type_(element_type), + dim_(dim) { + } + + void + write(const arrow::ArrayVector& array_vec) override; + + std::unique_ptr + finish() override; + + private: + const milvus::DataType element_type_; + int64_t dim_; +}; + class SparseFloatVectorChunkWriter : public ChunkWriterBase { public: using ChunkWriterBase::ChunkWriterBase; diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index 28a0f9e07a..e37957c4b9 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -234,7 +234,7 @@ FieldDataImpl::FillFieldData( std::vector values(element_count); int null_number = 0; for (size_t index = 0; index < element_count; ++index) { - ScalarArray field_data; + ScalarFieldProto field_data; if (array_array->GetString(index) == "") { null_number++; continue; @@ -274,6 +274,22 @@ FieldDataImpl::FillFieldData( } return FillFieldData(values.data(), element_count); } + case DataType::VECTOR_ARRAY: { + auto array_array = + std::dynamic_pointer_cast(array); + std::vector values(element_count); + for (size_t index = 0; index < element_count; ++index) { + VectorFieldProto field_data; + if (array_array->GetString(index) == "") { + PanicInfo(DataTypeInvalid, "empty vector array"); + } + auto success = + field_data.ParseFromString(array_array->GetString(index)); + AssertInfo(success, "parse from string failed"); + values[index] = VectorArray(field_data); + } + return FillFieldData(values.data(), element_count); + } default: { PanicInfo(DataTypeInvalid, GetName() + "::FillFieldData" + @@ -423,6 +439,7 @@ template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl, true>; +template class FieldDataImpl; FieldDataPtr InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows) { diff --git a/internal/core/src/common/FieldData.h b/internal/core/src/common/FieldData.h index 7341953ac7..a10fb64569 100644 --- a/internal/core/src/common/FieldData.h +++ b/internal/core/src/common/FieldData.h @@ -80,6 +80,20 @@ class FieldData : public FieldDataArrayImpl { } }; +template <> +class FieldData : public FieldDataVectorArrayImpl { + public: + explicit FieldData(DataType data_type, int64_t buffered_num_rows = 0) + : FieldDataVectorArrayImpl(data_type, buffered_num_rows) { + } + + int64_t + get_dim() const override { + PanicInfo(Unsupported, + "Call get_dim on FieldData is not supported"); + } +}; + template <> class FieldData : public FieldDataImpl { public: diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index 5a1c0a2a9d..7b6e4524cb 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -34,7 +34,9 @@ #include "common/VectorTrait.h" #include "common/EasyAssert.h" #include "common/Array.h" +#include "common/VectorArray.h" #include "knowhere/dataset.h" +#include "common/TypeTraits.h" namespace milvus { @@ -818,4 +820,32 @@ class FieldDataArrayImpl : public FieldDataImpl { } }; +// is_type_entire_row set be true as each element in data_ is a VectorArray +class FieldDataVectorArrayImpl : public FieldDataImpl { + public: + explicit FieldDataVectorArrayImpl(DataType data_type, + int64_t total_num_rows = 0) + : FieldDataImpl( + 1, data_type, false, total_num_rows) { + } + + int64_t + DataSize() const override { + int64_t data_size = 0; + for (size_t offset = 0; offset < length(); ++offset) { + data_size += data_[offset].byte_size(); + } + return data_size; + } + + int64_t + DataSize(ssize_t offset) const override { + AssertInfo(offset < get_num_rows(), + "field data subscript out of range"); + AssertInfo(offset < length(), + "subscript position don't has valid value"); + return data_[offset].byte_size(); + } +}; + } // namespace milvus diff --git a/internal/core/src/common/FieldMeta.cpp b/internal/core/src/common/FieldMeta.cpp index 909d1c351b..eca2d8c050 100644 --- a/internal/core/src/common/FieldMeta.cpp +++ b/internal/core/src/common/FieldMeta.cpp @@ -81,6 +81,7 @@ FieldMeta::ParseFrom(const milvus::proto::schema::FieldSchema& schema_proto) { } auto data_type = DataType(schema_proto.data_type()); + auto element_type = DataType(schema_proto.element_type()); auto default_value = [&]() -> std::optional { if (!schema_proto.has_default_value()) { @@ -89,6 +90,17 @@ FieldMeta::ParseFrom(const milvus::proto::schema::FieldSchema& schema_proto) { return schema_proto.default_value(); }(); + if (data_type == DataType::VECTOR_ARRAY) { + // todo(SpadeA): revisit the code when index build for vector array is ready + int64_t dim = 0; + auto type_map = RepeatedKeyValToMap(schema_proto.type_params()); + AssertInfo(type_map.count("dim"), "dim not found"); + dim = boost::lexical_cast(type_map.at("dim")); + + return FieldMeta{ + name, field_id, data_type, element_type, dim, std::nullopt}; + } + if (IsVectorDataType(data_type)) { auto type_map = RepeatedKeyValToMap(schema_proto.type_params()); auto index_map = RepeatedKeyValToMap(schema_proto.index_params()); diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index c5f3be9be2..8f21a8858f 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -126,6 +126,23 @@ class FieldMeta { Assert(!nullable); } + // array of vector type + FieldMeta(FieldName name, + FieldId id, + DataType type, + DataType element_type, + int64_t dim, + std::optional metric_type) + : name_(std::move(name)), + id_(id), + type_(type), + nullable_(false), + element_type_(element_type), + vector_info_(VectorInfo{dim, std::move(metric_type)}) { + Assert(type_ == DataType::VECTOR_ARRAY); + Assert(IsVectorDataType(element_type_)); + } + int64_t get_dim() const { Assert(IsVectorDataType(type_)); @@ -218,7 +235,11 @@ class FieldMeta { "schema"); static const size_t ARRAY_SIZE = 128; static const size_t JSON_SIZE = 512; - if (is_vector()) { + // assume float vector with dim 512, array length 10 + static const size_t VECTOR_ARRAY_SIZE = 512 * 10 * 4; + if (type_ == DataType::VECTOR_ARRAY) { + return VECTOR_ARRAY_SIZE; + } else if (is_vector()) { return GetDataTypeSize(type_, get_dim()); } else if (is_string()) { Assert(string_info_.has_value()); diff --git a/internal/core/src/common/Json.h b/internal/core/src/common/Json.h index be3bccd2ea..0aa91a558f 100644 --- a/internal/core/src/common/Json.h +++ b/internal/core/src/common/Json.h @@ -231,7 +231,8 @@ class Json { : doc().at_pointer(pointer).type(); } - auto get_number_type(const std::string& pointer) const { + auto + get_number_type(const std::string& pointer) const { return pointer.empty() ? doc().get_number_type() : doc().at_pointer(pointer).get_number_type(); } diff --git a/internal/core/src/common/JsonUtils.h b/internal/core/src/common/JsonUtils.h index b1fa2e92e8..8b65bbef1c 100644 --- a/internal/core/src/common/JsonUtils.h +++ b/internal/core/src/common/JsonUtils.h @@ -1,3 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + #pragma once #include #include diff --git a/internal/core/src/common/Schema.cpp b/internal/core/src/common/Schema.cpp index bf0da0630d..858830ddc9 100644 --- a/internal/core/src/common/Schema.cpp +++ b/internal/core/src/common/Schema.cpp @@ -40,8 +40,7 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) { // NOTE: only two system - for (const milvus::proto::schema::FieldSchema& child : - schema_proto.fields()) { + auto process_field = [&schema, &schema_proto](const auto& child) { auto field_id = FieldId(child.fieldid()); auto f = FieldMeta::ParseFrom(child); @@ -59,6 +58,18 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) { "repetitive dynamic field"); schema->set_dynamic_field_id(field_id); } + }; + + for (const milvus::proto::schema::FieldSchema& child : + schema_proto.fields()) { + process_field(child); + } + + for (const milvus::proto::schema::StructArrayFieldSchema& child : + schema_proto.struct_array_fields()) { + for (const auto& sub_field : child.fields()) { + process_field(sub_field); + } } AssertInfo(schema->get_primary_field_id().has_value(), diff --git a/internal/core/src/common/Schema.h b/internal/core/src/common/Schema.h index 2e4f78fb71..4116a25b81 100644 --- a/internal/core/src/common/Schema.h +++ b/internal/core/src/common/Schema.h @@ -108,6 +108,24 @@ class Schema { return field_id; } + // array of vector type + FieldId + AddDebugVectorArrayField(const std::string& name, + DataType element_type, + int64_t dim, + std::optional metric_type) { + auto field_id = FieldId(debug_id); + debug_id++; + auto field_meta = FieldMeta(FieldName(name), + field_id, + DataType::VECTOR_ARRAY, + element_type, + dim, + metric_type); + this->AddField(std::move(field_meta)); + return field_id; + } + // scalar type void AddField(const FieldName& name, diff --git a/internal/core/src/common/Span.h b/internal/core/src/common/Span.h index f0d2cda4e0..cbd29ab3d5 100644 --- a/internal/core/src/common/Span.h +++ b/internal/core/src/common/Span.h @@ -23,6 +23,7 @@ #include "Types.h" #include "VectorTrait.h" +#include "TypeTraits.h" namespace milvus { // type erasure to work around virtual restriction diff --git a/internal/core/src/common/TypeTraits.h b/internal/core/src/common/TypeTraits.h new file mode 100644 index 0000000000..f3875e52d6 --- /dev/null +++ b/internal/core/src/common/TypeTraits.h @@ -0,0 +1,66 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "Array.h" +#include "Types.h" +#include "VectorArray.h" + +namespace milvus { + +template +constexpr bool IsVector = std::is_base_of_v; + +template +constexpr bool IsScalar = + std::is_fundamental_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v; + +template +constexpr bool IsSparse = std::is_same_v || + std::is_same_v>; + +template +constexpr bool IsVariableType = + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + IsSparse || std::is_same_v || + std::is_same_v; + +template +constexpr bool IsVariableTypeSupportInChunk = + std::is_same_v || std::is_same_v || + std::is_same_v || + std::is_same_v>; + +template +using ChunkViewType = std::conditional_t< + std::is_same_v, + std::string_view, + std::conditional_t, + ArrayView, + std::conditional_t, + VectorArrayView, + T>>>; + +} // namespace milvus diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index aac3c316be..cf4918e63d 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -92,6 +92,7 @@ enum class DataType { VECTOR_BFLOAT16 = 103, VECTOR_SPARSE_FLOAT = 104, VECTOR_INT8 = 105, + VECTOR_ARRAY = 106, }; using Timestamp = uint64_t; // TODO: use TiKV-like timestamp @@ -100,9 +101,9 @@ constexpr auto MAX_ROW_COUNT = std::numeric_limits::max(); using OpType = proto::plan::OpType; using ArithOpType = proto::plan::ArithOpType; -using ScalarArray = proto::schema::ScalarField; +using ScalarFieldProto = proto::schema::ScalarField; using DataArray = proto::schema::FieldData; -using VectorArray = proto::schema::VectorField; +using VectorFieldProto = proto::schema::VectorField; using IdArray = proto::schema::IDs; using InsertRecordProto = proto::segcore::InsertRecord; using PkType = std::variant; @@ -246,6 +247,8 @@ GetDataTypeName(DataType data_type) { return "vector_sparse_float"; case DataType::VECTOR_INT8: return "vector_int8"; + case DataType::VECTOR_ARRAY: + return "vector_array"; default: PanicInfo(DataTypeInvalid, "Unsupported DataType({})", data_type); } @@ -327,7 +330,7 @@ IsJsonDataType(DataType data_type) { inline bool IsArrayDataType(DataType data_type) { - return data_type == DataType::ARRAY; + return data_type == DataType::ARRAY || data_type == DataType::VECTOR_ARRAY; } inline bool @@ -396,10 +399,16 @@ IsFloatVectorDataType(DataType data_type) { IsSparseFloatVectorDataType(data_type); } +inline bool +IsVectorArrayDataType(DataType data_type) { + return data_type == DataType::VECTOR_ARRAY; +} + inline bool IsVectorDataType(DataType data_type) { return IsBinaryVectorDataType(data_type) || - IsFloatVectorDataType(data_type) || IsIntVectorDataType(data_type); + IsFloatVectorDataType(data_type) || IsIntVectorDataType(data_type) || + IsVectorArrayDataType(data_type); } inline bool @@ -651,6 +660,15 @@ struct TypeTraits { static constexpr const char* Name = "VECTOR_FLOAT"; }; +template <> +struct TypeTraits { + using NativeType = void; + static constexpr DataType TypeKind = DataType::VECTOR_ARRAY; + static constexpr bool IsPrimitiveType = false; + static constexpr bool IsFixedWidth = false; + static constexpr const char* Name = "VECTOR_ARRAY"; +}; + inline DataType FromValCase(milvus::proto::plan::GenericValue::ValCase val_case) { switch (val_case) { @@ -735,6 +753,9 @@ struct fmt::formatter : formatter { case milvus::DataType::VECTOR_INT8: name = "VECTOR_INT8"; break; + case milvus::DataType::VECTOR_ARRAY: + name = "VECTOR_ARRAY"; + break; } return formatter::format(name, ctx); } diff --git a/internal/core/src/common/VectorArray.h b/internal/core/src/common/VectorArray.h new file mode 100644 index 0000000000..eb2c69384b --- /dev/null +++ b/internal/core/src/common/VectorArray.h @@ -0,0 +1,352 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "FieldMeta.h" +#include "Types.h" +#include "common/VectorTrait.h" + +namespace milvus { +// Internal representation of proto::schema::VectorField which is recognized as one row +// of data type VECTOR_ARRAY +class VectorArray : public milvus::VectorTrait { + public: + VectorArray() = default; + + ~VectorArray() = default; + + // One row of VectorFieldProto + explicit VectorArray(const VectorFieldProto& vector_field) { + dim_ = vector_field.dim(); + switch (vector_field.data_case()) { + case VectorFieldProto::kFloatVector: { + element_type_ = DataType::VECTOR_FLOAT; + // data size should be array length * dim + length_ = vector_field.float_vector().data().size() / dim_; + auto data = new float[length_ * dim_]; + size_ = + vector_field.float_vector().data().size() * sizeof(float); + std::copy(vector_field.float_vector().data().begin(), + vector_field.float_vector().data().end(), + data); + data_ = std::unique_ptr(reinterpret_cast(data)); + break; + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(vector_field.data_case())); + } + } + } + + explicit VectorArray(const VectorArray& other) + : VectorArray(other.data_.get(), + other.length_, + other.dim_, + other.size_, + other.element_type_) { + } + + friend void + swap(VectorArray& array1, VectorArray& array2) noexcept { + using std::swap; + swap(array1.data_, array2.data_); + swap(array1.size_, array2.size_); + swap(array1.length_, array2.length_); + swap(array1.dim_, array2.dim_); + swap(array1.element_type_, array2.element_type_); + } + + VectorArray(VectorArray&& other) noexcept : VectorArray() { + swap(*this, other); + } + + VectorArray& + operator=(const VectorArray& other) { + VectorArray temp(other); + swap(*this, temp); + return *this; + } + + VectorArray& + operator=(VectorArray&& other) noexcept { + swap(*this, other); + return *this; + } + + bool + operator==(const VectorArray& other) const { + if (element_type_ != other.element_type_ || length_ != other.length_ || + size_ != other.size_) { + return false; + } + + if (length_ == 0) { + return true; + } + + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + auto* a = reinterpret_cast(data_.get()); + auto* b = reinterpret_cast(other.data_.get()); + return std::equal( + a, a + length_ * dim_, b, [](float x, float y) { + return std::abs(x - y) < 1e-6f; + }); + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + } + + template + VectorElement* + get_data(const int index) const { + AssertInfo(index >= 0 && index < length_, + "index out of range, index={}, length={}", + index, + length_); + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + static_assert(std::is_same_v, + "VectorElement must be float for VECTOR_FLOAT"); + return reinterpret_cast(data_.get()) + + index * dim_; + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + } + + VectorFieldProto + output_data() const { + VectorFieldProto vector_field; + vector_field.set_dim(dim_); + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + auto data = reinterpret_cast(data_.get()); + vector_field.mutable_float_vector()->mutable_data()->Add( + data, data + length_ * dim_); + break; + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + return vector_field; + } + + int + length() const { + return length_; + } + + size_t + byte_size() const { + return size_; + } + + int64_t + dim() const { + return dim_; + } + + DataType + get_element_type() const { + return element_type_; + } + + const char* + data() const { + return data_.get(); + } + + bool + is_same_array(const VectorFieldProto& vector_field) { + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + if (vector_field.data_case() != + VectorFieldProto::kFloatVector) { + return false; + } + + if (length_ != + vector_field.float_vector().data().size() / dim_) { + return false; + } + + if (length_ == 0) { + return true; + } + + const float* a = reinterpret_cast(data_.get()); + const float* b = vector_field.float_vector().data().data(); + return std::equal( + a, a + length_ * dim_, b, [](float x, float y) { + return std::abs(x - y) < 1e-6f; + }); + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + } + + private: + VectorArray( + char* data, int len, int dim, size_t size, DataType element_type) + : size_(size), length_(len), dim_(dim), element_type_(element_type) { + data_ = std::make_unique(size); + std::copy(data, data + size, data_.get()); + } + + int64_t dim_ = 0; + std::unique_ptr data_; + // number of vectors in this array + int length_ = 0; + // size of the array in bytes + int size_ = 0; + DataType element_type_ = DataType::NONE; +}; + +class VectorArrayView { + public: + VectorArrayView() = default; + + VectorArrayView(const VectorArrayView& other) + : VectorArrayView(other.data_, + other.dim_, + other.length_, + other.size_, + other.element_type_) { + } + + VectorArrayView( + char* data, int64_t dim, int len, size_t size, DataType element_type) + : data_(data), + dim_(dim), + length_(len), + size_(size), + element_type_(element_type) { + } + + template + VectorElement* + get_data(const int index) const { + AssertInfo(index >= 0 && index < length_, + "index out of range, index={}, length={}", + index, + length_); + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + static_assert(std::is_same_v, + "VectorElement must be float for VECTOR_FLOAT"); + return reinterpret_cast(data_) + index * dim_; + } + default: { + // TODO(SpadeA): add other vector types. + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + } + + VectorFieldProto + output_data() const { + VectorFieldProto vector_array; + vector_array.set_dim(dim_); + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + auto data = reinterpret_cast(data_); + vector_array.mutable_float_vector()->mutable_data()->Add( + data, data + length_ * dim_); + break; + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + return vector_array; + } + + bool + is_same_array(const VectorFieldProto& vector_field) { + switch (element_type_) { + case DataType::VECTOR_FLOAT: { + if (vector_field.data_case() != + VectorFieldProto::kFloatVector) { + return false; + } + + if (length_ != + vector_field.float_vector().data().size() / dim_) { + return false; + } + + if (length_ == 0) { + return true; + } + + const float* a = reinterpret_cast(data_); + const float* b = vector_field.float_vector().data().data(); + return std::equal( + a, a + length_ * dim_, b, [](float x, float y) { + return std::abs(x - y) < 1e-6f; + }); + } + default: { + // TODO(SpadeA): add other vector types + PanicInfo(NotImplemented, + "Not implemented vector type: {}", + static_cast(element_type_)); + } + } + } + + private: + char* data_{nullptr}; + int64_t dim_ = 0; + // number of vectors in this array + int length_ = 0; + // size of the array in bytes + int size_ = 0; + DataType element_type_ = DataType::NONE; +}; + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 4e9734177a..a0c4b9bdba 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -136,39 +136,6 @@ class Int8Vector : public VectorTrait { proto::common::PlaceholderType::Int8Vector; }; -template -constexpr bool IsVector = std::is_base_of_v; - -template -constexpr bool IsScalar = - std::is_fundamental_v || std::is_same_v || - std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v || - std::is_same_v; - -template -constexpr bool IsSparse = std::is_same_v || - std::is_same_v>; - -template -constexpr bool IsVariableType = - std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v || - IsSparse; - -template -constexpr bool IsVariableTypeSupportInChunk = - std::is_same_v || std::is_same_v || - std::is_same_v || - std::is_same_v>; - -template -using ChunkViewType = std::conditional_t< - std::is_same_v, - std::string_view, - std::conditional_t, ArrayView, T>>; - struct FundamentalTag {}; struct StringTag {}; diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index ea88c3d113..f2691cf462 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -125,8 +125,8 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) { auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); int processed_cursor = 0; auto execute_sub_batch = - [&bitmap_input, - &processed_cursor]( + [&bitmap_input, & + processed_cursor ]( const milvus::Json* data, const bool* valid_data, const int32_t* offsets, @@ -134,23 +134,23 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) { TargetBitmapView res, TargetBitmapView valid_res, const std::string& pointer) { - bool has_bitmap_input = !bitmap_input.empty(); - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { - continue; - } - res[i] = data[offset].exist(pointer); + bool has_bitmap_input = !bitmap_input.empty(); + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; } - processed_cursor += size; - }; + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (has_bitmap_input && !bitmap_input[processed_cursor + i]) { + continue; + } + res[i] = data[offset].exist(pointer); + } + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index fa389609c8..8e62a54220 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -318,9 +318,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { } int processed_cursor = 0; auto execute_sub_batch = - [op_type, - &processed_cursor, - &bitmap_input]( + [ op_type, &processed_cursor, & + bitmap_input ]( const milvus::ArrayView* data, const bool* valid_data, const int32_t* offsets, @@ -329,186 +328,185 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) { TargetBitmapView valid_res, ValueType val, int index) { - switch (op_type) { - case proto::plan::GreaterThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::GreaterEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::LessThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::LessEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::Equal: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::NotEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::PrefixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::Match: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::PostfixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - case proto::plan::InnerMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - bitmap_input, - processed_cursor, - offsets); - break; - } - default: - PanicInfo( - OpTypeInvalid, - fmt::format( - "unsupported operator type for unary expr: {}", - op_type)); + switch (op_type) { + case proto::plan::GreaterThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; } - processed_cursor += size; - }; + case proto::plan::GreaterEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::LessThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::LessEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::Equal: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::NotEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::PrefixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::Match: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::PostfixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + case proto::plan::InnerMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + bitmap_input, + processed_cursor, + offsets); + break; + } + default: + PanicInfo( + OpTypeInvalid, + fmt::format("unsupported operator type for unary expr: {}", + op_type)); + } + processed_cursor += size; + }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -708,18 +706,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) { } while (false) int processed_cursor = 0; - auto execute_sub_batch = [op_type, - pointer, - &processed_cursor, - &bitmap_input]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ExprValueType val) { + auto execute_sub_batch = + [ op_type, pointer, &processed_cursor, & + bitmap_input ]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ExprValueType val) { bool has_bitmap_input = !bitmap_input.empty(); switch (op_type) { case proto::plan::GreaterThan: { @@ -1679,17 +1675,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) { auto expr_type = expr_->op_type_; size_t processed_cursor = 0; - auto execute_sub_batch = [expr_type, - &processed_cursor, - &bitmap_input]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + auto execute_sub_batch = + [ expr_type, &processed_cursor, & + bitmap_input ]( + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index 9d3058efb1..86851c5bc9 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -109,13 +109,13 @@ class IndexFactory { storage::FileManagerContext()); IndexBasePtr - CreateJsonIndex(IndexType index_type, - JsonCastType cast_dtype, - const std::string& nested_path, - const storage::FileManagerContext& file_manager_context = - storage::FileManagerContext(), - const std::string& json_cast_function = - UNKNOW_CAST_FUNCTION_NAME); + CreateJsonIndex( + IndexType index_type, + JsonCastType cast_dtype, + const std::string& nested_path, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext(), + const std::string& json_cast_function = UNKNOW_CAST_FUNCTION_NAME); IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 289ad320e1..8e840afeb7 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -72,6 +72,10 @@ class IndexFactory { case DataType::VECTOR_INT8: return std::make_unique(type, config, context); + case DataType::VECTOR_ARRAY: + PanicInfo(DataTypeInvalid, + fmt::format("VECTOR_ARRAY is not implemented")); + default: PanicInfo(DataTypeInvalid, fmt::format("invalid type is {}", invalid_dtype_msg)); diff --git a/internal/core/src/mmap/ChunkVector.h b/internal/core/src/mmap/ChunkVector.h index 6e446f281a..6b2263643c 100644 --- a/internal/core/src/mmap/ChunkVector.h +++ b/internal/core/src/mmap/ChunkVector.h @@ -20,6 +20,7 @@ #include "mmap/ChunkData.h" #include "storage/MmapManager.h" #include "segcore/SegcoreConfig.h" +#include "common/TypeTraits.h" namespace milvus { template @@ -131,6 +132,13 @@ class ThreadSafeChunkVector : public ChunkVectorBase { src.byte_size(), src.get_element_type(), src.get_offsets_data()); + } else if constexpr (std::is_same_v) { + auto& src = chunk[chunk_offset]; + return VectorArrayView(const_cast(src.data()), + src.dim(), + src.length(), + src.byte_size(), + src.get_element_type()); } else { return chunk[chunk_offset]; } diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h index 6349d6671f..5982f6ed3f 100644 --- a/internal/core/src/mmap/ChunkedColumn.h +++ b/internal/core/src/mmap/ChunkedColumn.h @@ -35,7 +35,6 @@ #include "common/EasyAssert.h" #include "common/FieldMeta.h" #include "common/Span.h" -#include "common/Array.h" #include "segcore/storagev1translator/ChunkTranslator.h" #include "cachinglayer/Translator.h" #include "mmap/ChunkedColumnInterface.h" @@ -190,6 +189,13 @@ class ChunkedColumnBase : public ChunkedColumnInterface { "ArrayViews only supported for ArrayChunkedColumn"); } + PinWrapper> + VectorArrayViews(int64_t chunk_id) const override { + PanicInfo( + ErrorCode::Unsupported, + "VectorArrayViews only supported for ChunkedVectorArrayColumn"); + } + PinWrapper, FixedVector>> ViewsByOffsets(int64_t chunk_id, const FixedVector& offsets) const override { @@ -375,7 +381,7 @@ class ChunkedArrayColumn : public ChunkedColumnBase { } void - BulkArrayAt(std::function fn, + BulkArrayAt(std::function fn, const int64_t* offsets, int64_t count) const override { auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count); @@ -400,6 +406,39 @@ class ChunkedArrayColumn : public ChunkedColumnBase { } }; +class ChunkedVectorArrayColumn : public ChunkedColumnBase { + public: + explicit ChunkedVectorArrayColumn( + std::unique_ptr> translator, + const FieldMeta& field_meta) + : ChunkedColumnBase(std::move(translator), field_meta) { + } + + void + BulkVectorArrayAt(std::function fn, + const int64_t* offsets, + int64_t count) const override { + auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count); + auto ca = SemiInlineGet(slot_->PinCells(cids)); + for (int64_t i = 0; i < count; i++) { + auto array = + static_cast(ca->get_cell_of(cids[i])) + ->View(offsets_in_chunk[i]) + .output_data(); + fn(std::move(array), i); + } + } + + PinWrapper> + VectorArrayViews(int64_t chunk_id) const override { + auto ca = + SemiInlineGet(slot_->PinCells({static_cast(chunk_id)})); + auto chunk = ca->get_cell_of(chunk_id); + return PinWrapper>( + ca, static_cast(chunk)->Views()); + } +}; + inline std::shared_ptr MakeChunkedColumnBase(DataType data_type, std::unique_ptr> translator, @@ -421,6 +460,12 @@ MakeChunkedColumnBase(DataType data_type, field_meta)); } + if (ChunkedColumnInterface::IsChunkedVectorArrayColumnDataType(data_type)) { + return std::static_pointer_cast( + std::make_shared(std::move(translator), + field_meta)); + } + return std::static_pointer_cast( std::make_shared(std::move(translator), field_meta)); } diff --git a/internal/core/src/mmap/ChunkedColumnGroup.h b/internal/core/src/mmap/ChunkedColumnGroup.h index 47c812f5cd..7be89a98c3 100644 --- a/internal/core/src/mmap/ChunkedColumnGroup.h +++ b/internal/core/src/mmap/ChunkedColumnGroup.h @@ -29,12 +29,10 @@ #include "cachinglayer/Translator.h" #include "cachinglayer/Utils.h" -#include "common/Array.h" #include "common/Chunk.h" #include "common/GroupChunk.h" #include "common/EasyAssert.h" #include "common/Span.h" -#include "common/Array.h" #include "mmap/ChunkedColumnInterface.h" #include "segcore/storagev2translator/GroupCTMeta.h" @@ -261,6 +259,20 @@ class ProxyChunkColumn : public ChunkedColumnInterface { static_cast(chunk.get())->Views(offset_len)); } + PinWrapper> + VectorArrayViews(int64_t chunk_id) const override { + if (!IsChunkedVectorArrayColumnDataType(data_type_)) { + PanicInfo( + ErrorCode::Unsupported, + "VectorArrayViews only supported for ChunkedVectorArrayColumn"); + } + auto chunk_wrapper = group_->GetGroupChunk(chunk_id); + auto chunk = chunk_wrapper.get()->GetChunk(field_id_); + return PinWrapper>( + chunk_wrapper, + static_cast(chunk.get())->Views()); + } + PinWrapper, FixedVector>> ViewsByOffsets(int64_t chunk_id, const FixedVector& offsets) const override { @@ -402,7 +414,7 @@ class ProxyChunkColumn : public ChunkedColumnInterface { } void - BulkArrayAt(std::function fn, + BulkArrayAt(std::function fn, const int64_t* offsets, int64_t count) const override { if (!IsChunkedArrayColumnDataType(data_type_)) { @@ -421,6 +433,27 @@ class ProxyChunkColumn : public ChunkedColumnInterface { } } + void + BulkVectorArrayAt(std::function fn, + const int64_t* offsets, + int64_t count) const override { + if (!IsChunkedVectorArrayColumnDataType(data_type_)) { + PanicInfo(ErrorCode::Unsupported, + "BulkVectorArrayAt only supported for " + "ChunkedVectorArrayColumn"); + } + auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count); + auto ca = group_->GetGroupChunks(cids); + for (int64_t i = 0; i < count; i++) { + auto* group_chunk = ca->get_cell_of(cids[i]); + auto chunk = group_chunk->GetChunk(field_id_); + auto array = static_cast(chunk.get()) + ->View(offsets_in_chunk[i]) + .output_data(); + fn(std::move(array), i); + } + } + private: std::shared_ptr group_; FieldId field_id_; diff --git a/internal/core/src/mmap/ChunkedColumnInterface.h b/internal/core/src/mmap/ChunkedColumnInterface.h index 4740c325bb..87e4892944 100644 --- a/internal/core/src/mmap/ChunkedColumnInterface.h +++ b/internal/core/src/mmap/ChunkedColumnInterface.h @@ -81,6 +81,9 @@ class ChunkedColumnInterface { ArrayViews(int64_t chunk_id, std::optional> offset_len) const = 0; + virtual PinWrapper> + VectorArrayViews(int64_t chunk_id) const = 0; + virtual PinWrapper< std::pair, FixedVector>> ViewsByOffsets(int64_t chunk_id, @@ -129,13 +132,22 @@ class ChunkedColumnInterface { } virtual void - BulkArrayAt(std::function fn, + BulkArrayAt(std::function fn, const int64_t* offsets, int64_t count) const { PanicInfo(ErrorCode::Unsupported, "BulkArrayAt only supported for ChunkedArrayColumn"); } + virtual void + BulkVectorArrayAt(std::function fn, + const int64_t* offsets, + int64_t count) const { + PanicInfo( + ErrorCode::Unsupported, + "BulkVectorArrayAt only supported for ChunkedVectorArrayColumn"); + } + static bool IsChunkedVariableColumnDataType(DataType data_type) { return data_type == DataType::STRING || @@ -148,6 +160,11 @@ class ChunkedColumnInterface { return data_type == DataType::ARRAY; } + static bool + IsChunkedVectorArrayColumnDataType(DataType data_type) { + return data_type == DataType::VECTOR_ARRAY; + } + static bool IsChunkedColumnDataType(DataType data_type) { return !IsChunkedVariableColumnDataType(data_type) && diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 1bbd4b3801..3fa68ac20e 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -90,6 +90,10 @@ ChunkedSegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); + if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(DataTypeInvalid, "VECTOR_ARRAY is not implemented"); + } + if (field_meta.is_vector()) { LoadVecIndex(info); } else { @@ -461,6 +465,11 @@ ChunkedSegmentSealedImpl::AddFieldDataInfoForSealed( int64_t ChunkedSegmentSealedImpl::num_chunk_index(FieldId field_id) const { auto& field_meta = schema_->operator[](field_id); + + if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(DataTypeInvalid, "VECTOR_ARRAY is not implemented"); + } + if (field_meta.is_vector()) { return int64_t(vector_indexings_.is_ready(field_id)); } @@ -1043,8 +1052,23 @@ ChunkedSegmentSealedImpl::bulk_subscript_array_impl( const int64_t* seg_offsets, int64_t count, google::protobuf::RepeatedPtrField* dst) { - column->BulkArrayAt( - [dst](ScalarArray&& array, size_t i) { dst->at(i) = std::move(array); }, + column->BulkArrayAt([dst](ScalarFieldProto&& array, + size_t i) { dst->at(i) = std::move(array); }, + seg_offsets, + count); +} + +template +void +ChunkedSegmentSealedImpl::bulk_subscript_vector_array_impl( + const ChunkedColumnInterface* column, + const int64_t* seg_offsets, + int64_t count, + google::protobuf::RepeatedPtrField* dst) { + column->BulkVectorArrayAt( + [dst](VectorFieldProto&& array, size_t i) { + dst->at(i) = std::move(array); + }, seg_offsets, count); } @@ -1090,9 +1114,9 @@ ChunkedSegmentSealedImpl::fill_with_empty(FieldId field_id, int64_t count) const { auto& field_meta = schema_->operator[](field_id); if (IsVectorDataType(field_meta.get_data_type())) { - return CreateVectorDataArray(count, field_meta); + return CreateEmptyVectorDataArray(count, field_meta); } - return CreateScalarDataArray(count, field_meta); + return CreateEmptyScalarDataArray(count, field_meta); } void @@ -1367,6 +1391,14 @@ ChunkedSegmentSealedImpl::get_raw_data(FieldId field_id, ret->mutable_vectors()->mutable_int8_vector()->data()); break; } + case DataType::VECTOR_ARRAY: { + bulk_subscript_vector_array_impl( + column.get(), + seg_offsets, + count, + ret->mutable_vectors()->mutable_vector_array()->mutable_data()); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type {}", @@ -1962,6 +1994,11 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) { field_meta); break; } + case milvus::DataType::VECTOR_ARRAY: { + column = std::make_shared( + std::move(translator), field_meta); + break; + } default: { column = std::make_shared(std::move(translator), field_meta); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index 578e458395..1232fd069d 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -318,6 +318,14 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { int64_t count, google::protobuf::RepeatedPtrField* dst); + template + static void + bulk_subscript_vector_array_impl( + const ChunkedColumnInterface* column, + const int64_t* seg_offsets, + int64_t count, + google::protobuf::RepeatedPtrField* dst); + static void bulk_subscript_impl(int64_t element_sizeof, ChunkedColumnInterface* field, diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index dd8318d2c1..ba9e61193b 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -47,6 +47,14 @@ VectorBase::set_data_raw(ssize_t element_offset, } else if (field_meta.get_data_type() == DataType::VECTOR_INT8) { return set_data_raw( element_offset, VEC_FIELD_DATA(data, int8), element_count); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + auto& vector_array = data->vectors().vector_array().data(); + std::vector data_raw{}; + data_raw.reserve(vector_array.size()); + for (auto& e : vector_array) { + data_raw.emplace_back(VectorArray(e)); + } + return set_data_raw(element_offset, data_raw.data(), element_count); } else { PanicInfo(DataTypeInvalid, "unsupported vector type"); } diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index d78c2a9652..845cbae07b 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -200,7 +200,9 @@ class ConcurrentVectorImpl : public VectorBase { SpanBase get_span_base(int64_t chunk_id) const override { - if constexpr (is_type_entire_row) { + if constexpr (std::is_same_v) { + PanicInfo(NotImplemented, "unimplemented"); + } else if constexpr (is_type_entire_row) { return chunks_ptr_->get_span(chunk_id); } else if constexpr (std::is_same_v || // NOLINT std::is_same_v) { @@ -272,7 +274,9 @@ class ConcurrentVectorImpl : public VectorBase { int64_t get_element_size() const override { - if constexpr (is_type_entire_row) { + if constexpr (std::is_same_v) { + PanicInfo(NotImplemented, "unimplemented"); + } else if constexpr (is_type_entire_row) { return chunks_ptr_->get_element_size(); } else if constexpr (std::is_same_v || // NOLINT std::is_same_v) { @@ -484,6 +488,20 @@ class ConcurrentVector : public ConcurrentVectorImpl { } }; +template <> +class ConcurrentVector + : public ConcurrentVectorImpl { + public: + explicit ConcurrentVector( + int64_t dim /* not use it*/, + int64_t size_per_chunk, + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr, + ThreadSafeValidDataPtr valid_data_ptr = nullptr) + : ConcurrentVectorImpl::ConcurrentVectorImpl( + 1, size_per_chunk, std::move(mmap_descriptor), valid_data_ptr) { + } +}; + template <> class ConcurrentVector : public ConcurrentVectorImpl, true> { diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 5a262c8e34..298e308e9a 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -563,6 +563,12 @@ struct InsertRecord : public InsertRecord { size_per_chunk, dense_vec_mmap_descriptor); return; + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + this->append_data(field_id, + field_meta.get_dim(), + size_per_chunk, + dense_vec_mmap_descriptor); + return; } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported vector type", diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index e9e125ea88..4f82ea4c2d 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -42,6 +42,7 @@ #include "storage/RemoteChunkManagerSingleton.h" #include "storage/Util.h" #include "storage/ThreadPools.h" +#include "common/TypeTraits.h" #include "milvus-storage/format/parquet/file_reader.h" #include "milvus-storage/filesystem/fs.h" @@ -644,7 +645,7 @@ SegmentGrowingImpl::bulk_subscript( Assert(!dynamic_field_names.empty()); auto& field_meta = schema_->operator[](field_id); auto vec_ptr = insert_record_.get_data_base(field_id); - auto result = CreateScalarDataArray(count, field_meta); + auto result = CreateEmptyScalarDataArray(count, field_meta); if (field_meta.is_nullable()) { auto valid_data_ptr = insert_record_.get_valid_data(field_id); auto res = result->mutable_valid_data()->mutable_data(); @@ -671,7 +672,7 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, auto& field_meta = schema_->operator[](field_id); auto vec_ptr = insert_record_.get_data_base(field_id); if (field_meta.is_vector()) { - auto result = CreateVectorDataArray(count, field_meta); + auto result = CreateEmptyVectorDataArray(count, field_meta); if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { bulk_subscript_impl(field_id, field_meta.get_sizeof(), @@ -724,6 +725,13 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, seg_offsets, count, result->mutable_vectors()->mutable_int8_vector()->data()); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + bulk_subscript_vector_array_impl(*vec_ptr, + seg_offsets, + count, + result->mutable_vectors() + ->mutable_vector_array() + ->mutable_data()); } else { PanicInfo(DataTypeInvalid, "logical error"); } @@ -733,7 +741,7 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, AssertInfo(!field_meta.is_vector(), "Scalar field meta type is vector type"); - auto result = CreateScalarDataArray(count, field_meta); + auto result = CreateEmptyScalarDataArray(count, field_meta); if (field_meta.is_nullable()) { auto valid_data_ptr = insert_record_.get_valid_data(field_id); auto res = result->mutable_valid_data()->mutable_data(); @@ -992,6 +1000,24 @@ SegmentGrowingImpl::bulk_subscript_array_impl( } } +template +void +SegmentGrowingImpl::bulk_subscript_vector_array_impl( + const VectorBase& vec_raw, + const int64_t* seg_offsets, + int64_t count, + google::protobuf::RepeatedPtrField* dst) const { + auto vec_ptr = dynamic_cast*>(&vec_raw); + AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr"); + auto& vec = *vec_ptr; + for (int64_t i = 0; i < count; ++i) { + auto offset = seg_offsets[i]; + if (offset != INVALID_SEG_OFFSET) { + dst->at(i) = vec[offset].output_data(); + } + } +} + void SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type, const int64_t* seg_offsets, diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 46e7a5f7a0..aa3b3243ba 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -218,6 +218,15 @@ class SegmentGrowingImpl : public SegmentGrowing { int64_t count, google::protobuf::RepeatedPtrField* dst) const; + // for vector array vectors + template + void + bulk_subscript_vector_array_impl( + const VectorBase& vec_raw, + const int64_t* seg_offsets, + int64_t count, + google::protobuf::RepeatedPtrField* dst) const; + template void bulk_subscript_impl(FieldId field_id, diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index a611943390..891bbae743 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -214,6 +214,7 @@ SegmentInternalInterface::FillTargetEntry( } else { col = bulk_subscript(field_id, offsets, size); } + // todo(SpadeA): consider vector array? if (field_meta.get_data_type() == DataType::ARRAY) { col->mutable_scalars()->mutable_array_data()->set_element_type( proto::schema::DataType(field_meta.get_element_type())); @@ -435,7 +436,7 @@ SegmentInternalInterface::bulk_subscript_not_exist_field( fmt::format("unsupported added field type {}", field_meta.get_data_type())); } - auto result = CreateScalarDataArray(count, field_meta); + auto result = CreateEmptyScalarDataArray(count, field_meta); if (field_meta.default_value().has_value()) { auto res = result->mutable_valid_data()->mutable_data(); for (int64_t i = 0; i < count; ++i) { diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 171aec2170..4b6bfb7cd6 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -216,6 +216,23 @@ GetRawDataSizeOfDataArray(const DataArray* data, result += data->vectors().sparse_float_vector().ByteSizeLong(); break; } + case DataType::VECTOR_ARRAY: { + auto& obj = data->vectors().vector_array().data(); + switch (field_meta.get_element_type()) { + case DataType::VECTOR_FLOAT: { + for (auto& e : obj) { + result += e.float_vector().ByteSizeLong(); + } + break; + } + default: { + PanicInfo(NotImplemented, + fmt::format("not implemented vector type {}", + field_meta.get_element_type())); + } + } + break; + } default: { PanicInfo( DataTypeInvalid, @@ -231,7 +248,7 @@ GetRawDataSizeOfDataArray(const DataArray* data, // modify bulk script implement to make process more clear std::unique_ptr -CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { +CreateEmptyScalarDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); @@ -317,7 +334,7 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { } std::unique_ptr -CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { +CreateEmptyVectorDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); @@ -367,6 +384,16 @@ CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { obj->resize(length); break; } + case DataType::VECTOR_ARRAY: { + auto obj = vector_array->mutable_vector_array(); + obj->set_element_type(static_cast( + field_meta.get_element_type())); + obj->mutable_data()->Reserve(count); + for (int i = 0; i < count; i++) { + *(obj->mutable_data()->Add()) = proto::schema::VectorField(); + } + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -453,7 +480,7 @@ CreateScalarDataArrayFrom(const void* data_raw, break; } case DataType::ARRAY: { - auto data = reinterpret_cast(data_raw); + auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_array_data(); obj->set_element_type(static_cast( field_meta.get_element_type())); @@ -538,6 +565,28 @@ CreateVectorDataArrayFrom(const void* data_raw, obj->assign(data, length * sizeof(int8)); break; } + case DataType::VECTOR_ARRAY: { + auto data = reinterpret_cast(data_raw); + auto vector_type = field_meta.get_element_type(); + switch (vector_type) { + case DataType::VECTOR_FLOAT: { + auto obj = vector_array->mutable_vector_array(); + obj->set_element_type( + milvus::proto::schema::DataType::FloatVector); + obj->set_dim(dim); + for (auto i = 0; i < count; i++) { + *(obj->mutable_data()->Add()) = data[i]; + } + break; + } + default: { + PanicInfo(NotImplemented, + fmt::format("not implemented vector type {}", + vector_type)); + } + } + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); @@ -553,7 +602,7 @@ CreateDataArrayFrom(const void* data_raw, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); - if (!IsVectorDataType(data_type)) { + if (!IsVectorDataType(data_type) && data_type != DataType::VECTOR_ARRAY) { return CreateScalarDataArrayFrom( data_raw, valid_data, count, field_meta); } @@ -619,6 +668,8 @@ MergeDataArray(std::vector& merge_bases, auto data = VEC_FIELD_DATA(src_field_data, int8); auto obj = vector_array->mutable_int8_vector(); obj->assign(data, dim * sizeof(int8)); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(DataTypeInvalid, "VECTOR_ARRAY is not implemented"); } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index de27824709..a99e3fca64 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -48,10 +48,10 @@ GetRawDataSizeOfDataArray(const DataArray* data, // Note: this is temporary solution. // modify bulk script implement to make process more clear std::unique_ptr -CreateScalarDataArray(int64_t count, const FieldMeta& field_meta); +CreateEmptyScalarDataArray(int64_t count, const FieldMeta& field_meta); std::unique_ptr -CreateVectorDataArray(int64_t count, const FieldMeta& field_meta); +CreateEmptyVectorDataArray(int64_t count, const FieldMeta& field_meta); std::unique_ptr CreateScalarDataArrayFrom(const void* data_raw, diff --git a/internal/core/src/segcore/reduce/Reduce.cpp b/internal/core/src/segcore/reduce/Reduce.cpp index 558cdc3a72..8f3a0074d6 100644 --- a/internal/core/src/segcore/reduce/Reduce.cpp +++ b/internal/core/src/segcore/reduce/Reduce.cpp @@ -439,6 +439,8 @@ ReduceHelper::GetSearchResultDataSlice(int slice_index) { ->mutable_array_data() ->set_element_type( proto::schema::DataType(field_meta.get_element_type())); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(NotImplemented, "VECTOR_ARRAY is not implemented"); } search_result_data->mutable_fields_data()->AddAllocated( field_data.release()); diff --git a/internal/core/src/segcore/reduce/StreamReduce.cpp b/internal/core/src/segcore/reduce/StreamReduce.cpp index d7fdf22035..486c1dc1a2 100644 --- a/internal/core/src/segcore/reduce/StreamReduce.cpp +++ b/internal/core/src/segcore/reduce/StreamReduce.cpp @@ -154,7 +154,10 @@ StreamReducerHelper::AssembleMergedResult() { ->mutable_array_data() ->set_element_type( proto::schema::DataType(field_meta.get_element_type())); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(NotImplemented, "VECTOR_ARRAY is not implemented"); } + new_merged_result->output_fields_data_[field_id] = std::move(field_data); } @@ -670,6 +673,8 @@ StreamReducerHelper::GetSearchResultDataSlice(int slice_index) { ->mutable_array_data() ->set_element_type( proto::schema::DataType(field_meta.get_element_type())); + } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) { + PanicInfo(NotImplemented, "VECTOR_ARRAY is not implemented"); } search_result_data->mutable_fields_data()->AddAllocated( field_data.release()); diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index f839e52d34..30b7a0c7d0 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -247,6 +247,8 @@ BaseEventData::Serialize() { auto data_type = field_data->get_data_type(); std::shared_ptr payload_writer; if (IsVectorDataType(data_type) && + // each element will be serialized as bytes so no need dim info + data_type != DataType::VECTOR_ARRAY && !IsSparseFloatVectorDataType(data_type)) { payload_writer = std::make_unique( data_type, field_data->get_dim(), field_data->IsNullable()); @@ -310,6 +312,22 @@ BaseEventData::Serialize() { } break; } + case DataType::VECTOR_ARRAY: { + for (size_t offset = 0; offset < field_data->get_num_rows(); + ++offset) { + auto array = static_cast( + field_data->RawValue(offset)); + auto array_string = + array->output_data().SerializeAsString(); + auto size = array_string.size(); + + // todo(SapdeA): it maybe better to serialize vectors one by one + payload_writer->add_one_binary_payload( + reinterpret_cast(array_string.c_str()), + size); + } + break; + } default: { auto payload = Payload{data_type, diff --git a/internal/core/src/storage/PayloadReader.cpp b/internal/core/src/storage/PayloadReader.cpp index fc40550ddd..64dcfa5109 100644 --- a/internal/core/src/storage/PayloadReader.cpp +++ b/internal/core/src/storage/PayloadReader.cpp @@ -73,7 +73,8 @@ PayloadReader::init(const uint8_t* data, int length, bool is_field_data) { // dim is unused for sparse float vector dim_ = (IsVectorDataType(column_type_) && - !IsSparseFloatVectorDataType(column_type_)) + !IsSparseFloatVectorDataType(column_type_)) && + !IsVectorArrayDataType(column_type_) ? GetDimensionFromFileMetaData( file_meta->schema()->Column(column_index), column_type_) : 1; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index aadf7f7f3e..06577a867f 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -284,6 +284,7 @@ CreateArrowBuilder(DataType data_type) { return std::make_shared(); } case DataType::ARRAY: + case DataType::VECTOR_ARRAY: case DataType::JSON: { return std::make_shared(); } @@ -411,6 +412,7 @@ CreateArrowSchema(DataType data_type, bool nullable) { {arrow::field("val", arrow::utf8(), nullable)}); } case DataType::ARRAY: + case DataType::VECTOR_ARRAY: case DataType::JSON: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); @@ -938,6 +940,9 @@ CreateFieldData(const DataType& type, case DataType::VECTOR_INT8: return std::make_shared>( dim, type, total_num_rows); + case DataType::VECTOR_ARRAY: + return std::make_shared>(type, + total_num_rows); default: PanicInfo(DataTypeInvalid, "CreateFieldData not support data type " + diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index f858e89d52..19b1111777 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -105,6 +105,7 @@ set(MILVUS_TEST_FILES test_chunked_segment_storage_v2.cpp test_thread_pool.cpp test_json_flat_index.cpp + test_vector_array.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_array_bitmap_index.cpp b/internal/core/unittest/test_array_bitmap_index.cpp index 0ffe854304..748acac025 100644 --- a/internal/core/unittest/test_array_bitmap_index.cpp +++ b/internal/core/unittest/test_array_bitmap_index.cpp @@ -65,7 +65,7 @@ GenerateArrayData(proto::schema::DataType element_type, int cardinality, int size, int array_len) { - std::vector data(size); + std::vector data(size); switch (element_type) { case proto::schema::DataType::Bool: { for (int i = 0; i < size; i++) { diff --git a/internal/core/unittest/test_array_expr.cpp b/internal/core/unittest/test_array_expr.cpp index bd29b23c4c..d1145b5f97 100644 --- a/internal/core/unittest/test_array_expr.cpp +++ b/internal/core/unittest/test_array_expr.cpp @@ -564,16 +564,18 @@ TEST(Expr, TestArrayRange) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); - auto new_long_array_col = raw_data.get_col(long_array_fid); - auto new_bool_array_col = raw_data.get_col(bool_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); + auto new_bool_array_col = + raw_data.get_col(bool_array_fid); auto new_string_array_col = - raw_data.get_col(string_array_fid); + raw_data.get_col(string_array_fid); auto new_float_array_col = - raw_data.get_col(float_array_fid); + raw_data.get_col(float_array_fid); array_cols["long"].insert(array_cols["long"].end(), new_long_array_col.begin(), @@ -717,11 +719,12 @@ TEST(Expr, TestArrayEqual) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::vector long_array_col; + std::vector long_array_col; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter, 0, 1, 3); - auto new_long_array_col = raw_data.get_col(long_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); long_array_col.insert(long_array_col.end(), new_long_array_col.begin(), new_long_array_col.end()); @@ -820,13 +823,14 @@ TEST(Expr, TestArrayNullExpr) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::vector long_array_col; + std::vector long_array_col; int num_iters = 1; FixedVector valid_data; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter, 0, 1, 3); - auto new_long_array_col = raw_data.get_col(long_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); long_array_col.insert(long_array_col.end(), new_long_array_col.begin(), new_long_array_col.end()); @@ -989,19 +993,22 @@ TEST(Expr, TestArrayContains) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); - auto new_int_array_col = raw_data.get_col(int_array_fid); - auto new_long_array_col = raw_data.get_col(long_array_fid); - auto new_bool_array_col = raw_data.get_col(bool_array_fid); + auto new_int_array_col = + raw_data.get_col(int_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); + auto new_bool_array_col = + raw_data.get_col(bool_array_fid); auto new_float_array_col = - raw_data.get_col(float_array_fid); + raw_data.get_col(float_array_fid); auto new_double_array_col = - raw_data.get_col(double_array_fid); + raw_data.get_col(double_array_fid); auto new_string_array_col = - raw_data.get_col(string_array_fid); + raw_data.get_col(string_array_fid); array_cols["int"].insert(array_cols["int"].end(), new_int_array_col.begin(), @@ -1512,16 +1519,18 @@ TEST(Expr, TestArrayBinaryArith) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); - auto new_int_array_col = raw_data.get_col(int_array_fid); - auto new_long_array_col = raw_data.get_col(long_array_fid); + auto new_int_array_col = + raw_data.get_col(int_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); auto new_float_array_col = - raw_data.get_col(float_array_fid); + raw_data.get_col(float_array_fid); auto new_double_array_col = - raw_data.get_col(double_array_fid); + raw_data.get_col(double_array_fid); array_cols["int"].insert(array_cols["int"].end(), new_int_array_col.begin(), @@ -2477,12 +2486,12 @@ TEST(Expr, TestArrayStringMatch) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); auto new_string_array_col = - raw_data.get_col(string_array_fid); + raw_data.get_col(string_array_fid); array_cols["string"].insert(array_cols["string"].end(), new_string_array_col.begin(), new_string_array_col.end()); @@ -2581,16 +2590,18 @@ TEST(Expr, TestArrayInTerm) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); - auto new_long_array_col = raw_data.get_col(long_array_fid); - auto new_bool_array_col = raw_data.get_col(bool_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); + auto new_bool_array_col = + raw_data.get_col(bool_array_fid); auto new_float_array_col = - raw_data.get_col(float_array_fid); + raw_data.get_col(float_array_fid); auto new_string_array_col = - raw_data.get_col(string_array_fid); + raw_data.get_col(string_array_fid); array_cols["long"].insert(array_cols["long"].end(), new_long_array_col.begin(), new_long_array_col.end()); @@ -2798,11 +2809,12 @@ TEST(Expr, TestTermInArray) { auto seg = CreateGrowingSegment(schema, empty_index_meta); int N = 1000; - std::map> array_cols; + std::map> array_cols; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { auto raw_data = DataGen(schema, N, iter); - auto new_long_array_col = raw_data.get_col(long_array_fid); + auto new_long_array_col = + raw_data.get_col(long_array_fid); array_cols["long"].insert(array_cols["long"].end(), new_long_array_col.begin(), new_long_array_col.end()); diff --git a/internal/core/unittest/test_chunk_vector.cpp b/internal/core/unittest/test_chunk_vector.cpp index 85ccc41ab9..c235a56408 100644 --- a/internal/core/unittest/test_chunk_vector.cpp +++ b/internal/core/unittest/test_chunk_vector.cpp @@ -380,16 +380,16 @@ TEST_F(ChunkVectorTest, QueryWithMmap) { // auto seg = CreateGrowingSegment(schema, empty_index_meta, 22, config); // int N = 1000; -// std::map> array_cols; +// std::map> array_cols; // int num_iters = 1; // for (int iter = 0; iter < num_iters; ++iter) { // auto raw_data = DataGen(schema, N, iter); -// auto new_long_array_col = raw_data.get_col(long_array_fid); -// auto new_bool_array_col = raw_data.get_col(bool_array_fid); +// auto new_long_array_col = raw_data.get_col(long_array_fid); +// auto new_bool_array_col = raw_data.get_col(bool_array_fid); // auto new_float_array_col = -// raw_data.get_col(float_array_fid); +// raw_data.get_col(float_array_fid); // auto new_string_array_col = -// raw_data.get_col(string_array_fid); +// raw_data.get_col(string_array_fid); // array_cols["long"].insert(array_cols["long"].end(), // new_long_array_col.begin(), // new_long_array_col.end()); diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index 452116d114..c1016b9cd3 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -17,6 +17,7 @@ #include "segcore/SegmentGrowingImpl.h" #include "pb/schema.pb.h" #include "test_utils/DataGen.h" +#include "test_utils/storage_test_utils.h" using namespace milvus::segcore; using namespace milvus; @@ -414,3 +415,129 @@ TEST(Growing, FillNullableData) { EXPECT_EQ(float_array_result->valid_data_size(), num_inserted); } } + +TEST_P(GrowingTest, FillVectorArrayData) { + auto schema = std::make_shared(); + auto int64_field = schema->AddDebugField("int64", DataType::INT64); + auto array_float_vector = schema->AddDebugVectorArrayField( + "array_float_vector", DataType::VECTOR_FLOAT, 128, metric_type); + schema->set_primary_field_id(int64_field); + + auto config = SegcoreConfig::default_config(); + config.set_chunk_rows(1024); + config.set_enable_interim_segment_index(true); + std::map filedMap = {}; + IndexMetaPtr metaPtr = + std::make_shared(100000, std::move(filedMap)); + auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config); + auto segment = dynamic_cast(segment_growing.get()); + int64_t per_batch = 1000; + int64_t n_batch = 3; + int64_t dim = 128; + for (int64_t i = 0; i < n_batch; i++) { + auto dataset = DataGen(schema, per_batch); + + auto offset = segment->PreInsert(per_batch); + segment->Insert(offset, + per_batch, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + auto num_inserted = (i + 1) * per_batch; + auto ids_ds = GenRandomIds(num_inserted); + auto int64_result = segment->bulk_subscript( + int64_field, ids_ds->GetIds(), num_inserted); + auto array_float_vector_result = segment->bulk_subscript( + array_float_vector, ids_ds->GetIds(), num_inserted); + + EXPECT_EQ(int64_result->scalars().long_data().data_size(), + num_inserted); + EXPECT_EQ( + array_float_vector_result->vectors().vector_array().data_size(), + num_inserted); + + if (i == 0) { + // Verify vector array data + auto verify_float_vectors = [](auto arr1, auto arr2) { + static constexpr float EPSILON = 1e-6; + EXPECT_EQ(arr1.size(), arr2.size()); + for (int64_t i = 0; i < arr1.size(); ++i) { + EXPECT_NEAR(arr1[i], arr2[i], EPSILON); + } + }; + + auto array_vec_values = + dataset.get_col(array_float_vector); + for (int64_t i = 0; i < per_batch; ++i) { + auto arrow_array = array_float_vector_result->vectors() + .vector_array() + .data()[i] + .float_vector() + .data(); + auto expected_array = + array_vec_values[ids_ds->GetIds()[i]].float_vector().data(); + verify_float_vectors(arrow_array, expected_array); + } + } + + EXPECT_EQ(int64_result->valid_data_size(), 0); + EXPECT_EQ(array_float_vector_result->valid_data_size(), 0); + } +} + +TEST(GrowingTest, LoadVectorArrayData) { + auto schema = std::make_shared(); + auto metric_type = knowhere::metric::L2; + auto int64_field = schema->AddDebugField("int64", DataType::INT64); + auto array_float_vector = schema->AddDebugVectorArrayField( + "array_vec", DataType::VECTOR_FLOAT, 128, metric_type); + schema->set_primary_field_id(int64_field); + + auto config = SegcoreConfig::default_config(); + config.set_chunk_rows(1024); + config.set_enable_interim_segment_index(true); + std::map filedMap = {}; + IndexMetaPtr metaPtr = + std::make_shared(100000, std::move(filedMap)); + + int64_t dataset_size = 1000; + int64_t dim = 128; + auto dataset = DataGen(schema, dataset_size); + auto segment_growing = + CreateGrowingWithFieldDataLoaded(schema, metaPtr, config, dataset); + auto segment = segment_growing.get(); + + // Verify data + auto int64_values = dataset.get_col(int64_field); + auto array_vec_values = + dataset.get_col(array_float_vector); + + auto ids_ds = GenRandomIds(dataset_size); + auto int64_result = + segment->bulk_subscript(int64_field, ids_ds->GetIds(), dataset_size); + auto array_float_vector_result = segment->bulk_subscript( + array_float_vector, ids_ds->GetIds(), dataset_size); + + EXPECT_EQ(int64_result->scalars().long_data().data_size(), dataset_size); + EXPECT_EQ(array_float_vector_result->vectors().vector_array().data_size(), + dataset_size); + + auto verify_float_vectors = [](auto arr1, auto arr2) { + static constexpr float EPSILON = 1e-6; + EXPECT_EQ(arr1.size(), arr2.size()); + for (int64_t i = 0; i < arr1.size(); ++i) { + EXPECT_NEAR(arr1[i], arr2[i], EPSILON); + } + }; + + for (int64_t i = 0; i < dataset_size; ++i) { + auto arrow_array = array_float_vector_result->vectors() + .vector_array() + .data()[i] + .float_vector() + .data(); + auto expected_array = + array_vec_values[ids_ds->GetIds()[i]].float_vector().data(); + verify_float_vectors(arrow_array, expected_array); + } +} \ No newline at end of file diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index b5cccf9d87..ffca7cc261 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1992,12 +1992,17 @@ TEST(Sealed, QueryAllFields) { auto double_values = dataset.get_col(double_field); auto varchar_values = dataset.get_col(varchar_field); auto json_values = dataset.get_col(json_field); - auto int_array_values = dataset.get_col(int_array_field); - auto long_array_values = dataset.get_col(long_array_field); - auto bool_array_values = dataset.get_col(bool_array_field); - auto string_array_values = dataset.get_col(string_array_field); - auto double_array_values = dataset.get_col(double_array_field); - auto float_array_values = dataset.get_col(float_array_field); + auto int_array_values = dataset.get_col(int_array_field); + auto long_array_values = + dataset.get_col(long_array_field); + auto bool_array_values = + dataset.get_col(bool_array_field); + auto string_array_values = + dataset.get_col(string_array_field); + auto double_array_values = + dataset.get_col(double_array_field); + auto float_array_values = + dataset.get_col(float_array_field); auto vector_values = dataset.get_col(vec); auto float16_vector_values = dataset.get_col(float16_vec); auto bfloat16_vector_values = dataset.get_col(bfloat16_vec); @@ -2149,12 +2154,17 @@ TEST(Sealed, QueryAllNullableFields) { auto double_values = dataset.get_col(double_field); auto varchar_values = dataset.get_col(varchar_field); auto json_values = dataset.get_col(json_field); - auto int_array_values = dataset.get_col(int_array_field); - auto long_array_values = dataset.get_col(long_array_field); - auto bool_array_values = dataset.get_col(bool_array_field); - auto string_array_values = dataset.get_col(string_array_field); - auto double_array_values = dataset.get_col(double_array_field); - auto float_array_values = dataset.get_col(float_array_field); + auto int_array_values = dataset.get_col(int_array_field); + auto long_array_values = + dataset.get_col(long_array_field); + auto bool_array_values = + dataset.get_col(bool_array_field); + auto string_array_values = + dataset.get_col(string_array_field); + auto double_array_values = + dataset.get_col(double_array_field); + auto float_array_values = + dataset.get_col(float_array_field); auto vector_values = dataset.get_col(vec); auto bool_valid_values = dataset.get_col_valid(bool_field); @@ -2269,3 +2279,57 @@ TEST(Sealed, SearchSortedPk) { EXPECT_EQ(6, offsets2.size()); EXPECT_EQ(100, offsets2[0].get()); } + +TEST(Sealed, QueryVectorArrayAllFields) { + auto schema = std::make_shared(); + auto metric_type = knowhere::metric::L2; + auto int64_field = schema->AddDebugField("int64", DataType::INT64); + auto array_vec = schema->AddDebugVectorArrayField( + "array_vec", DataType::VECTOR_FLOAT, 128, metric_type); + schema->set_primary_field_id(int64_field); + + std::map filedMap{}; + IndexMetaPtr metaPtr = + std::make_shared(100000, std::move(filedMap)); + + int64_t dataset_size = 1000; + int64_t dim = 128; + auto dataset = DataGen(schema, dataset_size); + auto segment_sealed = CreateSealedWithFieldDataLoaded(schema, dataset); + auto segment = + dynamic_cast(segment_sealed.get()); + + auto int64_values = dataset.get_col(int64_field); + auto array_vec_values = dataset.get_col(array_vec); + + auto ids_ds = GenRandomIds(dataset_size); + auto int64_result = + segment->bulk_subscript(int64_field, ids_ds->GetIds(), dataset_size); + auto array_float_vector_result = + segment->bulk_subscript(array_vec, ids_ds->GetIds(), dataset_size); + + EXPECT_EQ(int64_result->scalars().long_data().data_size(), dataset_size); + EXPECT_EQ(array_float_vector_result->vectors().vector_array().data_size(), + dataset_size); + + auto verify_float_vectors = [](auto arr1, auto arr2) { + static constexpr float EPSILON = 1e-6; + EXPECT_EQ(arr1.size(), arr2.size()); + for (int64_t i = 0; i < arr1.size(); ++i) { + EXPECT_NEAR(arr1[i], arr2[i], EPSILON); + } + }; + for (int64_t i = 0; i < dataset_size; ++i) { + auto arrow_array = array_float_vector_result->vectors() + .vector_array() + .data()[i] + .float_vector() + .data(); + auto expected_array = + array_vec_values[ids_ds->GetIds()[i]].float_vector().data(); + verify_float_vectors(arrow_array, expected_array); + } + + EXPECT_EQ(int64_result->valid_data_size(), 0); + EXPECT_EQ(array_float_vector_result->valid_data_size(), 0); +} diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 61c6f31a3a..e6e7ea324d 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -105,129 +105,150 @@ struct GeneratedData { } auto& field_meta = schema_->operator[](field_id); - if (field_meta.is_vector() && - field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT) { - if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - int len = raw_->num_rows() * field_meta.get_dim(); - ret.resize(len); - auto src_data = - reinterpret_cast(target_field_data.vectors() - .float_vector() - .data() - .data()); - std::copy_n(src_data, len, ret.data()); - } else if (field_meta.get_data_type() == - DataType::VECTOR_BINARY) { - int len = raw_->num_rows() * (field_meta.get_dim() / 8); - ret.resize(len); - auto src_data = reinterpret_cast( - target_field_data.vectors().binary_vector().data()); - std::copy_n(src_data, len, ret.data()); - } else if (field_meta.get_data_type() == - DataType::VECTOR_FLOAT16) { - int len = raw_->num_rows() * field_meta.get_dim(); - ret.resize(len); - auto src_data = reinterpret_cast( - target_field_data.vectors().float16_vector().data()); - std::copy_n(src_data, len, ret.data()); - } else if (field_meta.get_data_type() == - DataType::VECTOR_BFLOAT16) { - int len = raw_->num_rows() * field_meta.get_dim(); - ret.resize(len); - auto src_data = reinterpret_cast( - target_field_data.vectors().bfloat16_vector().data()); - std::copy_n(src_data, len, ret.data()); - } else if (field_meta.get_data_type() == - DataType::VECTOR_INT8) { - int len = raw_->num_rows() * field_meta.get_dim(); - ret.resize(len); - auto src_data = reinterpret_cast( - target_field_data.vectors().int8_vector().data()); - std::copy_n(src_data, len, ret.data()); - } else { - PanicInfo(Unsupported, "unsupported"); - } - return std::move(ret); - } - if constexpr (std::is_same_v>) { - auto sparse_float_array = - target_field_data.vectors().sparse_float_vector(); - auto rows = SparseBytesToRows(sparse_float_array.contents()); - std::copy_n(rows.get(), raw_->num_rows(), ret.data()); - } else if constexpr (std::is_same_v) { - auto ret_data = reinterpret_cast(ret.data()); - auto src_data = target_field_data.scalars().array_data().data(); + if constexpr (std::is_same_v) { + auto ret_data = reinterpret_cast(ret.data()); + auto src_data = + target_field_data.vectors().vector_array().data(); std::copy(src_data.begin(), src_data.end(), ret_data); + return std::move(ret); } else { - switch (field_meta.get_data_type()) { - case DataType::BOOL: { + if (field_meta.is_vector() && + field_meta.get_data_type() != + DataType::VECTOR_SPARSE_FLOAT) { + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { + int len = raw_->num_rows() * field_meta.get_dim(); + ret.resize(len); auto src_data = reinterpret_cast( - target_field_data.scalars() - .bool_data() + target_field_data.vectors() + .float_vector() .data() .data()); - std::copy_n(src_data, raw_->num_rows(), ret.data()); - break; - } - case DataType::INT8: - case DataType::INT16: - case DataType::INT32: { - auto src_data = reinterpret_cast( - target_field_data.scalars() - .int_data() - .data() - .data()); - std::copy_n(src_data, raw_->num_rows(), ret.data()); - break; - } - case DataType::INT64: { + std::copy_n(src_data, len, ret.data()); + } else if (field_meta.get_data_type() == + DataType::VECTOR_BINARY) { + int len = raw_->num_rows() * (field_meta.get_dim() / 8); + ret.resize(len); auto src_data = reinterpret_cast( - target_field_data.scalars() - .long_data() - .data() - .data()); - std::copy_n(src_data, raw_->num_rows(), ret.data()); - break; - } - case DataType::FLOAT: { + target_field_data.vectors().binary_vector().data()); + std::copy_n(src_data, len, ret.data()); + } else if (field_meta.get_data_type() == + DataType::VECTOR_FLOAT16) { + int len = raw_->num_rows() * field_meta.get_dim(); + ret.resize(len); auto src_data = reinterpret_cast( - target_field_data.scalars() - .float_data() - .data() + target_field_data.vectors() + .float16_vector() .data()); - std::copy_n(src_data, raw_->num_rows(), ret.data()); - break; - } - case DataType::DOUBLE: { + std::copy_n(src_data, len, ret.data()); + } else if (field_meta.get_data_type() == + DataType::VECTOR_BFLOAT16) { + int len = raw_->num_rows() * field_meta.get_dim(); + ret.resize(len); auto src_data = reinterpret_cast( - target_field_data.scalars() - .double_data() - .data() + target_field_data.vectors() + .bfloat16_vector() .data()); - std::copy_n(src_data, raw_->num_rows(), ret.data()); - break; - } - case DataType::VARCHAR: { - auto ret_data = - reinterpret_cast(ret.data()); - auto src_data = - target_field_data.scalars().string_data().data(); - std::copy(src_data.begin(), src_data.end(), ret_data); - break; - } - case DataType::JSON: { - auto ret_data = - reinterpret_cast(ret.data()); - auto src_data = - target_field_data.scalars().json_data().data(); - std::copy(src_data.begin(), src_data.end(), ret_data); - break; - } - default: { + std::copy_n(src_data, len, ret.data()); + } else if (field_meta.get_data_type() == + DataType::VECTOR_INT8) { + int len = raw_->num_rows() * field_meta.get_dim(); + ret.resize(len); + auto src_data = reinterpret_cast( + target_field_data.vectors().int8_vector().data()); + std::copy_n(src_data, len, ret.data()); + } else { PanicInfo(Unsupported, "unsupported"); } + + return std::move(ret); + } + if constexpr (std::is_same_v< + T, + knowhere::sparse::SparseRow>) { + auto sparse_float_array = + target_field_data.vectors().sparse_float_vector(); + auto rows = + SparseBytesToRows(sparse_float_array.contents()); + std::copy_n(rows.get(), raw_->num_rows(), ret.data()); + } else if constexpr (std::is_same_v) { + auto ret_data = + reinterpret_cast(ret.data()); + auto src_data = + target_field_data.scalars().array_data().data(); + std::copy(src_data.begin(), src_data.end(), ret_data); + } else { + switch (field_meta.get_data_type()) { + case DataType::BOOL: { + auto src_data = reinterpret_cast( + target_field_data.scalars() + .bool_data() + .data() + .data()); + std::copy_n(src_data, raw_->num_rows(), ret.data()); + break; + } + case DataType::INT8: + case DataType::INT16: + case DataType::INT32: { + auto src_data = reinterpret_cast( + target_field_data.scalars() + .int_data() + .data() + .data()); + std::copy_n(src_data, raw_->num_rows(), ret.data()); + break; + } + case DataType::INT64: { + auto src_data = reinterpret_cast( + target_field_data.scalars() + .long_data() + .data() + .data()); + std::copy_n(src_data, raw_->num_rows(), ret.data()); + break; + } + case DataType::FLOAT: { + auto src_data = reinterpret_cast( + target_field_data.scalars() + .float_data() + .data() + .data()); + std::copy_n(src_data, raw_->num_rows(), ret.data()); + break; + } + case DataType::DOUBLE: { + auto src_data = reinterpret_cast( + target_field_data.scalars() + .double_data() + .data() + .data()); + std::copy_n(src_data, raw_->num_rows(), ret.data()); + break; + } + case DataType::VARCHAR: { + auto ret_data = + reinterpret_cast(ret.data()); + auto src_data = target_field_data.scalars() + .string_data() + .data(); + std::copy( + src_data.begin(), src_data.end(), ret_data); + break; + } + case DataType::JSON: { + auto ret_data = + reinterpret_cast(ret.data()); + auto src_data = + target_field_data.scalars().json_data().data(); + std::copy( + src_data.begin(), src_data.end(), ret_data); + break; + } + default: { + PanicInfo(Unsupported, "unsupported"); + } + } } } } @@ -412,55 +433,96 @@ DataGen(SchemaPtr schema, insert_data->mutable_fields_data()->AddAllocated(array.release()); }; + auto generate_float_vector = [&seed, &offset, &random, &distr]( + auto& field_meta, int64_t N) { + auto dim = field_meta.get_dim(); + vector final(dim * N); + bool is_ip = starts_with(field_meta.get_name().get(), "normalized"); +#pragma omp parallel for + for (int n = 0; n < N; ++n) { + vector data(dim); + float sum = 0; + + std::default_random_engine er2(seed + n); + std::normal_distribution<> distr2(0, 1); + for (auto& x : data) { + x = distr2(er2) + offset; + sum += x * x; + } + if (is_ip) { + sum = sqrt(sum); + for (auto& x : data) { + x /= sum; + } + } + + std::copy(data.begin(), data.end(), final.begin() + dim * n); + } + return final; + }; + + auto generate_binary_vector = [&seed, &offset, &random](auto& field_meta, + int64_t N) { + auto dim = field_meta.get_dim(); + Assert(dim % 8 == 0); + vector data(dim / 8 * N); + for (auto& x : data) { + x = random(); + } + return data; + }; + + auto generate_float16_vector = [&seed, &offset, &random, &distr]( + auto& field_meta, int64_t N) { + auto dim = field_meta.get_dim(); + vector data(dim * N); + for (auto& x : data) { + x = float16(distr(random) + offset); + } + return data; + }; + + auto generate_bfloat16_vector = [&seed, &offset, &random, &distr]( + auto& field_meta, int64_t N) { + auto dim = field_meta.get_dim(); + vector data(dim * N); + for (auto& x : data) { + x = bfloat16(distr(random) + offset); + } + return data; + }; + + auto generate_int8_vector = [&seed, &offset, &random](auto& field_meta, + int64_t N) { + auto dim = field_meta.get_dim(); + vector data(dim * N); + for (auto& x : data) { + x = int8_t(random() % 256 - 128); + } + return data; + }; + for (auto field_id : schema->get_field_ids()) { auto field_meta = schema->operator[](field_id); switch (field_meta.get_data_type()) { case DataType::VECTOR_FLOAT: { - auto dim = field_meta.get_dim(); - vector final(dim * N); - bool is_ip = - starts_with(field_meta.get_name().get(), "normalized"); -#pragma omp parallel for - for (int n = 0; n < N; ++n) { - vector data(dim); - float sum = 0; - - std::default_random_engine er2(seed + n); - std::normal_distribution<> distr2(0, 1); - for (auto& x : data) { - x = distr2(er2) + offset; - sum += x * x; - } - if (is_ip) { - sum = sqrt(sum); - for (auto& x : data) { - x /= sum; - } - } - - std::copy( - data.begin(), data.end(), final.begin() + dim * n); - } - insert_cols(final, N, field_meta, random_valid); + auto data = generate_float_vector(field_meta, N); + insert_cols(data, N, field_meta, random_valid); break; } case DataType::VECTOR_BINARY: { - auto dim = field_meta.get_dim(); - Assert(dim % 8 == 0); - vector data(dim / 8 * N); - for (auto& x : data) { - x = random(); - } + auto data = generate_binary_vector(field_meta, N); insert_cols(data, N, field_meta, random_valid); break; } case DataType::VECTOR_FLOAT16: { - auto dim = field_meta.get_dim(); - vector final(dim * N); - for (auto& x : final) { - x = float16(distr(random) + offset); - } - insert_cols(final, N, field_meta, random_valid); + auto data = generate_float16_vector(field_meta, N); + insert_cols(data, N, field_meta, random_valid); + break; + } + case DataType::VECTOR_BFLOAT16: { + auto data = generate_bfloat16_vector(field_meta, N); + insert_cols(data, N, field_meta, random_valid); break; } case DataType::VECTOR_SPARSE_FLOAT: { @@ -472,23 +534,81 @@ DataGen(SchemaPtr schema, array.release()); break; } - case DataType::VECTOR_BFLOAT16: { - auto dim = field_meta.get_dim(); - vector final(dim * N); - for (auto& x : final) { - x = bfloat16(distr(random) + offset); - } - insert_cols(final, N, field_meta, random_valid); + case DataType::VECTOR_INT8: { + auto data = generate_int8_vector(field_meta, N); + insert_cols(data, N, field_meta, random_valid); break; } - case DataType::VECTOR_INT8: { + + case DataType::VECTOR_ARRAY: { auto dim = field_meta.get_dim(); - vector final(dim * N); - srand(seed); - for (auto& x : final) { - x = int8_t(rand() % 256 - 128); + vector vector_array(N); + for (int i = 0; i < N / repeat_count; ++i) { + VectorFieldProto field_data; + field_data.set_dim(dim); + + switch (field_meta.get_element_type()) { + case DataType::VECTOR_FLOAT: { + auto data = + generate_float_vector(field_meta, array_len); + field_data.mutable_float_vector() + ->mutable_data() + ->Add(data.begin(), data.end()); + break; + } + case DataType::VECTOR_BINARY: { + auto num_bytes = array_len * dim / 8; + auto data_raw = + generate_binary_vector(field_meta, array_len); + auto data = + reinterpret_cast(data_raw.data()); + auto obj = field_data.mutable_binary_vector(); + obj->assign(data, num_bytes); + break; + } + case DataType::VECTOR_FLOAT16: { + auto length = array_len * dim; + auto data_raw = + generate_float16_vector(field_meta, array_len); + auto data = + reinterpret_cast(data_raw.data()); + auto obj = field_data.mutable_float16_vector(); + obj->assign(data, length * sizeof(float16)); + break; + } + case DataType::VECTOR_SPARSE_FLOAT: + PanicInfo(DataTypeInvalid, "not implemented"); + break; + case DataType::VECTOR_BFLOAT16: { + auto length = array_len * dim; + auto data_raw = + generate_bfloat16_vector(field_meta, array_len); + auto data = + reinterpret_cast(data_raw.data()); + auto obj = field_data.mutable_bfloat16_vector(); + obj->assign(data, length * sizeof(bfloat16)); + break; + } + case DataType::VECTOR_INT8: { + auto length = array_len * dim; + auto data_raw = + generate_int8_vector(field_meta, array_len); + auto data = + reinterpret_cast(data_raw.data()); + auto obj = field_data.mutable_int8_vector(); + obj->assign(data, length * sizeof(int8_t)); + break; + } + default: { + PanicInfo(DataTypeInvalid, "not implemented"); + } + } + + for (int j = 0; j < repeat_count; ++j) { + vector_array[i * repeat_count + j] = field_data; + } } - insert_cols(final, N, field_meta, random_valid); + insert_cols(vector_array, N, field_meta, random_valid); break; } case DataType::BOOL: { @@ -594,7 +714,7 @@ DataGen(SchemaPtr schema, break; } case DataType::ARRAY: { - vector data(N); + vector data(N); switch (field_meta.get_element_type()) { case DataType::BOOL: { for (int i = 0; i < N / repeat_count; i++) { @@ -1050,6 +1170,16 @@ CreateFieldDataFromDataArray(ssize_t raw_count, createFieldData(raw_data, DataType::VECTOR_INT8, dim); break; } + case DataType::VECTOR_ARRAY: { + auto src_data = data->vectors().vector_array().data(); + auto dim = field_meta.get_dim(); + std::vector data_raw(src_data.size()); + for (int i = 0; i < src_data.size(); i++) { + data_raw[i] = VectorArray(src_data.at(i)); + } + createFieldData(data_raw.data(), DataType::VECTOR_ARRAY, dim); + break; + } default: { PanicInfo(Unsupported, "unsupported"); } diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h index c29375e4bc..f9f3dbc20d 100644 --- a/internal/core/unittest/test_utils/storage_test_utils.h +++ b/internal/core/unittest/test_utils/storage_test_utils.h @@ -184,7 +184,7 @@ PrepareSingleFieldInsertBinlog(int64_t collection_id, inline void LoadGeneratedDataIntoSegment(const GeneratedData& dataset, - milvus::segcore::SegmentSealed* segment, + milvus::segcore::SegmentInternalInterface* segment, bool with_mmap = false, std::vector excluded_field_ids = {}) { std::string mmap_dir_path = with_mmap ? "./data/mmap-test" : ""; @@ -215,6 +215,19 @@ CreateSealedWithFieldDataLoaded(milvus::SchemaPtr schema, return segment; } +inline std::unique_ptr +CreateGrowingWithFieldDataLoaded(milvus::SchemaPtr schema, + milvus::IndexMetaPtr indexMeta, + const milvus::segcore::SegcoreConfig& config, + const GeneratedData& dataset, + bool with_mmap = false, + std::vector excluded_field_ids = {}) { + auto segment_growing = CreateGrowingSegment(schema, indexMeta, 1, config); + LoadGeneratedDataIntoSegment( + dataset, segment_growing.get(), with_mmap, excluded_field_ids); + return segment_growing; +} + inline std::vector GetExcludedFieldIds(milvus::SchemaPtr schema, std::vector field_ids_to_load) { diff --git a/internal/core/unittest/test_vector_array.cpp b/internal/core/unittest/test_vector_array.cpp new file mode 100644 index 0000000000..3f5d568d34 --- /dev/null +++ b/internal/core/unittest/test_vector_array.cpp @@ -0,0 +1,135 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include +#include + +#include "common/VectorArray.h" +#include "pb/schema.pb.h" +#include "common/Schema.h" + +using namespace milvus; + +TEST(VectorArray, TestSchema) { + namespace pb = milvus::proto; + pb::schema::CollectionSchema proto; + proto.set_name("col"); + proto.set_description("asdfhsalkgfhsadg"); + auto dim = 16; + bool bool_default_value = true; + int32_t int_default_value = 20; + int64_t long_default_value = 20; + float float_default_value = 20; + double double_default_value = 20; + std::string varchar_dafualt_vlaue = "20"; + + { + auto field = proto.add_fields(); + field->set_name("key"); + field->set_nullable(false); + field->set_fieldid(100); + field->set_is_primary_key(true); + field->set_description("asdgfsagf"); + field->set_data_type(pb::schema::DataType::Int64); + } + + { + auto struct_field = proto.add_struct_array_fields(); + struct_field->set_name("struct"); + struct_field->set_fieldid(101); + + auto field = struct_field->add_fields(); + field->set_name("struct_key"); + field->set_nullable(false); + field->set_fieldid(102); + field->set_data_type(pb::schema::DataType::Array); + field->set_element_type(pb::schema::DataType::Int64); + + auto field2 = struct_field->add_fields(); + field2->set_name("struct_float_vec"); + field2->set_fieldid(103); + field2->set_data_type(pb::schema::DataType::ArrayOfVector); + field2->set_element_type(pb::schema::DataType::FloatVector); + auto param = field2->add_type_params(); + param->set_key("dim"); + param->set_value("16"); + auto iparam = field2->add_index_params(); + iparam->set_key("metric_type"); + iparam->set_value("L2"); + } + + auto schema = Schema::ParseFrom(proto); + auto field = schema->operator[](FieldId(102)); + ASSERT_EQ(field.get_data_type(), DataType::ARRAY); + ASSERT_EQ(field.get_element_type(), DataType::INT64); + + auto field2 = schema->operator[](FieldId(103)); + ASSERT_EQ(field2.get_data_type(), DataType::VECTOR_ARRAY); + ASSERT_EQ(field2.get_element_type(), DataType::VECTOR_FLOAT); + ASSERT_EQ(field2.get_dim(), 16); +} + +std::vector +generate_float_vector(int64_t seed, int64_t N, int64_t dim) { + std::vector final(dim * N); + for (int n = 0; n < N; ++n) { + // generate random float vector + std::vector data(dim); + std::default_random_engine er2(seed + n); + std::normal_distribution<> distr2(0, 1); + for (auto& x : data) { + x = distr2(er2); + } + + std::copy(data.begin(), data.end(), final.begin() + dim * n); + } + return final; +}; + +TEST(VectorArray, TestConstructVectorArray) { + using namespace milvus; + + int N = 10; + // 1. test float vector + int64_t dim = 128; + milvus::proto::schema::VectorField field_float_vector_array; + field_float_vector_array.set_dim(dim); + + auto data = generate_float_vector(100, N, dim); + field_float_vector_array.mutable_float_vector()->mutable_data()->Add( + data.begin(), data.end()); + + auto float_vector_array = VectorArray(field_float_vector_array); + ASSERT_EQ(float_vector_array.length(), N); + ASSERT_EQ(float_vector_array.dim(), dim); + ASSERT_EQ(float_vector_array.get_element_type(), DataType::VECTOR_FLOAT); + ASSERT_EQ(float_vector_array.byte_size(), N * dim * sizeof(float)); + + ASSERT_TRUE(float_vector_array.is_same_array(field_float_vector_array)); + + auto float_vector_array_tmp = VectorArray(float_vector_array); + + ASSERT_TRUE(float_vector_array_tmp.is_same_array(field_float_vector_array)); + + auto float_vector_array_view = + VectorArrayView(const_cast(float_vector_array.data()), + float_vector_array.length(), + float_vector_array.dim(), + float_vector_array.byte_size(), + float_vector_array.get_element_type()); + + ASSERT_TRUE( + float_vector_array_view.is_same_array(field_float_vector_array)); + + // todo: add other vector types +} diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index 046cb23467..96ac0ac4d4 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -729,7 +729,7 @@ func Test_createCollectionTask_prepareSchema(t *testing.T) { Fields: []*schemapb.FieldSchema{ { Name: field1, - DataType: 200, + DataType: 300, }, }, } diff --git a/pkg/go.mod b/pkg/go.mod index 2bed5ebf7e..a93e2d34f4 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -20,7 +20,7 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.9 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6 + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 github.com/minio/minio-go/v7 v7.0.73 github.com/panjf2000/ants/v2 v2.11.3 github.com/prometheus/client_golang v1.14.0 diff --git a/pkg/go.sum b/pkg/go.sum index 02129786e2..9570915bdd 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -559,6 +559,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6 h1:/m5rWCbnFL0Pg2KLS6Lw/lzQfdEb4qa2dLEqE4QjpkU= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250527033021-e6b398e94ee6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 h1:6MFC+EIDe+n1aSX0BZ4s1/XbbPaEzQCx6JBoM1NTTz0= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= diff --git a/tests/go_client/go.mod b/tests/go_client/go.mod index 0392641831..a8e2d2c2eb 100644 --- a/tests/go_client/go.mod +++ b/tests/go_client/go.mod @@ -51,7 +51,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd // indirect + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect diff --git a/tests/go_client/go.sum b/tests/go_client/go.sum index 88a1d21f80..3dbaeb47ae 100644 --- a/tests/go_client/go.sum +++ b/tests/go_client/go.sum @@ -318,8 +318,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd h1:nriBHIny3LiSU56kP2FiIVtxh/0EEFXBpZk4WirsR7s= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250520065019-02ce2e62a9fd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847 h1:6MFC+EIDe+n1aSX0BZ4s1/XbbPaEzQCx6JBoM1NTTz0= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250604032224-16218b12b847/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e h1:VCr43pG4efacDbM4au70fh8/5hNTftoWzm1iEumvDWM= github.com/milvus-io/milvus/pkg/v2 v2.0.0-20250319085209-5a6b4e56d59e/go.mod h1:37AWzxVs2NS4QUJrkcbeLUwi+4Av0h5mEdjLI62EANU= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=