diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h index 89a8c40e74..679bf50001 100644 --- a/internal/core/src/common/Chunk.h +++ b/internal/core/src/common/Chunk.h @@ -500,7 +500,7 @@ class SparseFloatVectorChunk : public Chunk { reinterpret_cast(data + null_bitmap_bytes_num); for (int i = 0; i < row_nums; i++) { vec_[i] = {(offsets_ptr[i + 1] - offsets_ptr[i]) / - knowhere::sparse::SparseRow::element_size(), + knowhere::sparse::SparseRow::element_size(), reinterpret_cast(data + offsets_ptr[i]), false}; dim_ = std::max(dim_, vec_[i].dim()); @@ -519,7 +519,7 @@ class SparseFloatVectorChunk : public Chunk { } // only for test - std::vector>& + std::vector>& Vec() { return vec_; } @@ -531,6 +531,6 @@ class SparseFloatVectorChunk : public Chunk { private: int64_t dim_ = 0; - std::vector> vec_; + std::vector> vec_; }; } // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp index 9777749f6e..6d4595025c 100644 --- a/internal/core/src/common/ChunkWriter.cpp +++ b/internal/core/src/common/ChunkWriter.cpp @@ -447,7 +447,7 @@ create_chunk_writer(const FieldMeta& field_meta, Args&&... args) { field_meta.get_element_type(), std::forward(args)..., nullable); - case milvus::DataType::VECTOR_SPARSE_FLOAT: + case milvus::DataType::VECTOR_SPARSE_U32_F32: return std::make_shared( std::forward(args)..., nullable); case milvus::DataType::VECTOR_ARRAY: diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index 57ffff31c1..03e4ca0aa0 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -284,11 +284,11 @@ FieldDataImpl::FillFieldData( array); return FillFieldData(array_info.first, array_info.second); } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { AssertInfo(array->type()->id() == arrow::Type::type::BINARY, "inconsistent data type"); auto arr = std::dynamic_pointer_cast(array); - std::vector> values; + std::vector> values; for (size_t index = 0; index < element_count; ++index) { auto view = arr->GetString(index); values.push_back( @@ -460,7 +460,7 @@ template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; template class FieldDataImpl; -template class FieldDataImpl, true>; +template class FieldDataImpl, true>; template class FieldDataImpl; FieldDataPtr diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index 379b9fc8d6..f57d3fc819 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -723,14 +723,14 @@ class FieldDataJsonImpl : public FieldDataImpl { }; class FieldDataSparseVectorImpl - : public FieldDataImpl, true> { + : public FieldDataImpl, true> { public: explicit FieldDataSparseVectorImpl(DataType data_type, int64_t total_num_rows = 0) - : FieldDataImpl, true>( + : FieldDataImpl, true>( /*dim=*/1, data_type, false, total_num_rows), vec_dim_(0) { - AssertInfo(data_type == DataType::VECTOR_SPARSE_FLOAT, + AssertInfo(data_type == DataType::VECTOR_SPARSE_U32_F32, "invalid data type for sparse vector"); } @@ -753,7 +753,7 @@ class FieldDataSparseVectorImpl } // source is a pointer to element_count of - // knowhere::sparse::SparseRow + // knowhere::sparse::SparseRow void FillFieldData(const void* source, ssize_t element_count) override { if (element_count == 0) { @@ -765,7 +765,7 @@ class FieldDataSparseVectorImpl resize_field_data(length_ + element_count); } auto ptr = - static_cast*>(source); + static_cast*>(source); for (int64_t i = 0; i < element_count; ++i) { auto& row = ptr[i]; vec_dim_ = std::max(vec_dim_, row.dim()); @@ -774,7 +774,7 @@ class FieldDataSparseVectorImpl length_ += element_count; } - // each binary in array is a knowhere::sparse::SparseRow + // each binary in array is a knowhere::sparse::SparseRow void FillFieldData(const std::shared_ptr& array) override { auto n = array->length(); diff --git a/internal/core/src/common/TypeTraits.h b/internal/core/src/common/TypeTraits.h index c99ca0ebcd..537dd653b5 100644 --- a/internal/core/src/common/TypeTraits.h +++ b/internal/core/src/common/TypeTraits.h @@ -37,7 +37,7 @@ constexpr bool IsScalar = template constexpr bool IsSparse = std::is_same_v || - std::is_same_v>; + std::is_same_v>; template constexpr bool IsVariableType = @@ -52,7 +52,7 @@ template constexpr bool IsVariableTypeSupportInChunk = std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v>; + std::is_same_v>; template using ChunkViewType = std::conditional_t< diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 443687765f..8d93f4181e 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -63,6 +63,7 @@ using float16 = knowhere::fp16; using bfloat16 = knowhere::bf16; using bin1 = knowhere::bin1; using int8 = knowhere::int8; +using sparse_u32_f32 = knowhere::sparse_u32_f32; // See also: https://github.com/milvus-io/milvus-proto/blob/master/proto/schema.proto enum class DataType { @@ -91,7 +92,7 @@ enum class DataType { VECTOR_FLOAT = 101, VECTOR_FLOAT16 = 102, VECTOR_BFLOAT16 = 103, - VECTOR_SPARSE_FLOAT = 104, + VECTOR_SPARSE_U32_F32 = 104, VECTOR_INT8 = 105, VECTOR_ARRAY = 106, }; @@ -139,7 +140,7 @@ GetDataTypeSize(DataType data_type, int dim = 1) { return sizeof(bfloat16) * dim; case DataType::VECTOR_INT8: return sizeof(int8) * dim; - // Not supporting variable length types(such as VECTOR_SPARSE_FLOAT and + // Not supporting variable length types(such as VECTOR_SPARSE_U32_F32 and // VARCHAR) here intentionally. We can't easily estimate the size of // them. Caller of this method must handle this case themselves and must // not pass variable length types to this method. @@ -184,7 +185,7 @@ GetArrowDataType(DataType data_type, int dim = 1) { case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: return arrow::fixed_size_binary(dim * 2); - case DataType::VECTOR_SPARSE_FLOAT: + case DataType::VECTOR_SPARSE_U32_F32: return arrow::binary(); case DataType::VECTOR_INT8: return arrow::fixed_size_binary(dim); @@ -244,8 +245,8 @@ GetDataTypeName(DataType data_type) { return "vector_float16"; case DataType::VECTOR_BFLOAT16: return "vector_bfloat16"; - case DataType::VECTOR_SPARSE_FLOAT: - return "vector_sparse_float"; + case DataType::VECTOR_SPARSE_U32_F32: + return "VECTOR_SPARSE_U32_F32"; case DataType::VECTOR_INT8: return "vector_int8"; case DataType::VECTOR_ARRAY: @@ -386,7 +387,7 @@ IsDenseFloatVectorDataType(DataType data_type) { inline bool IsSparseFloatVectorDataType(DataType data_type) { - return data_type == DataType::VECTOR_SPARSE_FLOAT; + return data_type == DataType::VECTOR_SPARSE_U32_F32; } inline bool @@ -749,8 +750,8 @@ struct fmt::formatter : formatter { case milvus::DataType::VECTOR_BFLOAT16: name = "VECTOR_BFLOAT16"; break; - case milvus::DataType::VECTOR_SPARSE_FLOAT: - name = "VECTOR_SPARSE_FLOAT"; + case milvus::DataType::VECTOR_SPARSE_U32_F32: + name = "VECTOR_SPARSE_U32_F32"; break; case milvus::DataType::VECTOR_INT8: name = "VECTOR_INT8"; diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index 29e4839a4c..99643a085c 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -43,6 +43,7 @@ namespace milvus { (data_array->vectors().type##_vector().data()) using CheckDataValid = std::function; +using sparseValueType = typename knowhere::sparse_u32_f32::ValueType; inline DatasetPtr GenDataset(const int64_t nb, const int64_t dim, const void* xb) { @@ -245,17 +246,17 @@ EscapeBraces(const std::string& input) { return result; } -inline knowhere::sparse::SparseRow +inline knowhere::sparse::SparseRow CopyAndWrapSparseRow(const void* data, size_t size, const bool validate = false) { size_t num_elements = - size / knowhere::sparse::SparseRow::element_size(); - knowhere::sparse::SparseRow row(num_elements); + size / knowhere::sparse::SparseRow::element_size(); + knowhere::sparse::SparseRow row(num_elements); std::memcpy(row.data(), data, size); if (validate) { AssertInfo( - size % knowhere::sparse::SparseRow::element_size() == 0, + size % knowhere::sparse::SparseRow::element_size() == 0, "Invalid size for sparse row data"); for (size_t i = 0; i < num_elements; ++i) { auto element = row[i]; @@ -276,17 +277,17 @@ CopyAndWrapSparseRow(const void* data, // Iterable is a list of bytes, each is a byte array representation of a single // sparse float row. This helper function converts such byte arrays into a list -// of knowhere::sparse::SparseRow. The resulting list is a deep copy of +// of knowhere::sparse::SparseRow. The resulting list is a deep copy of // the source data. // // Here in segcore we validate the sparse row data only for search requests, // as the insert/upsert data are already validated in go code. template -std::unique_ptr[]> +std::unique_ptr[]> SparseBytesToRows(const Iterable& rows, const bool validate = false) { AssertInfo(rows.size() > 0, "at least 1 sparse row should be provided"); auto res = - std::make_unique[]>(rows.size()); + std::make_unique[]>(rows.size()); for (size_t i = 0; i < rows.size(); ++i) { res[i] = std::move( CopyAndWrapSparseRow(rows[i].data(), rows[i].size(), validate)); @@ -294,11 +295,11 @@ SparseBytesToRows(const Iterable& rows, const bool validate = false) { return res; } -// SparseRowsToProto converts a list of knowhere::sparse::SparseRow to +// SparseRowsToProto converts a list of knowhere::sparse::SparseRow to // a milvus::proto::schema::SparseFloatArray. The resulting proto is a deep copy // of the source data. source(i) returns the i-th row to be copied. inline void SparseRowsToProto( - const std::function*(size_t)>& + const std::function*(size_t)>& source, int64_t rows, milvus::proto::schema::SparseFloatArray* proto) { diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index c853577c46..11e9c8b968 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -122,7 +122,7 @@ class SparseFloatVector : public VectorTrait { public: using embedded_type = float; static constexpr int32_t dim_factor = 1; - static constexpr auto data_type = DataType::VECTOR_SPARSE_FLOAT; + static constexpr auto data_type = DataType::VECTOR_SPARSE_U32_F32; static constexpr auto c_data_type = CDataType::SparseFloatVector; static constexpr auto schema_data_type = proto::schema::DataType::SparseFloatVector; diff --git a/internal/core/src/config/ConfigKnowhere.cpp b/internal/core/src/config/ConfigKnowhere.cpp index 564a43a433..3e919050bd 100644 --- a/internal/core/src/config/ConfigKnowhere.cpp +++ b/internal/core/src/config/ConfigKnowhere.cpp @@ -93,6 +93,11 @@ KnowhereInitSearchThreadPool(const uint32_t num_threads) { } } +void +KnowhereInitFetchThreadPool(const uint32_t num_threads) { + knowhere::KnowhereConfig::SetFetchThreadPoolSize(num_threads); +} + void KnowhereInitGPUMemoryPool(const uint32_t init_size, const uint32_t max_size) { if (init_size == 0 && max_size == 0) { diff --git a/internal/core/src/config/ConfigKnowhere.h b/internal/core/src/config/ConfigKnowhere.h index 18239c4a37..bdea3d4114 100644 --- a/internal/core/src/config/ConfigKnowhere.h +++ b/internal/core/src/config/ConfigKnowhere.h @@ -35,6 +35,9 @@ KnowhereInitBuildThreadPool(const uint32_t); void KnowhereInitSearchThreadPool(const uint32_t); +void +KnowhereInitFetchThreadPool(const uint32_t); + int32_t GetMinimalIndexVersion(); diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index dd9ddd2536..09419c2925 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -184,12 +184,12 @@ IndexFactory::VecIndexLoadResource( knowhere::IndexStaticFaced::HasRawData( index_type, index_version, config); break; - case milvus::DataType::VECTOR_SPARSE_FLOAT: + case milvus::DataType::VECTOR_SPARSE_U32_F32: resource = knowhere::IndexStaticFaced< - knowhere::fp32>::EstimateLoadResource(index_type, - index_version, - index_size_gb, - config); + knowhere::sparse_u32_f32>::EstimateLoadResource(index_type, + index_version, + index_size_gb, + config); has_raw_data = knowhere::IndexStaticFaced::HasRawData( index_type, index_version, config); @@ -516,8 +516,8 @@ IndexFactory::CreateVectorIndex( return std::make_unique>( index_type, metric_type, version, file_manager_context); } - case DataType::VECTOR_SPARSE_FLOAT: { - return std::make_unique>( + case DataType::VECTOR_SPARSE_U32_F32: { + return std::make_unique>( index_type, metric_type, version, file_manager_context); } case DataType::VECTOR_ARRAY: { @@ -537,8 +537,7 @@ IndexFactory::CreateVectorIndex( } } else { // create mem index switch (data_type) { - case DataType::VECTOR_FLOAT: - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_FLOAT: { return std::make_unique>( DataType::NONE, index_type, @@ -547,6 +546,15 @@ IndexFactory::CreateVectorIndex( use_knowhere_build_pool, file_manager_context); } + case DataType::VECTOR_SPARSE_U32_F32: { + return std::make_unique>( + DataType::NONE, + index_type, + metric_type, + version, + use_knowhere_build_pool, + file_manager_context); + } case DataType::VECTOR_BINARY: { return std::make_unique>( DataType::NONE, @@ -596,11 +604,19 @@ IndexFactory::CreateVectorIndex( version, use_knowhere_build_pool, file_manager_context); + case DataType::VECTOR_SPARSE_U32_F32: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + use_knowhere_build_pool, + file_manager_context); default: ThrowInfo(NotImplemented, fmt::format("not implemented data type to " "build mem index: {}", - data_type)); + element_type)); } } default: diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index dae133305b..e3410dcfa3 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -168,7 +168,7 @@ VectorDiskAnnIndex::Build(const Config& config) { index_.IsAdditionalScalarSupported( is_partition_key_isolation.value_or(false))) { build_config[VEC_OPT_FIELDS_PATH] = - file_manager_->CacheOptFieldToDisk(opt_fields.value()); + file_manager_->CacheOptFieldToDisk(config); // `partition_key_isolation` is already in the config, so it falls through // into the index Build call directly } @@ -415,5 +415,6 @@ template class VectorDiskAnnIndex; template class VectorDiskAnnIndex; template class VectorDiskAnnIndex; template class VectorDiskAnnIndex; +template class VectorDiskAnnIndex; } // namespace milvus::index diff --git a/internal/core/src/index/VectorDiskIndex.h b/internal/core/src/index/VectorDiskIndex.h index 4733a1bb58..9aec130d22 100644 --- a/internal/core/src/index/VectorDiskIndex.h +++ b/internal/core/src/index/VectorDiskIndex.h @@ -80,7 +80,7 @@ class VectorDiskAnnIndex : public VectorIndex { std::vector GetVector(const DatasetPtr dataset) const override; - std::unique_ptr[]> + std::unique_ptr[]> GetSparseVector(const DatasetPtr dataset) const override { ThrowInfo(ErrorCode::Unsupported, "get sparse vector not supported for disk index"); diff --git a/internal/core/src/index/VectorIndex.h b/internal/core/src/index/VectorIndex.h index dd0509f534..0c1ec63fa8 100644 --- a/internal/core/src/index/VectorIndex.h +++ b/internal/core/src/index/VectorIndex.h @@ -76,7 +76,7 @@ class VectorIndex : public IndexBase { virtual std::vector GetVector(const DatasetPtr dataset) const = 0; - virtual std::unique_ptr[]> + virtual std::unique_ptr[]> GetSparseVector(const DatasetPtr dataset) const = 0; IndexType diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 1078802c65..d3e4a23c60 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -426,10 +426,10 @@ VectorMemIndex::Build(const Config& config) { field_data) ->Dim()); } - std::vector> vec(total_rows); + std::vector> vec(total_rows); int64_t offset = 0; for (auto field_data : field_datas) { - auto ptr = static_cast*>( + auto ptr = static_cast*>( field_data->Data()); AssertInfo(ptr, "failed to cast field data to sparse rows"); for (size_t i = 0; i < field_data->Length(); ++i) { @@ -570,7 +570,7 @@ VectorMemIndex::GetVector(const DatasetPtr dataset) const { } template -std::unique_ptr[]> +std::unique_ptr[]> VectorMemIndex::GetSparseVector(const DatasetPtr dataset) const { auto res = index_.GetVectorByIds(dataset); if (!res.has_value()) { @@ -579,8 +579,8 @@ VectorMemIndex::GetSparseVector(const DatasetPtr dataset) const { } // release and transfer ownership to the result unique ptr. res.value()->SetIsOwner(false); - return std::unique_ptr[]>( - static_cast*>( + return std::unique_ptr[]>( + static_cast*>( res.value()->GetTensor())); } @@ -751,5 +751,6 @@ template class VectorMemIndex; template class VectorMemIndex; template class VectorMemIndex; template class VectorMemIndex; +template class VectorMemIndex; } // namespace milvus::index diff --git a/internal/core/src/index/VectorMemIndex.h b/internal/core/src/index/VectorMemIndex.h index 50a0e37fc7..45a9b02861 100644 --- a/internal/core/src/index/VectorMemIndex.h +++ b/internal/core/src/index/VectorMemIndex.h @@ -87,7 +87,7 @@ class VectorMemIndex : public VectorIndex { std::vector GetVector(const DatasetPtr dataset) const override; - std::unique_ptr[]> + std::unique_ptr[]> GetSparseVector(const DatasetPtr dataset) const override; IndexStatsPtr diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index d5019f9ab3..4ff2ddfd26 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -68,7 +68,7 @@ class IndexFactory { case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: case DataType::VECTOR_BINARY: - case DataType::VECTOR_SPARSE_FLOAT: + case DataType::VECTOR_SPARSE_U32_F32: case DataType::VECTOR_INT8: case DataType::VECTOR_ARRAY: return std::make_unique(type, config, context); diff --git a/internal/core/src/mmap/ChunkData.h b/internal/core/src/mmap/ChunkData.h index 58c47f7d13..c9376da172 100644 --- a/internal/core/src/mmap/ChunkData.h +++ b/internal/core/src/mmap/ChunkData.h @@ -134,8 +134,8 @@ VariableLengthChunk::set( // Template specialization for sparse vector template <> inline void -VariableLengthChunk>::set( - const knowhere::sparse::SparseRow* src, +VariableLengthChunk>::set( + const knowhere::sparse::SparseRow* src, uint32_t begin, uint32_t length, const std::optional& check_data_valid) { @@ -158,7 +158,7 @@ VariableLengthChunk>::set( uint8_t* data_ptr = buf + offset; std::memcpy(data_ptr, (uint8_t*)src[i].data(), data_size); data_[i + begin] = - knowhere::sparse::SparseRow(src[i].size(), data_ptr, false); + knowhere::sparse::SparseRow(src[i].size(), data_ptr, false); offset += data_size; } } diff --git a/internal/core/src/monitor/monitor_c.cpp b/internal/core/src/monitor/monitor_c.cpp index b4888fee5f..42663ba2de 100644 --- a/internal/core/src/monitor/monitor_c.cpp +++ b/internal/core/src/monitor/monitor_c.cpp @@ -16,9 +16,9 @@ char* GetCoreMetrics() { - auto str = milvus::monitor::prometheusClient->GetMetrics(); + auto str = milvus::monitor::getPrometheusClient().GetMetrics(); auto len = str.length(); - char* res = (char*)malloc(len + 1); + char* res = static_cast(malloc(len + 1)); memcpy(res, str.data(), len); res[len] = '\0'; return res; diff --git a/internal/core/src/monitor/scope_metric.cpp b/internal/core/src/monitor/scope_metric.cpp index 01088756ab..1027df08a5 100644 --- a/internal/core/src/monitor/scope_metric.cpp +++ b/internal/core/src/monitor/scope_metric.cpp @@ -27,10 +27,11 @@ const prometheus::Histogram::BucketBoundaries cgoCallDurationbuckets = { // One histogram per function name (label) static inline prometheus::Histogram& GetHistogram(std::string&& func) { - static auto& hist_family = prometheus::BuildHistogram() - .Name("milvus_cgocall_duration_seconds") - .Help("Duration of cgo-exposed functions") - .Register(prometheusClient->GetRegistry()); + static auto& hist_family = + prometheus::BuildHistogram() + .Name("milvus_cgocall_duration_seconds") + .Help("Duration of cgo-exposed functions") + .Register(getPrometheusClient().GetRegistry()); // default buckets: [0.005, 0.01, ..., 1.0] return hist_family.Add({{"func", func}}, cgoCallDurationbuckets); diff --git a/internal/core/src/query/PlanImpl.h b/internal/core/src/query/PlanImpl.h index 19f98fffaa..4bb95d43ec 100644 --- a/internal/core/src/query/PlanImpl.h +++ b/internal/core/src/query/PlanImpl.h @@ -23,6 +23,7 @@ #include "common/Json.h" #include "common/Consts.h" #include "common/Schema.h" +#include "common/Utils.h" namespace milvus::query { @@ -80,7 +81,7 @@ struct Placeholder { // only one of blob_ and sparse_matrix_ should be set. blob_ is used for // dense vector search and sparse_matrix_ is for sparse vector search. aligned_vector blob_; - std::unique_ptr[]> sparse_matrix_; + std::unique_ptr[]> sparse_matrix_; // offsets for embedding list aligned_vector lims_; diff --git a/internal/core/src/query/SearchBruteForce.cpp b/internal/core/src/query/SearchBruteForce.cpp index 18435755dd..db552d1c55 100644 --- a/internal/core/src/query/SearchBruteForce.cpp +++ b/internal/core/src/query/SearchBruteForce.cpp @@ -106,7 +106,7 @@ PrepareBFDataSet(const dataset::SearchDataset& query_ds, query_dataset->SetRows(query_ds.query_lims[query_ds.num_queries]); } - if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + if (data_type == DataType::VECTOR_SPARSE_U32_F32) { base_dataset->SetIsSparse(true); query_dataset->SetIsSparse(true); } @@ -168,9 +168,9 @@ BruteForceSearch(const dataset::SearchDataset& query_ds, } else if (data_type == DataType::VECTOR_BINARY) { res = knowhere::BruteForce::RangeSearch( base_dataset, query_dataset, search_cfg, bitset); - } else if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (data_type == DataType::VECTOR_SPARSE_U32_F32) { res = knowhere::BruteForce::RangeSearch< - knowhere::sparse::SparseRow>( + knowhere::sparse::SparseRow>( base_dataset, query_dataset, search_cfg, bitset); } else if (data_type == DataType::VECTOR_INT8) { res = knowhere::BruteForce::RangeSearch( @@ -229,7 +229,7 @@ BruteForceSearch(const dataset::SearchDataset& query_ds, sub_result.mutable_distances().data(), search_cfg, bitset); - } else if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (data_type == DataType::VECTOR_SPARSE_U32_F32) { stat = knowhere::BruteForce::SearchSparseWithBuf( base_dataset, query_dataset, @@ -279,9 +279,9 @@ DispatchBruteForceIteratorByDataType(const knowhere::DataSetPtr& base_dataset, case DataType::VECTOR_BFLOAT16: return knowhere::BruteForce::AnnIterator( base_dataset, query_dataset, config, bitset); - case DataType::VECTOR_SPARSE_FLOAT: + case DataType::VECTOR_SPARSE_U32_F32: return knowhere::BruteForce::AnnIterator< - knowhere::sparse::SparseRow>( + knowhere::sparse::SparseRow>( base_dataset, query_dataset, config, bitset); case DataType::VECTOR_INT8: return knowhere::BruteForce::AnnIterator( diff --git a/internal/core/src/query/SearchOnGrowing.cpp b/internal/core/src/query/SearchOnGrowing.cpp index 4e367db994..d001d94478 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -38,13 +38,13 @@ FloatSegmentIndexSearch(const segcore::SegmentGrowingImpl& segment, auto vecfield_id = info.field_id_; auto& field = schema[vecfield_id]; - auto is_sparse = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT; + auto is_sparse = field.get_data_type() == DataType::VECTOR_SPARSE_U32_F32; // TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder. auto dim = is_sparse ? 0 : field.get_dim(); AssertInfo(IsVectorDataType(field.get_data_type()), "[FloatSearch]Field data type isn't VECTOR_FLOAT, " - "VECTOR_FLOAT16, VECTOR_BFLOAT16 or VECTOR_SPARSE_FLOAT"); + "VECTOR_FLOAT16, VECTOR_BFLOAT16 or VECTOR_SPARSE_U32_F32"); dataset::SearchDataset search_dataset{info.metric_type_, num_queries, info.topk_, @@ -119,7 +119,7 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, } SubSearchResult final_qr(num_queries, topk, metric_type, round_decimal); // TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder. - auto dim = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT + auto dim = field.get_data_type() == DataType::VECTOR_SPARSE_U32_F32 ? 0 : field.get_dim(); dataset::SearchDataset search_dataset{metric_type, diff --git a/internal/core/src/query/SearchOnSealed.cpp b/internal/core/src/query/SearchOnSealed.cpp index ed76aa40ac..6a2631f280 100644 --- a/internal/core/src/query/SearchOnSealed.cpp +++ b/internal/core/src/query/SearchOnSealed.cpp @@ -40,7 +40,7 @@ SearchOnSealedIndex(const Schema& schema, auto field_id = search_info.field_id_; auto& field = schema[field_id]; - auto is_sparse = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT; + auto is_sparse = field.get_data_type() == DataType::VECTOR_SPARSE_U32_F32; // TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder. auto dim = is_sparse ? 0 : field.get_dim(); @@ -115,7 +115,7 @@ SearchOnSealedColumn(const Schema& schema, auto data_type = field.get_data_type(); auto element_type = field.get_element_type(); // TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder. - auto dim = data_type == DataType::VECTOR_SPARSE_FLOAT ? 0 : field.get_dim(); + auto dim = data_type == DataType::VECTOR_SPARSE_U32_F32 ? 0 : field.get_dim(); query::dataset::SearchDataset query_dataset{search_info.metric_type_, num_queries, diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 61a5115a65..a43f239186 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -813,7 +813,7 @@ ChunkedSegmentSealedImpl::get_vector(FieldId field_id, if (has_raw_data) { // If index has raw data, get vector from memory. auto ids_ds = GenIdsDataset(count, ids); - if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { + if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_U32_F32) { auto res = vec_index->GetSparseVector(ids_ds); return segcore::CreateVectorDataArrayFrom( res.get(), count, field_meta); @@ -1752,7 +1752,7 @@ ChunkedSegmentSealedImpl::get_raw_data(FieldId field_id, ret->mutable_vectors()->mutable_int8_vector()->data()); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { auto dst = ret->mutable_vectors()->mutable_sparse_float_vector(); int64_t max_dim = 0; column->BulkValueAt( @@ -1761,7 +1761,7 @@ ChunkedSegmentSealedImpl::get_raw_data(FieldId field_id, auto row = offset != INVALID_SEG_OFFSET ? static_cast< - const knowhere::sparse::SparseRow*>( + const knowhere::sparse::SparseRow*>( static_cast(value)) : nullptr; if (row == nullptr) { @@ -2108,7 +2108,7 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id, auto& index_params = field_index_meta.GetIndexParams(); bool is_sparse = - field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT; + field_meta.get_data_type() == DataType::VECTOR_SPARSE_U32_F32; bool enable_growing_mmap = storage::MmapManager::GetInstance() .GetMmapConfig() diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index 0c44ac0eb7..f8d01e758e 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -37,7 +37,7 @@ VectorBase::set_data_raw(ssize_t element_offset, return set_data_raw( element_offset, VEC_FIELD_DATA(data, bfloat16), element_count); } else if (field_meta.get_data_type() == - DataType::VECTOR_SPARSE_FLOAT) { + DataType::VECTOR_SPARSE_U32_F32) { return set_data_raw( element_offset, SparseBytesToRows( diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index 15a4ab4a8c..bdb1b6c524 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -504,13 +504,13 @@ class ConcurrentVector template <> class ConcurrentVector - : public ConcurrentVectorImpl, true> { + : public ConcurrentVectorImpl, true> { public: explicit ConcurrentVector( int64_t size_per_chunk, storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr, ThreadSafeValidDataPtr valid_data_ptr = nullptr) - : ConcurrentVectorImpl, + : ConcurrentVectorImpl, true>::ConcurrentVectorImpl(1, size_per_chunk, std::move( @@ -524,11 +524,11 @@ class ConcurrentVector const void* source, ssize_t element_count) override { auto* src = - static_cast*>(source); + static_cast*>(source); for (int i = 0; i < element_count; ++i) { dim_ = std::max(dim_, src[i].dim()); } - ConcurrentVectorImpl, + ConcurrentVectorImpl, true>::set_data_raw(element_offset, source, element_count); diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 27b719cb6b..b79118c0f8 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -46,7 +46,7 @@ void VectorFieldIndexing::recreate_index(DataType data_type, const VectorBase* field_raw_data) { if (IsSparseFloatVectorDataType(data_type)) { - index_ = std::make_unique>( + index_ = std::make_unique>( DataType::NONE, config_->GetIndexType(), config_->GetMetricType(), @@ -150,7 +150,7 @@ VectorFieldIndexing::AppendSegmentIndexSparse(int64_t reserved_offset, auto dim = source->Dim(); while (total_rows > 0) { - auto mat = static_cast*>( + auto mat = static_cast*>( source->get_chunk_data(chunk_id)); auto rows = std::min(source->get_size_per_chunk(), total_rows); auto dataset = knowhere::GenDataSet(rows, dim, mat); @@ -336,7 +336,7 @@ CreateIndex(const FieldMeta& field_meta, field_meta.get_data_type() == DataType::VECTOR_FLOAT16 || field_meta.get_data_type() == DataType::VECTOR_BFLOAT16 || field_meta.get_data_type() == DataType::VECTOR_INT8 || - field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { + field_meta.get_data_type() == DataType::VECTOR_SPARSE_U32_F32) { return std::make_unique(field_meta, field_index_meta, segment_max_row_count, diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 68d21bd764..4cbc2cbae0 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ b/internal/core/src/segcore/FieldIndexing.h @@ -345,7 +345,7 @@ class IndexingRecord { size, field_raw_data, stream_data->vectors().bfloat16_vector().data()); - } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (type == DataType::VECTOR_SPARSE_U32_F32) { auto data = SparseBytesToRows( stream_data->vectors().sparse_float_vector().contents()); indexing->AppendSegmentIndexSparse( @@ -378,7 +378,7 @@ class IndexingRecord { auto vec_base = record.get_data_base(fieldId); indexing->AppendSegmentIndexDense( reserved_offset, size, vec_base, data->Data()); - } else if (type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (type == DataType::VECTOR_SPARSE_U32_F32) { auto vec_base = record.get_data_base(fieldId); indexing->AppendSegmentIndexSparse( reserved_offset, @@ -406,7 +406,7 @@ class IndexingRecord { if (data_type == DataType::VECTOR_FLOAT || data_type == DataType::VECTOR_FLOAT16 || data_type == DataType::VECTOR_BFLOAT16 || - data_type == DataType::VECTOR_SPARSE_FLOAT) { + data_type == DataType::VECTOR_SPARSE_U32_F32) { indexing->GetDataFromIndex( seg_offsets, count, element_size, output_raw); } diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 9b44cb529c..8e5c1a5142 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -699,7 +699,7 @@ struct InsertRecord : public InsertRecord { dense_vec_mmap_descriptor); return; } else if (field_meta.get_data_type() == - DataType::VECTOR_SPARSE_FLOAT) { + DataType::VECTOR_SPARSE_U32_F32) { this->append_data( field_id, size_per_chunk, vec_mmap_descriptor); return; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 61b0b2988f..4bf3e37f85 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -782,7 +782,7 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, count, result->mutable_vectors()->mutable_bfloat16_vector()->data()); } else if (field_meta.get_data_type() == - DataType::VECTOR_SPARSE_FLOAT) { + DataType::VECTOR_SPARSE_U32_F32) { bulk_subscript_sparse_float_vector_impl( field_id, (const ConcurrentVector*)vec_ptr, diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 06dc899c65..bf92393ac1 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -210,7 +210,7 @@ GetRawDataSizeOfDataArray(const DataArray* data, break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { // TODO(SPARSE, size) result += data->vectors().sparse_float_vector().ByteSizeLong(); break; @@ -342,7 +342,7 @@ CreateEmptyVectorDataArray(int64_t count, const FieldMeta& field_meta) { auto vector_array = data_array->mutable_vectors(); auto dim = 0; - if (data_type != DataType::VECTOR_SPARSE_FLOAT) { + if (data_type != DataType::VECTOR_SPARSE_U32_F32) { dim = field_meta.get_dim(); vector_array->set_dim(dim); } @@ -373,7 +373,7 @@ CreateEmptyVectorDataArray(int64_t count, const FieldMeta& field_meta) { obj->resize(length * sizeof(bfloat16)); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { // does nothing here break; } @@ -544,11 +544,11 @@ CreateVectorDataArrayFrom(const void* data_raw, obj->assign(data, length * sizeof(bfloat16)); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { SparseRowsToProto( [&](size_t i) { return reinterpret_cast< - const knowhere::sparse::SparseRow*>( + const knowhere::sparse::SparseRow*>( data_raw) + i; }, @@ -655,7 +655,7 @@ MergeDataArray(std::vector& merge_bases, auto obj = vector_array->mutable_binary_vector(); obj->assign(data + src_offset * num_bytes, num_bytes); } else if (field_meta.get_data_type() == - DataType::VECTOR_SPARSE_FLOAT) { + DataType::VECTOR_SPARSE_U32_F32) { auto src = src_field_data->vectors().sparse_float_vector(); auto dst = vector_array->mutable_sparse_float_vector(); if (src.dim() > dst->dim()) { diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index dbdb1b75b2..b7cd5784f2 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -123,6 +123,11 @@ SegcoreSetKnowhereSearchThreadPoolNum(const uint32_t num_threads) { milvus::config::KnowhereInitSearchThreadPool(num_threads); } +extern "C" void +SegcoreSetKnowhereFetchThreadPoolNum(const uint32_t num_threads) { + milvus::config::KnowhereInitFetchThreadPool(num_threads); +} + extern "C" void SegcoreSetKnowhereGpuMemoryPoolSize(const uint32_t init_size, const uint32_t max_size) { diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index 0879d28719..6eb8584976 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -71,6 +71,9 @@ SegcoreSetKnowhereBuildThreadPoolNum(const uint32_t num_threads); void SegcoreSetKnowhereSearchThreadPoolNum(const uint32_t num_threads); +void +SegcoreSetKnowhereFetchThreadPoolNum(const uint32_t num_threads); + void SegcoreSetKnowhereGpuMemoryPoolSize(const uint32_t init_size, const uint32_t max_size); diff --git a/internal/core/src/segcore/storagev1translator/InterimSealedIndexTranslator.cpp b/internal/core/src/segcore/storagev1translator/InterimSealedIndexTranslator.cpp index 33b8ef3d61..952b128cd7 100644 --- a/internal/core/src/segcore/storagev1translator/InterimSealedIndexTranslator.cpp +++ b/internal/core/src/segcore/storagev1translator/InterimSealedIndexTranslator.cpp @@ -105,7 +105,8 @@ InterimSealedIndexTranslator::get_cells( false); } } else { - vec_index = std::make_unique>( + // sparse vector case + vec_index = std::make_unique>( DataType::NONE, index_type_, metric_type_, diff --git a/internal/core/src/segcore/vector_index_c.cpp b/internal/core/src/segcore/vector_index_c.cpp index 9018ddc700..275633f932 100644 --- a/internal/core/src/segcore/vector_index_c.cpp +++ b/internal/core/src/segcore/vector_index_c.cpp @@ -75,9 +75,9 @@ ValidateIndexParams(const char* index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, error_msg); - } else if (dataType == milvus::DataType::VECTOR_SPARSE_FLOAT) { + } else if (dataType == milvus::DataType::VECTOR_SPARSE_U32_F32) { status = - knowhere::IndexStaticFaced::ConfigCheck( + knowhere::IndexStaticFaced::ConfigCheck( index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 61fc335294..aaf338e8da 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -476,7 +476,7 @@ DiskFileManagerImpl::cache_raw_data_to_disk_common( GetFieldDataMeta().segment_id, GetFieldDataMeta().field_id) + "raw_data"; - if (dt == milvus::DataType::VECTOR_SPARSE_FLOAT) { + if (dt == milvus::DataType::VECTOR_SPARSE_U32_F32) { local_data_path += ".sparse_u32_f32"; } local_chunk_manager->CreateFile(local_data_path); @@ -484,13 +484,13 @@ DiskFileManagerImpl::cache_raw_data_to_disk_common( init_file_info(data_type); file_created = true; } - if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { + if (data_type == milvus::DataType::VECTOR_SPARSE_U32_F32) { dim = (uint32_t)(std::dynamic_pointer_cast>( field_data) ->Dim()); auto sparse_rows = - static_cast*>( + static_cast*>( field_data->Data()); for (size_t i = 0; i < field_data->Length(); ++i) { auto row = sparse_rows[i]; @@ -620,9 +620,11 @@ WriteOptFieldIvfDataImpl( // Do not write to disk if there is only one value if (mp.size() <= 1) { + LOG_INFO("There are only one category, skip caching to local disk"); return false; } + LOG_INFO("Get opt fields with {} categories", mp.size()); local_chunk_manager->Write(local_data_path, write_offset, const_cast(&field_id), @@ -712,7 +714,31 @@ WriteOptFieldsIvfMeta( } std::string -DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { +DiskFileManagerImpl::CacheOptFieldToDisk(const Config& config) { + auto storage_version = + index::GetValueFromConfig(config, STORAGE_VERSION_KEY) + .value_or(0); + auto opt_fields = + index::GetValueFromConfig(config, VEC_OPT_FIELDS); + if (!opt_fields.has_value()) { + return ""; + } + + std::vector> remote_files_storage_v2; + if (storage_version == STORAGE_V2) { + auto segment_insert_files = + index::GetValueFromConfig>>( + config, SEGMENT_INSERT_FILES_KEY); + AssertInfo(segment_insert_files.has_value(), + "segment insert files is empty when build index while " + "caching opt fields"); + remote_files_storage_v2 = segment_insert_files.value(); + for (auto& remote_files : remote_files_storage_v2) { + SortByPath(remote_files); + } + } + + auto fields_map = opt_fields.value(); const uint32_t num_of_fields = fields_map.size(); if (0 == num_of_fields) { return ""; @@ -737,15 +763,22 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { std::unordered_set actual_field_ids; for (auto& [field_id, tup] : fields_map) { const auto& field_type = std::get<1>(tup); - auto& field_paths = std::get<2>(tup); - if (0 == field_paths.size()) { - LOG_WARN("optional field {} has no data", field_id); - return ""; - } - SortByPath(field_paths); - std::vector field_datas = - FetchFieldData(rcm_.get(), field_paths); + std::vector field_datas; + // fetch scalar data from storage v2 + if (storage_version == STORAGE_V2) { + field_datas = GetFieldDatasFromStorageV2( + remote_files_storage_v2, field_id, field_type, 1, fs_); + } else { // original way + auto& field_paths = std::get<2>(tup); + if (0 == field_paths.size()) { + LOG_WARN("optional field {} has no data", field_id); + return ""; + } + + SortByPath(field_paths); + field_datas = FetchFieldData(rcm_.get(), field_paths); + } if (WriteOptFieldIvfData(field_type, field_id, @@ -934,6 +967,8 @@ template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); +template std::string +DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); std::string DiskFileManagerImpl::GetRemoteIndexFilePrefixV2() const { diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 39db9fc7fd..34554cad64 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -158,7 +158,7 @@ class DiskFileManagerImpl : public FileManagerImpl { CacheRawDataToDisk(const Config& config); std::string - CacheOptFieldToDisk(OptFieldT& fields_map); + CacheOptFieldToDisk(const Config& config); std::string GetRemoteIndexPrefix() const { diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index dd06f68959..bf01c5980c 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -300,11 +300,11 @@ BaseEventData::Serialize() { } break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { for (size_t offset = 0; offset < field_data->get_num_rows(); ++offset) { auto row = - static_cast*>( + static_cast*>( field_data->RawValue(offset)); payload_writer->add_one_binary_payload( static_cast(row->data()), diff --git a/internal/core/src/storage/PayloadWriter.cpp b/internal/core/src/storage/PayloadWriter.cpp index c7722c11b8..68e16f719c 100644 --- a/internal/core/src/storage/PayloadWriter.cpp +++ b/internal/core/src/storage/PayloadWriter.cpp @@ -32,7 +32,7 @@ PayloadWriter::PayloadWriter(const DataType column_type, bool nullable) // create payload writer for vector data type PayloadWriter::PayloadWriter(const DataType column_type, int dim, bool nullable) : column_type_(column_type), nullable_(nullable) { - AssertInfo(column_type != DataType::VECTOR_SPARSE_FLOAT, + AssertInfo(column_type != DataType::VECTOR_SPARSE_U32_F32, "PayloadWriter for Sparse Float Vector should be created " "using the constructor without dimension"); AssertInfo(nullable == false, "only scalcar type support null now"); diff --git a/internal/core/src/storage/RemoteInputStream.cpp b/internal/core/src/storage/RemoteInputStream.cpp index f9be7781da..b7e7771f3c 100644 --- a/internal/core/src/storage/RemoteInputStream.cpp +++ b/internal/core/src/storage/RemoteInputStream.cpp @@ -20,6 +20,13 @@ RemoteInputStream::Read(void* data, size_t size) { return static_cast(status.ValueOrDie()); } +size_t +RemoteInputStream::ReadAt(void* data, size_t offset, size_t size) { + auto status = remote_file_->ReadAt(offset, size, data); + AssertInfo(status.ok(), "Failed to read from input stream"); + return static_cast(status.ValueOrDie()); +} + size_t RemoteInputStream::Read(int fd, size_t size) { size_t read_batch_size = diff --git a/internal/core/src/storage/RemoteInputStream.h b/internal/core/src/storage/RemoteInputStream.h index 159956607a..9829bd1eeb 100644 --- a/internal/core/src/storage/RemoteInputStream.h +++ b/internal/core/src/storage/RemoteInputStream.h @@ -29,6 +29,9 @@ class RemoteInputStream : public milvus::InputStream { size_t Read(void* data, size_t size) override; + size_t + ReadAt(void* data, size_t offset, size_t size) override; + size_t Read(int fd, size_t size) override; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 231af0b668..83dcf11687 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -206,7 +206,7 @@ AddPayloadToArrowBuilder(std::shared_ptr builder, add_vector_payload(builder, const_cast(raw_data), length); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { ThrowInfo(DataTypeInvalid, "Sparse Float Vector payload should be added by calling " "add_one_binary_payload", @@ -287,7 +287,7 @@ CreateArrowBuilder(DataType data_type) { return std::make_shared(); } // sparse float vector doesn't require a dim - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { return std::make_shared(); } default: { @@ -416,7 +416,7 @@ CreateArrowSchema(DataType data_type, bool nullable) { {arrow::field("val", arrow::binary(), nullable)}); } // sparse float vector doesn't require a dim - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); } @@ -456,7 +456,7 @@ CreateArrowSchema(DataType data_type, int dim, bool nullable) { arrow::fixed_size_binary(dim * sizeof(bfloat16)), nullable)}); } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); } @@ -490,7 +490,7 @@ GetDimensionFromFileMetaData(const parquet::ColumnDescriptor* schema, case DataType::VECTOR_BFLOAT16: { return schema->type_length() / sizeof(bfloat16); } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { ThrowInfo(DataTypeInvalid, fmt::format("GetDimensionFromFileMetaData should not be " "called for sparse vector")); @@ -971,7 +971,7 @@ CreateFieldData(const DataType& type, case DataType::VECTOR_BFLOAT16: return std::make_shared>( dim, type, total_num_rows); - case DataType::VECTOR_SPARSE_FLOAT: + case DataType::VECTOR_SPARSE_U32_F32: return std::make_shared>( type, total_num_rows); case DataType::VECTOR_INT8: diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 5ceef2096a..33d4530123 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -14,7 +14,7 @@ # Update KNOWHERE_VERSION for the first occurrence milvus_add_pkg_config("knowhere") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( KNOWHERE_VERSION v2.6.1-rc ) +set( KNOWHERE_VERSION v2.6.1 ) set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git") message(STATUS "Knowhere repo: ${GIT_REPOSITORY}") diff --git a/internal/core/thirdparty/milvus-common/CMakeLists.txt b/internal/core/thirdparty/milvus-common/CMakeLists.txt index b6f6de3c71..a65703d5b1 100644 --- a/internal/core/thirdparty/milvus-common/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-common/CMakeLists.txt @@ -13,7 +13,7 @@ milvus_add_pkg_config("milvus-common") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( MILVUS-COMMON-VERSION 41fa9b1 ) +set( MILVUS-COMMON-VERSION 5770e40 ) set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git") message(STATUS "milvus-common repo: ${GIT_REPOSITORY}") diff --git a/internal/core/unittest/test_always_true_expr.cpp b/internal/core/unittest/test_always_true_expr.cpp index 3e395122fc..4a57527a00 100644 --- a/internal/core/unittest/test_always_true_expr.cpp +++ b/internal/core/unittest/test_always_true_expr.cpp @@ -29,7 +29,7 @@ INSTANTIATE_TEST_SUITE_P( ExprAlwaysTrueParameters, ExprAlwaysTrueTest, ::testing::Values(milvus::DataType::VECTOR_FLOAT, - milvus::DataType::VECTOR_SPARSE_FLOAT)); + milvus::DataType::VECTOR_SPARSE_U32_F32)); TEST_P(ExprAlwaysTrueTest, AlwaysTrue) { using namespace milvus; diff --git a/internal/core/unittest/test_bf_sparse.cpp b/internal/core/unittest/test_bf_sparse.cpp index a707454840..08620b0711 100644 --- a/internal/core/unittest/test_bf_sparse.cpp +++ b/internal/core/unittest/test_bf_sparse.cpp @@ -27,8 +27,8 @@ using namespace milvus::query; namespace { std::vector -SearchRef(const knowhere::sparse::SparseRow* base, - const knowhere::sparse::SparseRow& query, +SearchRef(const knowhere::sparse::SparseRow* base, + const knowhere::sparse::SparseRow& query, int nb, int topk) { std::vector> res; @@ -51,8 +51,8 @@ SearchRef(const knowhere::sparse::SparseRow* base, } std::vector -RangeSearchRef(const knowhere::sparse::SparseRow* base, - const knowhere::sparse::SparseRow& query, +RangeSearchRef(const knowhere::sparse::SparseRow* base, + const knowhere::sparse::SparseRow& query, int nb, float radius, float range_filter, @@ -113,7 +113,7 @@ class TestSparseFloatSearchBruteForce : public ::testing::Test { search_info, index_info, bitset_view, - DataType::VECTOR_SPARSE_FLOAT, + DataType::VECTOR_SPARSE_U32_F32, DataType::NONE)); return; } @@ -122,7 +122,7 @@ class TestSparseFloatSearchBruteForce : public ::testing::Test { search_info, index_info, bitset_view, - DataType::VECTOR_SPARSE_FLOAT, + DataType::VECTOR_SPARSE_U32_F32, DataType::NONE); for (int i = 0; i < nq; i++) { auto ref = SearchRef(base.get(), *(query.get() + i), nb, topk); @@ -137,7 +137,7 @@ class TestSparseFloatSearchBruteForce : public ::testing::Test { search_info, index_info, bitset_view, - DataType::VECTOR_SPARSE_FLOAT, + DataType::VECTOR_SPARSE_U32_F32, DataType::NONE); for (int i = 0; i < nq; i++) { auto ref = RangeSearchRef( @@ -152,7 +152,7 @@ class TestSparseFloatSearchBruteForce : public ::testing::Test { search_info, index_info, bitset_view, - DataType::VECTOR_SPARSE_FLOAT); + DataType::VECTOR_SPARSE_U32_F32); auto iterators = result3.chunk_iterators(); for (int i = 0; i < nq; i++) { auto it = iterators[i]; diff --git a/internal/core/unittest/test_binlog_index.cpp b/internal/core/unittest/test_binlog_index.cpp index 2951cb6c43..19dac1ebd0 100644 --- a/internal/core/unittest/test_binlog_index.cpp +++ b/internal/core/unittest/test_binlog_index.cpp @@ -91,7 +91,7 @@ class BinlogIndexTest : public ::testing::TestWithParam { } else { intermin_index_has_raw_data = true; } - } else if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (data_type == DataType::VECTOR_SPARSE_U32_F32) { auto sparse_vecs = GenerateRandomSparseFloatVector(data_n); vec_field_data->FillFieldData(sparse_vecs.get(), data_n); data_d = std::dynamic_pointer_cast< @@ -190,12 +190,12 @@ INSTANTIATE_TEST_SUITE_P( knowhere::IndexEnum:: INDEX_FAISS_SCANN_DVR), // intermin index not has data std::make_tuple( - DataType::VECTOR_SPARSE_FLOAT, + DataType::VECTOR_SPARSE_U32_F32, knowhere::metric::IP, knowhere::IndexEnum:: INDEX_SPARSE_INVERTED_INDEX, //intermin index not has data std::nullopt), - std::make_tuple(DataType::VECTOR_SPARSE_FLOAT, + std::make_tuple(DataType::VECTOR_SPARSE_U32_F32, knowhere::metric::IP, knowhere::IndexEnum:: INDEX_SPARSE_WAND, // intermin index not has data diff --git a/internal/core/unittest/test_chunk.cpp b/internal/core/unittest/test_chunk.cpp index 2640b4ba70..344025a459 100644 --- a/internal/core/unittest/test_chunk.cpp +++ b/internal/core/unittest/test_chunk.cpp @@ -568,7 +568,7 @@ TEST(chunk, test_sparse_float) { auto vecs = milvus::segcore::GenerateRandomSparseFloatVector( n_rows, kTestSparseDim, kTestSparseVectorDensity); auto field_data = milvus::storage::CreateFieldData( - storage::DataType::VECTOR_SPARSE_FLOAT, false, kTestSparseDim, n_rows); + storage::DataType::VECTOR_SPARSE_U32_F32, false, kTestSparseDim, n_rows); field_data->FillFieldData(vecs.get(), n_rows); storage::InsertEventData event_data; @@ -593,7 +593,7 @@ TEST(chunk, test_sparse_float) { FieldMeta field_meta(FieldName("a"), milvus::FieldId(1), - DataType::VECTOR_SPARSE_FLOAT, + DataType::VECTOR_SPARSE_U32_F32, kTestSparseDim, "IP", false, diff --git a/internal/core/unittest/test_chunk_vector.cpp b/internal/core/unittest/test_chunk_vector.cpp index 1cdb9e617c..6684e928e3 100644 --- a/internal/core/unittest/test_chunk_vector.cpp +++ b/internal/core/unittest/test_chunk_vector.cpp @@ -71,7 +71,7 @@ TEST_F(ChunkVectorTest, FillDataWithMmap) { auto bf16_vec = schema->AddDebugField( "bf16_vec", DataType::VECTOR_BFLOAT16, 128, metric_type); auto sparse_vec = schema->AddDebugField( - "sparse_vec", DataType::VECTOR_SPARSE_FLOAT, 128, metric_type); + "sparse_vec", DataType::VECTOR_SPARSE_U32_F32, 128, metric_type); auto int8_vec = schema->AddDebugField( "int8_vec", DataType::VECTOR_INT8, 128, metric_type); schema->set_primary_field_id(int64_field); @@ -200,7 +200,7 @@ TEST_F(ChunkVectorTest, FillDataWithMmap) { auto fp16_vec_gt = dataset.get_col(fp16_vec); auto bf16_vec_gt = dataset.get_col(bf16_vec); auto sparse_vec_gt = - dataset.get_col>(sparse_vec); + dataset.get_col>(sparse_vec); auto int8_vec_gt = dataset.get_col(int8_vec); for (size_t i = 0; i < num_inserted; ++i) { @@ -234,7 +234,7 @@ INSTANTIATE_TEST_SUITE_P(IsSparse, ChunkVectorTest, ::testing::Bool()); TEST_P(ChunkVectorTest, SearchWithMmap) { auto is_sparse = GetParam(); auto data_type = - is_sparse ? DataType::VECTOR_SPARSE_FLOAT : DataType::VECTOR_FLOAT; + is_sparse ? DataType::VECTOR_SPARSE_U32_F32 : DataType::VECTOR_FLOAT; auto schema = std::make_shared(); auto pk = schema->AddDebugField("pk", DataType::INT64); auto random = schema->AddDebugField("random", DataType::DOUBLE); diff --git a/internal/core/unittest/test_data_codec.cpp b/internal/core/unittest/test_data_codec.cpp index 1bff0f7619..fc70798e97 100644 --- a/internal/core/unittest/test_data_codec.cpp +++ b/internal/core/unittest/test_data_codec.cpp @@ -591,7 +591,7 @@ TEST(storage, InsertDataSparseFloat) { auto vecs = milvus::segcore::GenerateRandomSparseFloatVector( n_rows, kTestSparseDim, kTestSparseVectorDensity); auto field_data = milvus::storage::CreateFieldData( - storage::DataType::VECTOR_SPARSE_FLOAT, false, kTestSparseDim, n_rows); + storage::DataType::VECTOR_SPARSE_U32_F32, false, kTestSparseDim, n_rows); field_data->FillFieldData(vecs.get(), n_rows); auto payload_reader = @@ -611,10 +611,10 @@ TEST(storage, InsertDataSparseFloat) { std::make_pair(Timestamp(0), Timestamp(100))); auto new_payload = new_insert_data->GetFieldData(); ASSERT_TRUE(new_payload->get_data_type() == - storage::DataType::VECTOR_SPARSE_FLOAT); + storage::DataType::VECTOR_SPARSE_U32_F32); ASSERT_EQ(new_payload->get_num_rows(), n_rows); ASSERT_EQ(new_payload->get_null_count(), 0); - auto new_data = static_cast*>( + auto new_data = static_cast*>( new_payload->Data()); for (auto i = 0; i < n_rows; ++i) { diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index 00a7701324..29a6c244de 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -455,16 +455,20 @@ TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOptFieldMoreThanOne) { PrepareOptionalField(file_manager, insert_file_path); opt_fields[kOptFieldId + 1] = { kOptFieldName + "second", DataType::INT64, {insert_file_path}}; - EXPECT_THROW(file_manager->CacheOptFieldToDisk(opt_fields), SegcoreError); + milvus::Config config; + config[VEC_OPT_FIELDS] = opt_fields; + EXPECT_THROW(file_manager->CacheOptFieldToDisk(config), SegcoreError); } TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) { auto file_manager = CreateFileManager(cm_); const auto insert_file_path = PrepareInsertData(kOptFieldDataRange); - auto opt_fileds = + auto opt_fields = PrepareOptionalField(file_manager, insert_file_path); - auto res = file_manager->CacheOptFieldToDisk(opt_fileds); + milvus::Config config; + config[VEC_OPT_FIELDS] = opt_fields; + auto res = file_manager->CacheOptFieldToDisk(config); ASSERT_FALSE(res.empty()); CheckOptFieldCorrectness(res); } @@ -475,7 +479,9 @@ TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) { auto insert_file_path = PrepareInsertData(RANGE); \ auto opt_fields = \ PrepareOptionalField(file_manager, insert_file_path); \ - auto res = file_manager->CacheOptFieldToDisk(opt_fields); \ + milvus::Config config; \ + config[VEC_OPT_FIELDS] = opt_fields; \ + auto res = file_manager->CacheOptFieldToDisk(config); \ ASSERT_FALSE(res.empty()); \ CheckOptFieldCorrectness(res, RANGE); \ }; @@ -496,9 +502,11 @@ TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOnlyOneCategory) { { const auto insert_file_path = PrepareInsertData(1); - auto opt_fileds = PrepareOptionalField( + auto opt_fields = PrepareOptionalField( file_manager, insert_file_path); - auto res = file_manager->CacheOptFieldToDisk(opt_fileds); + milvus::Config config; + config[VEC_OPT_FIELDS] = opt_fields; + auto res = file_manager->CacheOptFieldToDisk(config); ASSERT_TRUE(res.empty()); } } diff --git a/internal/core/unittest/test_exec.cpp b/internal/core/unittest/test_exec.cpp index 893a4b4908..660b471bca 100644 --- a/internal/core/unittest/test_exec.cpp +++ b/internal/core/unittest/test_exec.cpp @@ -105,7 +105,7 @@ class TaskTest : public testing::TestWithParam { INSTANTIATE_TEST_SUITE_P(TaskTestSuite, TaskTest, ::testing::Values(DataType::VECTOR_FLOAT, - DataType::VECTOR_SPARSE_FLOAT)); + DataType::VECTOR_SPARSE_U32_F32)); TEST_P(TaskTest, RegisterFunction) { milvus::exec::expression::FunctionFactory& factory = diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index c6204c119b..b6fc0e1325 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -95,7 +95,7 @@ INSTANTIATE_TEST_SUITE_P( std::make_tuple(std::pair(milvus::DataType::VECTOR_FLOAT, knowhere::metric::L2), false), - std::make_tuple(std::pair(milvus::DataType::VECTOR_SPARSE_FLOAT, + std::make_tuple(std::pair(milvus::DataType::VECTOR_SPARSE_U32_F32, knowhere::metric::IP), false), std::make_tuple(std::pair(milvus::DataType::VECTOR_BINARY, @@ -104,7 +104,7 @@ INSTANTIATE_TEST_SUITE_P( std::make_tuple(std::pair(milvus::DataType::VECTOR_FLOAT, knowhere::metric::L2), true), - std::make_tuple(std::pair(milvus::DataType::VECTOR_SPARSE_FLOAT, + std::make_tuple(std::pair(milvus::DataType::VECTOR_SPARSE_U32_F32, knowhere::metric::IP), true), std::make_tuple(std::pair(milvus::DataType::VECTOR_BINARY, diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index c94156483b..f835666764 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -109,7 +109,7 @@ class GrowingTest } else if (index_type == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || index_type == knowhere::IndexEnum::INDEX_SPARSE_WAND) { - data_type = DataType::VECTOR_SPARSE_FLOAT; + data_type = DataType::VECTOR_SPARSE_U32_F32; } else { ASSERT_TRUE(false); } @@ -242,7 +242,7 @@ TEST_P(GrowingTest, FillData) { if (data_type == DataType::VECTOR_FLOAT) { EXPECT_EQ(vec_result->vectors().float_vector().data_size(), num_inserted * dim); - } else if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (data_type == DataType::VECTOR_SPARSE_U32_F32) { EXPECT_EQ( vec_result->vectors().sparse_float_vector().contents_size(), num_inserted); diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index 38939221bc..09a04ae406 100644 --- a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -41,7 +41,7 @@ class GrowingIndexTest : public ::testing::TestWithParam { metric_type = std::get<2>(param); dense_vec_intermin_index_type = std::get<3>(param); dense_refine_type = std::get<4>(param); - if (data_type == DataType::VECTOR_SPARSE_FLOAT) { + if (data_type == DataType::VECTOR_SPARSE_U32_F32) { is_sparse = true; if (metric_type == knowhere::metric::IP) { intermin_index_with_raw_data = true; @@ -108,7 +108,7 @@ INSTANTIATE_TEST_SUITE_P( SparseIndexTypeParameters, GrowingIndexTest, ::testing::Combine( - ::testing::Values(DataType::VECTOR_SPARSE_FLOAT), + ::testing::Values(DataType::VECTOR_SPARSE_U32_F32), // VecIndexConfig will convert INDEX_SPARSE_INVERTED_INDEX/ // INDEX_SPARSE_WAND to INDEX_SPARSE_INVERTED_INDEX_CC/ // INDEX_SPARSE_WAND_CC, thus no need to use _CC version here. @@ -409,7 +409,7 @@ TEST_P(GrowingIndexTest, AddWithoutBuildPool) { } EXPECT_EQ(index->Count(), (add_cont + 1) * N); } else if (is_sparse) { - auto index = std::make_unique>( + auto index = std::make_unique>( DataType::NONE, index_type, metric_type, @@ -417,7 +417,7 @@ TEST_P(GrowingIndexTest, AddWithoutBuildPool) { false, milvus::storage::FileManagerContext()); auto sparse_data = - dataset.get_col>(vec); + dataset.get_col>(vec); index->BuildWithDataset( knowhere::GenDataSet(N, dim, sparse_data.data()), build_config); for (int i = 0; i < add_cont; i++) { @@ -560,14 +560,14 @@ TEST_P(GrowingIndexTest, GetVector) { } } } else if (is_sparse) { - // GetVector for VECTOR_SPARSE_FLOAT + // GetVector for VECTOR_SPARSE_U32_F32 int64_t per_batch = 5000; int64_t n_batch = 20; int64_t dim = 128; for (int64_t i = 0; i < n_batch; i++) { auto dataset = DataGen(schema, per_batch); auto fakevec = - dataset.get_col>(vec); + dataset.get_col>(vec); auto offset = segment->PreInsert(per_batch); segment->Insert(offset, per_batch, diff --git a/internal/core/unittest/test_index_c_api.cpp b/internal/core/unittest/test_index_c_api.cpp index 19bcf60297..2f855e5e7e 100644 --- a/internal/core/unittest/test_index_c_api.cpp +++ b/internal/core/unittest/test_index_c_api.cpp @@ -68,7 +68,7 @@ TestVecIndex() { status = BuildBinaryVecIndex(index, NB * DIM / 8, xb_data.data()); } else if (std::is_same_v) { auto xb_data = - dataset.template get_col>( + dataset.template get_col>( milvus::FieldId(100)); status = BuildSparseFloatVecIndex( index, diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp index 9adf8fad7e..2be27d7296 100644 --- a/internal/core/unittest/test_index_wrapper.cpp +++ b/internal/core/unittest/test_index_wrapper.cpp @@ -70,9 +70,9 @@ class IndexWrapperTest : public ::testing::TestWithParam { DataType::VECTOR_BINARY}, {knowhere::IndexEnum::INDEX_HNSW, DataType::VECTOR_FLOAT}, {knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, - DataType::VECTOR_SPARSE_FLOAT}, + DataType::VECTOR_SPARSE_U32_F32}, {knowhere::IndexEnum::INDEX_SPARSE_WAND, - DataType::VECTOR_SPARSE_FLOAT}, + DataType::VECTOR_SPARSE_U32_F32}, }; vec_field_data_type = index_to_vec_type[index_type]; @@ -132,9 +132,9 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { auto bin_vecs = dataset.get_col(milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, DIM, bin_vecs.data()); ASSERT_NO_THROW(index->Build(xb_dataset)); - } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_U32_F32) { auto dataset = GenFieldData(NB, metric_type, vec_field_data_type); - auto sparse_vecs = dataset.get_col>( + auto sparse_vecs = dataset.get_col>( milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, kTestSparseDim, sparse_vecs.data()); @@ -159,7 +159,7 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { vec_field_data_type, config, file_manager_context); auto vec_index = static_cast(copy_index.get()); - if (vec_field_data_type != DataType::VECTOR_SPARSE_FLOAT) { + if (vec_field_data_type != DataType::VECTOR_SPARSE_U32_F32) { ASSERT_EQ(vec_index->dim(), DIM); } @@ -177,9 +177,9 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { auto xq_dataset = knowhere::GenDataSet(NQ, DIM, xb_data.data() + DIM * query_offset); result = vec_index->Query(xq_dataset, search_info, nullptr); - } else if (vec_field_data_type == DataType::VECTOR_SPARSE_FLOAT) { + } else if (vec_field_data_type == DataType::VECTOR_SPARSE_U32_F32) { auto dataset = GenFieldData(NQ, metric_type, vec_field_data_type); - auto xb_data = dataset.get_col>( + auto xb_data = dataset.get_col>( milvus::FieldId(100)); auto xq_dataset = knowhere::GenDataSet(NQ, kTestSparseDim, xb_data.data()); diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index e07708d209..9b2f3b95a2 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -331,7 +331,7 @@ class IndexTest : public ::testing::TestWithParam { if (index_type == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX || index_type == knowhere::IndexEnum::INDEX_SPARSE_WAND) { is_sparse = true; - vec_field_data_type = milvus::DataType::VECTOR_SPARSE_FLOAT; + vec_field_data_type = milvus::DataType::VECTOR_SPARSE_U32_F32; } else if (IsBinaryVectorMetricType(metric_type)) { is_binary = true; vec_field_data_type = milvus::DataType::VECTOR_BINARY; @@ -349,7 +349,7 @@ class IndexTest : public ::testing::TestWithParam { } else if (is_sparse) { // sparse vector xb_sparse_data = - dataset.get_col>( + dataset.get_col>( milvus::FieldId(100)); xb_dataset = knowhere::GenDataSet(NB, kTestSparseDim, xb_sparse_data.data()); @@ -382,7 +382,7 @@ class IndexTest : public ::testing::TestWithParam { knowhere::DataSetPtr xb_dataset; FixedVector xb_data; FixedVector xb_bin_data; - FixedVector> xb_sparse_data; + FixedVector> xb_sparse_data; knowhere::DataSetPtr xq_dataset; int64_t query_offset = 100; int64_t NB = 3000; // will be updated to 27000 for mmap+hnsw @@ -686,7 +686,7 @@ TEST_P(IndexTest, GetVector_EmptySparseVector) { } NB = 3; - std::vector> vec; + std::vector> vec; vec.reserve(NB); vec.emplace_back(2); vec[0].set_at(0, 1, 1.0); diff --git a/internal/core/unittest/test_loading.cpp b/internal/core/unittest/test_loading.cpp index f2e937f813..7ef1f5e89b 100644 --- a/internal/core/unittest/test_loading.cpp +++ b/internal/core/unittest/test_loading.cpp @@ -47,8 +47,8 @@ class IndexLoadTest : public ::testing::TestWithParam { data_type = milvus::DataType::VECTOR_FLOAT16; } else if (field_type == "vector_binary") { data_type = milvus::DataType::VECTOR_BINARY; - } else if (field_type == "vector_sparse_float") { - data_type = milvus::DataType::VECTOR_SPARSE_FLOAT; + } else if (field_type == "VECTOR_SPARSE_U32_F32") { + data_type = milvus::DataType::VECTOR_SPARSE_U32_F32; } else if (field_type == "vector_int8") { data_type = milvus::DataType::VECTOR_INT8; } else if (field_type == "array") { diff --git a/internal/core/unittest/test_retrieve.cpp b/internal/core/unittest/test_retrieve.cpp index 4c49836d31..a3fe9f0007 100644 --- a/internal/core/unittest/test_retrieve.cpp +++ b/internal/core/unittest/test_retrieve.cpp @@ -46,7 +46,7 @@ class RetrieveTest : public ::testing::TestWithParam { INSTANTIATE_TEST_SUITE_P(RetrieveTest, RetrieveTest, ::testing::Values(DataType::VECTOR_FLOAT, - DataType::VECTOR_SPARSE_FLOAT)); + DataType::VECTOR_SPARSE_U32_F32)); TEST_P(RetrieveTest, AutoID) { auto schema = std::make_shared(); @@ -422,7 +422,7 @@ TEST_P(RetrieveTest, LargeTimestamp) { Assert(field_data.vectors().float_vector().data_size() == target_num * DIM); } - if (DataType(field_data.type()) == DataType::VECTOR_SPARSE_FLOAT) { + if (DataType(field_data.type()) == DataType::VECTOR_SPARSE_U32_F32) { Assert(field_data.vectors() .sparse_float_vector() .contents_size() == target_num); diff --git a/internal/core/unittest/test_types.cpp b/internal/core/unittest/test_types.cpp index c7b528fee8..550d9eabc3 100644 --- a/internal/core/unittest/test_types.cpp +++ b/internal/core/unittest/test_types.cpp @@ -97,8 +97,8 @@ TEST(GetArrowDataTypeTest, VECTOR_BFLOAT16) { ASSERT_TRUE(result->Equals(arrow::fixed_size_binary(dim * 2))); } -TEST(GetArrowDataTypeTest, VECTOR_SPARSE_FLOAT) { - auto result = GetArrowDataType(DataType::VECTOR_SPARSE_FLOAT); +TEST(GetArrowDataTypeTest, VECTOR_SPARSE_U32_F32) { + auto result = GetArrowDataType(DataType::VECTOR_SPARSE_U32_F32); ASSERT_TRUE(result->Equals(arrow::binary())); } diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 372b53d33c..4e58ddb897 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -114,7 +114,7 @@ struct GeneratedData { } else { if (field_meta.is_vector() && field_meta.get_data_type() != - DataType::VECTOR_SPARSE_FLOAT) { + DataType::VECTOR_SPARSE_U32_F32) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { int len = raw_->num_rows() * field_meta.get_dim(); ret.resize(len); @@ -164,7 +164,7 @@ struct GeneratedData { } if constexpr (std::is_same_v< T, - knowhere::sparse::SparseRow>) { + knowhere::sparse::SparseRow>) { auto sparse_float_array = target_field_data.vectors().sparse_float_vector(); auto rows = @@ -301,7 +301,7 @@ struct GeneratedData { int array_len); }; -inline std::unique_ptr[]> +inline std::unique_ptr[]> GenerateRandomSparseFloatVector(size_t rows, size_t cols = kTestSparseDim, float density = kTestSparseVectorDensity, @@ -340,13 +340,13 @@ GenerateRandomSparseFloatVector(size_t rows, data[row][col] = val; } - auto tensor = std::make_unique[]>(rows); + auto tensor = std::make_unique[]>(rows); for (int32_t i = 0; i < rows; ++i) { if (data[i].size() == 0) { continue; } - knowhere::sparse::SparseRow row(data[i].size()); + knowhere::sparse::SparseRow row(data[i].size()); size_t j = 0; for (auto& [idx, val] : data[i]) { row.set_at(j++, idx, val); @@ -544,7 +544,7 @@ DataGen(SchemaPtr schema, insert_cols(data, N, field_meta, random_valid); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { auto res = GenerateRandomSparseFloatVector( N, kTestSparseDim, kTestSparseVectorDensity, seed); auto array = milvus::segcore::CreateDataArrayFrom( @@ -595,7 +595,7 @@ DataGen(SchemaPtr schema, obj->assign(data, length * sizeof(float16)); break; } - case DataType::VECTOR_SPARSE_FLOAT: + case DataType::VECTOR_SPARSE_U32_F32: ThrowInfo(DataTypeInvalid, "not implemented"); break; case DataType::VECTOR_BFLOAT16: { @@ -1195,10 +1195,10 @@ CreateFieldDataFromDataArray(ssize_t raw_count, createFieldData(raw_data, DataType::VECTOR_BFLOAT16, dim); break; } - case DataType::VECTOR_SPARSE_FLOAT: { + case DataType::VECTOR_SPARSE_U32_F32: { auto sparse_float_array = data->vectors().sparse_float_vector(); auto rows = SparseBytesToRows(sparse_float_array.contents()); - createFieldData(rows.get(), DataType::VECTOR_SPARSE_FLOAT, 0); + createFieldData(rows.get(), DataType::VECTOR_SPARSE_U32_F32, 0); break; } case DataType::VECTOR_INT8: { diff --git a/internal/core/unittest/test_utils/indexbuilder_test_utils.h b/internal/core/unittest/test_utils/indexbuilder_test_utils.h index 84efb62563..2952f2b967 100644 --- a/internal/core/unittest/test_utils/indexbuilder_test_utils.h +++ b/internal/core/unittest/test_utils/indexbuilder_test_utils.h @@ -234,7 +234,7 @@ GenFieldData(int64_t N, schema->AddDebugField( "fakevec", data_type, - (data_type != milvus::DataType::VECTOR_SPARSE_FLOAT ? dim : 0), + (data_type != milvus::DataType::VECTOR_SPARSE_U32_F32 ? dim : 0), metric_type); return milvus::segcore::DataGen(schema, N); } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 82250d54b6..2f6ef8dd77 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -259,6 +259,9 @@ func (node *QueryNode) InitSegcore() error { cKnowhereThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereThreadPoolSize.GetAsUint32()) C.SegcoreSetKnowhereSearchThreadPoolNum(cKnowhereThreadPoolSize) + cKnowhereFetchThreadPoolSize := C.uint32_t(paramtable.Get().QueryNodeCfg.KnowhereFetchThreadPoolSize.GetAsUint32()) + C.SegcoreSetKnowhereFetchThreadPoolNum(cKnowhereFetchThreadPoolSize) + // override segcore SIMD type cSimdType := C.CString(paramtable.Get().CommonCfg.SimdType.GetValue()) C.SegcoreSetSimdType(cSimdType) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 97bbd14397..804c776293 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2879,6 +2879,7 @@ type queryNodeConfig struct { StatsPublishInterval ParamItem `refreshable:"true"` // segcore + KnowhereFetchThreadPoolSize ParamItem `refreshable:"false"` KnowhereThreadPoolSize ParamItem `refreshable:"false"` ChunkRows ParamItem `refreshable:"false"` EnableInterminSegmentIndex ParamItem `refreshable:"false"` @@ -3322,6 +3323,25 @@ If set to 0, time based eviction is disabled.`, } p.KnowhereThreadPoolSize.Init(base.mgr) + p.KnowhereFetchThreadPoolSize = ParamItem{ + Key: "queryNode.segcore.knowhereFetchThreadPoolNumRatio", + Version: "2.6.0", + DefaultValue: "4", + Formatter: func(v string) string { + factor := getAsInt64(v) + if factor <= 0 { + factor = 1 + } else if factor > 32 { + factor = 32 + } + knowhereFetchThreadPoolSize := uint32(hardware.GetCPUNum()) * uint32(factor) + return strconv.FormatUint(uint64(knowhereFetchThreadPoolSize), 10) + }, + Doc: "The number of threads in knowhere's fetch thread pool for object storage. The pool size will multiply with knowhereThreadPoolNumRatio([1, 32])", + Export: false, + } + p.KnowhereFetchThreadPoolSize.Init(base.mgr) + p.ChunkRows = ParamItem{ Key: "queryNode.segcore.chunkRows", Version: "2.0.0",