// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License #include #include #include "common/EasyAssert.h" #include "fmt/format.h" #include "index/ScalarIndexSort.h" #include "index/StringIndexSort.h" #include "common/SystemProperty.h" #include "segcore/FieldIndexing.h" #include "index/VectorMemIndex.h" #include "IndexConfigGenerator.h" namespace milvus::segcore { using std::unique_ptr; VectorFieldIndexing::VectorFieldIndexing(const FieldMeta& field_meta, const FieldIndexMeta& field_index_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) : FieldIndexing(field_meta, segcore_config), build(false), sync_with_index(false), config_(std::make_unique(segment_max_row_count, field_index_meta, segcore_config, SegmentType::Growing)) { index_ = std::make_unique( config_->GetIndexType(), config_->GetMetricType(), knowhere::Version::GetCurrentVersion().VersionNumber()); } void VectorFieldIndexing::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); auto dim = field_meta_.get_dim(); auto source = dynamic_cast*>(vec_base); AssertInfo(source, "vec_base can't cast to ConcurrentVector type"); auto num_chunk = source->num_chunk(); AssertInfo(ack_end <= num_chunk, "ack_end is bigger than num_chunk"); auto conf = get_build_params(); data_.grow_to_at_least(ack_end); for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { const auto& chunk = source->get_chunk(chunk_id); auto indexing = std::make_unique( knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, knowhere::metric::L2, knowhere::Version::GetCurrentVersion().VersionNumber()); auto dataset = knowhere::GenDataSet( source->get_size_per_chunk(), dim, chunk.data()); indexing->BuildWithDataset(dataset, conf); data_[chunk_id] = std::move(indexing); } } void VectorFieldIndexing::GetDataFromIndex(const int64_t* seg_offsets, int64_t count, int64_t element_size, void* output) { auto ids_ds = std::make_shared(); ids_ds->SetRows(count); ids_ds->SetDim(1); ids_ds->SetIds(seg_offsets); ids_ds->SetIsOwner(false); auto vector = index_->GetVector(ids_ds); std::memcpy(output, vector.data(), count * element_size); } void VectorFieldIndexing::AppendSegmentIndex(int64_t reserved_offset, int64_t size, const VectorBase* vec_base, const void* data_source) { AssertInfo(field_meta_.get_data_type() == DataType::VECTOR_FLOAT, "Data type of vector field is not VECTOR_FLOAT"); auto dim = field_meta_.get_dim(); auto conf = get_build_params(); auto source = dynamic_cast*>(vec_base); auto per_chunk = source->get_size_per_chunk(); //append vector [vector_id_beg, vector_id_end] into index //build index [vector_id_beg, build_threshold) when index not exist if (!build) { idx_t vector_id_beg = index_cur_.load(); idx_t vector_id_end = get_build_threshold() - 1; auto chunk_id_beg = vector_id_beg / per_chunk; auto chunk_id_end = vector_id_end / per_chunk; int64_t vec_num = vector_id_end - vector_id_beg + 1; // for train index const void* data_addr; unique_ptr vec_data; //all train data in one chunk if (chunk_id_beg == chunk_id_end) { data_addr = vec_base->get_chunk_data(chunk_id_beg); } else { //merge data from multiple chunks together vec_data = std::make_unique(vec_num * dim); int64_t offset = 0; //copy vector data [vector_id_beg, vector_id_end] for (int chunk_id = chunk_id_beg; chunk_id <= chunk_id_end; chunk_id++) { int chunk_offset = 0; int chunk_copysz = chunk_id == chunk_id_end ? vector_id_end - chunk_id * per_chunk + 1 : per_chunk; std::memcpy(vec_data.get() + offset * dim, (const float*)vec_base->get_chunk_data(chunk_id) + chunk_offset * dim, chunk_copysz * dim * sizeof(float)); offset += chunk_copysz; } data_addr = vec_data.get(); } auto dataset = knowhere::GenDataSet(vec_num, dim, data_addr); dataset->SetIsOwner(false); try { index_->BuildWithDataset(dataset, conf); } catch (SegcoreError& error) { LOG_SEGCORE_ERROR_ << " growing index build error : " << error.what(); return; } index_cur_.fetch_add(vec_num); build = true; } //append rest data when index has built idx_t vector_id_beg = index_cur_.load(); idx_t vector_id_end = reserved_offset + size - 1; auto chunk_id_beg = vector_id_beg / per_chunk; auto chunk_id_end = vector_id_end / per_chunk; int64_t vec_num = vector_id_end - vector_id_beg + 1; if (vec_num <= 0) { sync_with_index.store(true); return; } if (sync_with_index.load()) { auto dataset = knowhere::GenDataSet(vec_num, dim, data_source); index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(vec_num); } else { for (int chunk_id = chunk_id_beg; chunk_id <= chunk_id_end; chunk_id++) { int chunk_offset = chunk_id == chunk_id_beg ? index_cur_ - chunk_id * per_chunk : 0; int chunk_sz = chunk_id == chunk_id_end ? vector_id_end % per_chunk - chunk_offset + 1 : per_chunk - chunk_offset; auto dataset = knowhere::GenDataSet( chunk_sz, dim, (const float*)source->get_chunk_data(chunk_id) + chunk_offset * dim); index_->AddWithDataset(dataset, conf); index_cur_.fetch_add(chunk_sz); } sync_with_index.store(true); } } knowhere::Json VectorFieldIndexing::get_build_params() const { auto config = config_->GetBuildBaseParams(); config[knowhere::meta::DIM] = std::to_string(field_meta_.get_dim()); config[knowhere::meta::NUM_BUILD_THREAD] = std::to_string(1); return config; } SearchInfo VectorFieldIndexing::get_search_params(const SearchInfo& searchInfo) const { auto conf = config_->GetSearchConf(searchInfo); return conf; } idx_t VectorFieldIndexing::get_index_cursor() { return index_cur_.load(); } bool VectorFieldIndexing::sync_data_with_index() const { return sync_with_index.load(); } bool VectorFieldIndexing::has_raw_data() const { return index_->HasRawData(); } template void ScalarFieldIndexing::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { auto source = dynamic_cast*>(vec_base); AssertInfo(source, "vec_base can't cast to ConcurrentVector type"); auto num_chunk = source->num_chunk(); AssertInfo(ack_end <= num_chunk, "Ack_end is bigger than num_chunk"); data_.grow_to_at_least(ack_end); for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { const auto& chunk = source->get_chunk(chunk_id); // build index for chunk // TODO if constexpr (std::is_same_v) { auto indexing = index::CreateStringIndexSort(); indexing->Build(vec_base->get_size_per_chunk(), chunk.data()); data_[chunk_id] = std::move(indexing); } else { auto indexing = index::CreateScalarIndexSort(); indexing->Build(vec_base->get_size_per_chunk(), chunk.data()); data_[chunk_id] = std::move(indexing); } } } std::unique_ptr CreateIndex(const FieldMeta& field_meta, const FieldIndexMeta& field_index_meta, int64_t segment_max_row_count, const SegcoreConfig& segcore_config) { if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { return std::make_unique(field_meta, field_index_meta, segment_max_row_count, segcore_config); } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { return std::make_unique(field_meta, field_index_meta, segment_max_row_count, segcore_config); } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported vector type in index: {}", field_meta.get_data_type())); } } switch (field_meta.get_data_type()) { case DataType::BOOL: return std::make_unique>(field_meta, segcore_config); case DataType::INT8: return std::make_unique>( field_meta, segcore_config); case DataType::INT16: return std::make_unique>( field_meta, segcore_config); case DataType::INT32: return std::make_unique>( field_meta, segcore_config); case DataType::INT64: return std::make_unique>( field_meta, segcore_config); case DataType::FLOAT: return std::make_unique>(field_meta, segcore_config); case DataType::DOUBLE: return std::make_unique>( field_meta, segcore_config); case DataType::VARCHAR: return std::make_unique>( field_meta, segcore_config); default: PanicInfo(DataTypeInvalid, fmt::format("unsupported scalar type in index: {}", field_meta.get_data_type())); } } } // namespace milvus::segcore