// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <numeric>
#include <optional>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/iterator/counting_iterator.hpp>

#include "cachinglayer/CacheSlot.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/FieldData.h"
#include "common/Schema.h"
#include "common/Json.h"
#include "common/Types.h"
#include "common/Common.h"
#include "fmt/format.h"
#include "log/Log.h"
#include "nlohmann/json.hpp"
#include "query/PlanNode.h"
#include "query/SearchOnSealed.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/SegmentGrowing.h"
#include "segcore/Utils.h"
#include "segcore/memory_planner.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "storage/KeyRetriever.h"
#include "common/TypeTraits.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h"
#include "milvus-storage/common/constants.h"

namespace milvus::segcore {

using namespace milvus::cachinglayer;

int64_t
SegmentGrowingImpl::PreInsert(int64_t size) {
    auto reserved_begin = insert_record_.reserved.fetch_add(size);
    return reserved_begin;
}

void
SegmentGrowingImpl::mask_with_delete(BitsetTypeView& bitset,
                                     int64_t ins_barrier,
                                     Timestamp timestamp) const {
    deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) {
    // remove the chunk data to reduce memory consumption
    auto& field_meta = schema_->operator[](fieldId);
    auto data_type = field_meta.get_data_type();
    if (IsVectorDataType(data_type)) {
        if (indexing_record_.HasRawData(fieldId)) {
            auto vec_data_base = insert_record_.get_data_base(fieldId);
            if (vec_data_base && vec_data_base->num_chunk() > 0 &&
                chunk_mutex_.try_lock()) {
                vec_data_base->clear();
                chunk_mutex_.unlock();
            }
        }
    }
}

void
SegmentGrowingImpl::Insert(int64_t reserved_offset,
                           int64_t num_rows,
                           const int64_t* row_ids,
                           const Timestamp* timestamps_raw,
                           InsertRecordProto* insert_record_proto) {
    AssertInfo(insert_record_proto->num_rows() == num_rows,
               "Entities_raw count not equal to insert size");
    // protect the schema from being changed during insert; a schema change
    // cannot happen during insertion, otherwise some data might not follow
    // the new schema
    std::shared_lock lck(sch_mutex_);
    // step 1: check insert data if valid
    std::unordered_map<FieldId, int64_t> field_id_to_offset;
    int64_t field_offset = 0;
    int64_t exist_rows = stats_.mem_size / (sizeof(Timestamp) + sizeof(idx_t));
    for (const auto& field : insert_record_proto->fields_data()) {
        auto field_id = FieldId(field.field_id());
        AssertInfo(!field_id_to_offset.count(field_id), "duplicate field data");
        field_id_to_offset.emplace(field_id, field_offset++);
        // may be an added field: append the nulls if data already exists
        if (exist_rows > 0 && !insert_record_.is_data_exist(field_id)) {
            LOG_WARN(
                "heterogeneous insert data found for segment {}, field id "
                "{}, data type {}",
                id_,
                field_id.get(),
                field.type());
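            // This insert carries a field the segment has not seen before
            // (a field added to the schema): register it as nullable, then
            // backfill the rows inserted before the field existed with
            // null/default values so all columns stay aligned.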
            schema_->AddField(FieldName(field.field_name()),
                              field_id,
                              DataType(field.type()),
                              true,
                              std::nullopt);
            auto field_meta = schema_->get_fields().at(field_id);
            insert_record_.append_field_meta(
                field_id, field_meta, size_per_chunk(), mmap_descriptor_);
            auto data = bulk_subscript_not_exist_field(field_meta, exist_rows);
            insert_record_.get_data_base(field_id)->set_data_raw(
                0, exist_rows, data.get(), field_meta);
        }
    }

    // the segment may hold a newer schema than the one used by this insert;
    // fill the missing fields with empty field data
    for (auto& [field_id, field_meta] : schema_->get_fields()) {
        if (field_id.get() < START_USER_FIELDID) {
            continue;
        }
        if (field_id_to_offset.count(field_id) > 0) {
            continue;
        }
        LOG_INFO(
            "schema newer than insert data found for segment {}, attach "
            "empty field data for absent field {}, data type {}",
            id_,
            field_id.get(),
            field_meta.get_data_type());
        auto data = bulk_subscript_not_exist_field(field_meta, num_rows);
        insert_record_proto->add_fields_data()->CopyFrom(*data);
        field_id_to_offset.emplace(field_id, field_offset++);
    }

    // step 2: sort timestamp
    // query node already guarantees that the timestamps are ordered,
    // avoiding a field data copy in c++

    // step 3: fill into Segment.ConcurrentVector
    insert_record_.timestamps_.set_data_raw(
        reserved_offset, timestamps_raw, num_rows);

    // update the mem size of timestamps and row IDs
    stats_.mem_size += num_rows * (sizeof(Timestamp) + sizeof(idx_t));
    for (auto& [field_id, field_meta] : schema_->get_fields()) {
        if (field_id.get() < START_USER_FIELDID) {
            continue;
        }
        AssertInfo(field_id_to_offset.count(field_id),
                   fmt::format("can't find field {}", field_id.get()));
        auto data_offset = field_id_to_offset[field_id];
        if (!indexing_record_.HasRawData(field_id)) {
            if (field_meta.is_nullable()) {
                insert_record_.get_valid_data(field_id)->set_data_raw(
                    num_rows,
                    &insert_record_proto->fields_data(data_offset),
                    field_meta);
            }
            insert_record_.get_data_base(field_id)->set_data_raw(
                reserved_offset,
                num_rows,
                &insert_record_proto->fields_data(data_offset),
                field_meta);
        }
        // insert vector data into index
        if (segcore_config_.get_enable_interim_segment_index()) {
            indexing_record_.AppendingIndex(
                reserved_offset,
                num_rows,
                field_id,
                &insert_record_proto->fields_data(data_offset),
                insert_record_);
        }

        // index text.
        if (field_meta.enable_match()) {
            // TODO: iterate texts and call `AddText` instead of `AddTexts`.
            // This may cost much more memory.
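            // Snapshot the string column and its validity bitmap into plain
            // arrays, since the text match index consumes raw buffers rather
            // than protobuf repeated fields.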
            std::vector<std::string> texts(
                insert_record_proto->fields_data(data_offset)
                    .scalars()
                    .string_data()
                    .data()
                    .begin(),
                insert_record_proto->fields_data(data_offset)
                    .scalars()
                    .string_data()
                    .data()
                    .end());
            FixedVector<bool> texts_valid_data(
                insert_record_proto->fields_data(data_offset)
                    .valid_data()
                    .begin(),
                insert_record_proto->fields_data(data_offset)
                    .valid_data()
                    .end());
            AddTexts(field_id,
                     texts.data(),
                     texts_valid_data.data(),
                     num_rows,
                     reserved_offset);
        }

        // update average row data size
        auto field_data_size = GetRawDataSizeOfDataArray(
            &insert_record_proto->fields_data(data_offset),
            field_meta,
            num_rows);
        if (IsVariableDataType(field_meta.get_data_type())) {
            SegmentInternalInterface::set_field_avg_size(
                field_id, num_rows, field_data_size);
        }

        stats_.mem_size += field_data_size;

        try_remove_chunks(field_id);
    }

    // step 4: set pks to offset
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != INVALID_FIELD_ID, "Primary key is -1");
    std::vector<PkType> pks(num_rows);
    ParsePksFromFieldData(
        pks, insert_record_proto->fields_data(field_id_to_offset[field_id]));
    for (int i = 0; i < num_rows; ++i) {
        insert_record_.insert_pk(pks[i], reserved_offset + i);
    }

    // step 5: update small indexes
    insert_record_.ack_responder_.AddSegment(reserved_offset,
                                             reserved_offset + num_rows);
}

void
SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
    switch (infos.storage_version) {
        case 2:
            load_column_group_data_internal(infos);
            break;
        default:
            load_field_data_internal(infos);
            break;
    }
}

void
SegmentGrowingImpl::load_field_data_internal(const LoadFieldDataInfo& infos) {
    AssertInfo(infos.field_infos.find(TimestampFieldID.get()) !=
                   infos.field_infos.end(),
               "timestamps field data should be included");
    AssertInfo(
        infos.field_infos.find(RowFieldID.get()) != infos.field_infos.end(),
        "rowID field data should be included");
    auto primary_field_id =
        schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(primary_field_id.get() != INVALID_FIELD_ID, "Primary key is -1");
    AssertInfo(infos.field_infos.find(primary_field_id.get()) !=
                   infos.field_infos.end(),
               "primary field data should be included");

    size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
    auto reserved_offset = PreInsert(num_rows);
    for (auto& [id, info] : infos.field_infos) {
        auto field_id = FieldId(id);
        auto insert_files = info.insert_files;
        storage::SortByPath(insert_files);

        auto channel = std::make_shared<FieldDataChannel>();
        auto& pool =
            ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

        int total = 0;
        for (int num : info.entries_nums) {
            total += num;
        }
        if (total != info.row_count) {
            AssertInfo(total <= info.row_count,
                       "binlog number should be less than or equal to "
                       "row_count");
            auto field_meta = get_schema()[field_id];
            AssertInfo(field_meta.is_nullable(),
                       "nullable must be true when rows are missing");
            auto lack_num = info.row_count - total;
            auto field_data =
                storage::CreateFieldData(field_meta.get_data_type(),
                                         field_meta.get_element_type(),
                                         true,
                                         1,
                                         lack_num);
            field_data->FillFieldData(field_meta.default_value(), lack_num);
            channel->push(field_data);
        }
        LOG_INFO("segment {} loads field {} with num_rows {}",
                 this->get_segment_id(),
                 field_id.get(),
                 num_rows);
        auto load_future = pool.Submit(LoadFieldDatasFromRemote,
                                       insert_files,
                                       channel,
                                       infos.load_priority);
        LOG_INFO("segment {} submits load field {} task to thread pool",
                 this->get_segment_id(),
                 field_id.get());
        auto field_data = storage::CollectFieldDataChannel(channel);
        load_field_data_common(
            field_id, reserved_offset, field_data, primary_field_id, num_rows);
    }
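    // Acknowledge the reserved offset range so the loaded rows become
    // visible to readers.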
    // step 5: update small indexes
    insert_record_.ack_responder_.AddSegment(reserved_offset,
                                             reserved_offset + num_rows);
}

void
SegmentGrowingImpl::load_field_data_common(
    FieldId field_id,
    size_t reserved_offset,
    const std::vector<FieldDataPtr>& field_data,
    FieldId primary_field_id,
    size_t num_rows) {
    if (field_id == TimestampFieldID) {
        // step 2: sort timestamp
        // query node already guarantees that the timestamps are ordered,
        // avoiding a field data copy in c++

        // step 3: fill into Segment.ConcurrentVector
        insert_record_.timestamps_.set_data_raw(reserved_offset, field_data);
        return;
    }

    if (field_id == RowFieldID) {
        return;
    }

    if (!indexing_record_.HasRawData(field_id)) {
        if (insert_record_.is_valid_data_exist(field_id)) {
            insert_record_.get_valid_data(field_id)->set_data_raw(field_data);
        }
        insert_record_.get_data_base(field_id)->set_data_raw(reserved_offset,
                                                             field_data);
    }
    if (segcore_config_.get_enable_interim_segment_index()) {
        auto offset = reserved_offset;
        for (auto& data : field_data) {
            auto row_count = data->get_num_rows();
            indexing_record_.AppendingIndex(
                offset, row_count, field_id, data, insert_record_);
            offset += row_count;
        }
    }
    try_remove_chunks(field_id);

    if (field_id == primary_field_id) {
        insert_record_.insert_pks(field_data);
    }

    // update average row data size
    auto field_meta = (*schema_)[field_id];
    if (IsVariableDataType(field_meta.get_data_type())) {
        SegmentInternalInterface::set_field_avg_size(
            field_id, num_rows, storage::GetByteSizeOfFieldDatas(field_data));
    }

    // build text match index
    if (field_meta.enable_match()) {
        auto index = GetTextIndex(field_id);
        index->BuildIndexFromFieldData(field_data, field_meta.is_nullable());
        index->Commit();
        // Reload reader so that the index can be read immediately
        index->Reload();
    }

    // update the mem size
    stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);

    LOG_INFO("segment {} loads field {} done",
             this->get_segment_id(),
             field_id.get());
}

void
SegmentGrowingImpl::load_column_group_data_internal(
    const LoadFieldDataInfo& infos) {
    auto primary_field_id =
        schema_->get_primary_field_id().value_or(FieldId(-1));
    size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
    auto reserved_offset = PreInsert(num_rows);

    auto parallel_degree = static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT /
                                                 FILE_SLICE_SIZE);
    ArrowSchemaPtr arrow_schema = schema_->ConvertToArrowSchema();

    for (auto& [id, info] : infos.field_infos) {
        auto column_group_id = FieldId(id);
        auto insert_files = info.insert_files;
        storage::SortByPath(insert_files);

        auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                      .GetArrowFileSystem();
        auto column_group_info =
            FieldDataInfo(column_group_id.get(), num_rows, "");
        column_group_info.arrow_reader_channel->set_capacity(parallel_degree);
        LOG_INFO(
            "[StorageV2] segment {} loads column group {} with num_rows {}",
            this->get_segment_id(),
            column_group_id.get(),
            num_rows);

        auto& pool =
            ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);

        // Get all row groups for each file
        std::vector<std::vector<int64_t>> row_group_lists;
        row_group_lists.reserve(insert_files.size());
        for (const auto& file : insert_files) {
            auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
                fs,
                file,
                milvus_storage::DEFAULT_READ_BUFFER_SIZE,
                storage::GetReaderProperties());
            auto row_group_num =
                reader->file_metadata()->GetRowGroupMetadataVector().size();
            std::vector<int64_t> all_row_groups(row_group_num);
            std::iota(all_row_groups.begin(), all_row_groups.end(), 0);
            row_group_lists.push_back(all_row_groups);
            auto status = reader->Close();
            AssertInfo(status.ok(),
                       "[StorageV2] failed to close file reader when get row "
                       "group metadata from file {} with error {}",
                       file,
                       status.ToString());
        }
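        // Split the collected row groups into read tasks; the strategy caps
        // per-task size by the parallel degree computed above.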
        // create parallel degree split strategy
        auto strategy =
            std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);

        auto load_future = pool.Submit([&]() {
            return LoadWithStrategy(insert_files,
                                    column_group_info.arrow_reader_channel,
                                    DEFAULT_FIELD_MAX_MEMORY_LIMIT,
                                    std::move(strategy),
                                    row_group_lists,
                                    fs,
                                    nullptr,
                                    infos.load_priority);
        });
        LOG_INFO(
            "[StorageV2] segment {} submits load column group {} task to "
            "thread pool",
            this->get_segment_id(),
            column_group_id.get());

        std::shared_ptr<milvus::ArrowDataWrapper> r;
        std::unordered_map<FieldId, std::vector<FieldDataPtr>> field_data_map;
        while (column_group_info.arrow_reader_channel->pop(r)) {
            for (const auto& table_info : r->arrow_tables) {
                size_t batch_num_rows = table_info.table->num_rows();
                for (int i = 0; i < table_info.table->schema()->num_fields();
                     ++i) {
                    AssertInfo(
                        table_info.table->schema()
                            ->field(i)
                            ->metadata()
                            ->Contains(milvus_storage::ARROW_FIELD_ID_KEY),
                        "[StorageV2] field id not found in metadata for "
                        "field {}",
                        table_info.table->schema()->field(i)->name());
                    auto field_id = std::stoll(
                        table_info.table->schema()
                            ->field(i)
                            ->metadata()
                            ->Get(milvus_storage::ARROW_FIELD_ID_KEY)
                            ->data());
                    for (auto& field : schema_->get_fields()) {
                        if (field.second.get_id().get() != field_id) {
                            continue;
                        }
                        auto data_type = field.second.get_data_type();
                        auto field_data = storage::CreateFieldData(
                            data_type,
                            field.second.get_element_type(),
                            field.second.is_nullable(),
                            IsVectorDataType(data_type) &&
                                    !IsSparseFloatVectorDataType(data_type)
                                ? field.second.get_dim()
                                : 1,
                            batch_num_rows);
                        field_data->FillFieldData(table_info.table->column(i));
                        field_data_map[FieldId(field_id)].push_back(field_data);
                    }
                }
            }
        }

        for (auto& [field_id, field_data] : field_data_map) {
            load_field_data_common(field_id,
                                   reserved_offset,
                                   field_data,
                                   primary_field_id,
                                   num_rows);
        }
    }
    // step 5: update small indexes
    insert_record_.ack_responder_.AddSegment(reserved_offset,
                                             reserved_offset + num_rows);
}

SegcoreError
SegmentGrowingImpl::Delete(int64_t size,
                           const IdArray* ids,
                           const Timestamp* timestamps_raw) {
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != -1, "Primary key is -1");
    auto& field_meta = schema_->operator[](field_id);
    std::vector<PkType> pks(size);
    ParsePksFromIDs(pks, field_meta.get_data_type(), *ids);

    // filter out the deletions whose primary key does not exist
    std::vector<std::tuple<Timestamp, PkType>> ordering(size);
    for (int i = 0; i < size; i++) {
        ordering[i] = std::make_tuple(timestamps_raw[i], pks[i]);
    }
    auto end =
        std::remove_if(ordering.begin(),
                       ordering.end(),
                       [&](const std::tuple<Timestamp, PkType>& record) {
                           return !insert_record_.contain(std::get<1>(record));
                       });
    size = end - ordering.begin();
    ordering.resize(size);
    if (size == 0) {
        return SegcoreError::success();
    }

    // step 1: sort timestamp
    std::sort(ordering.begin(), ordering.end());
    std::vector<PkType> sort_pks(size);
    std::vector<Timestamp> sort_timestamps(size);
    for (int i = 0; i < size; i++) {
        auto [t, pk] = ordering[i];
        sort_timestamps[i] = t;
        sort_pks[i] = pk;
    }

    // step 2: fill delete record
    deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
    return SegcoreError::success();
}
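// In contrast to Delete() above, which handles streaming deletes and drops
// primary keys this segment never saw, LoadDeletedRecord bulk-loads deletes
// that were already persisted for this segment in a single push.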
void
SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
    AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
    AssertInfo(info.primary_keys, "Deleted primary keys is null");
    AssertInfo(info.timestamps, "Deleted timestamps is null");
    // step 1: get pks and timestamps
    auto field_id =
        schema_->get_primary_field_id().value_or(FieldId(INVALID_FIELD_ID));
    AssertInfo(field_id.get() != INVALID_FIELD_ID,
               "Primary key has invalid field id");
    auto& field_meta = schema_->operator[](field_id);
    int64_t size = info.row_count;
    std::vector<PkType> pks(size);
    ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
    auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

    // step 2: push delete info to delete_record
    deleted_record_.LoadPush(pks, timestamps);
}

PinWrapper<SpanBase>
SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
    return PinWrapper<SpanBase>(
        get_insert_record().get_span_base(field_id, chunk_id));
}

PinWrapper<std::pair<std::vector<std::string_view>, FixedVector<bool>>>
SegmentGrowingImpl::chunk_string_view_impl(
    FieldId field_id,
    int64_t chunk_id,
    std::optional<std::pair<int64_t, int64_t>> offset_len =
        std::nullopt) const {
    ThrowInfo(ErrorCode::NotImplemented,
              "chunk string view impl not implemented for growing segment");
}

PinWrapper<std::pair<std::vector<ArrayView>, FixedVector<bool>>>
SegmentGrowingImpl::chunk_array_view_impl(
    FieldId field_id,
    int64_t chunk_id,
    std::optional<std::pair<int64_t, int64_t>> offset_len =
        std::nullopt) const {
    ThrowInfo(ErrorCode::NotImplemented,
              "chunk array view impl not implemented for growing segment");
}

PinWrapper<std::pair<std::vector<VectorArrayView>, FixedVector<bool>>>
SegmentGrowingImpl::chunk_vector_array_view_impl(
    FieldId field_id,
    int64_t chunk_id,
    std::optional<std::pair<int64_t, int64_t>> offset_len =
        std::nullopt) const {
    ThrowInfo(
        ErrorCode::NotImplemented,
        "chunk vector array view impl not implemented for growing segment");
}

PinWrapper<std::pair<std::vector<std::string_view>, FixedVector<bool>>>
SegmentGrowingImpl::chunk_string_views_by_offsets(
    FieldId field_id,
    int64_t chunk_id,
    const FixedVector<int32_t>& offsets) const {
    ThrowInfo(ErrorCode::NotImplemented,
              "chunk view by offsets not implemented for growing segment");
}

PinWrapper<std::pair<std::vector<ArrayView>, FixedVector<bool>>>
SegmentGrowingImpl::chunk_array_views_by_offsets(
    FieldId field_id,
    int64_t chunk_id,
    const FixedVector<int32_t>& offsets) const {
    ThrowInfo(
        ErrorCode::NotImplemented,
        "chunk array views by offsets not implemented for growing segment");
}

int64_t
SegmentGrowingImpl::num_chunk(FieldId field_id) const {
    auto size = get_insert_record().ack_responder_.GetAck();
    return upper_div(size, segcore_config_.get_chunk_rows());
}

DataType
SegmentGrowingImpl::GetFieldDataType(milvus::FieldId field_id) const {
    auto& field_meta = schema_->operator[](field_id);
    return field_meta.get_data_type();
}

void
SegmentGrowingImpl::search_batch_pks(
    const std::vector<PkType>& pks,
    const Timestamp* timestamps,
    bool include_same_ts,
    const std::function<void(SegOffset, Timestamp)>& callback) const {
    for (size_t i = 0; i < pks.size(); ++i) {
        auto timestamp = timestamps[i];
        auto offsets =
            insert_record_.search_pk(pks[i], timestamp, include_same_ts);
        for (auto offset : offsets) {
            callback(offset, timestamp);
        }
    }
}

void
SegmentGrowingImpl::vector_search(SearchInfo& search_info,
                                  const void* query_data,
                                  const size_t* query_lims,
                                  int64_t query_count,
                                  Timestamp timestamp,
                                  const BitsetView& bitset,
                                  milvus::OpContext* op_context,
                                  SearchResult& output) const {
    query::SearchOnGrowing(*this,
                           search_info,
                           query_data,
                           query_lims,
                           query_count,
                           timestamp,
                           bitset,
                           op_context,
                           output);
}

std::unique_ptr<DataArray>
SegmentGrowingImpl::bulk_subscript(
    FieldId field_id,
    const int64_t* seg_offsets,
    int64_t count,
    const std::vector<std::string>& dynamic_field_names) const {
    Assert(!dynamic_field_names.empty());
    auto& field_meta = schema_->operator[](field_id);
    auto vec_ptr = insert_record_.get_data_base(field_id);
    auto result = CreateEmptyScalarDataArray(count, field_meta);
    if (field_meta.is_nullable()) {
        auto valid_data_ptr = insert_record_.get_valid_data(field_id);
        auto res = result->mutable_valid_data()->mutable_data();
        for (int64_t i = 0; i < count; ++i) {
            auto offset = seg_offsets[i];
            res[i] = valid_data_ptr->is_valid(offset);
        }
    }
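    // The dynamic field is stored as JSON; for each row, extract only the
    // requested dynamic field names instead of returning the whole document.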
    auto vec = dynamic_cast<const ConcurrentVector<Json>*>(vec_ptr);
    auto dst = result->mutable_scalars()->mutable_json_data()->mutable_data();
    auto& src = *vec;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        dst->at(i) =
            ExtractSubJson(std::string(src[offset]), dynamic_field_names);
    }
    return result;
}

std::unique_ptr<DataArray>
SegmentGrowingImpl::bulk_subscript(FieldId field_id,
                                   const int64_t* seg_offsets,
                                   int64_t count) const {
    auto& field_meta = schema_->operator[](field_id);
    auto vec_ptr = insert_record_.get_data_base(field_id);
    if (field_meta.is_vector()) {
        auto result = CreateEmptyVectorDataArray(count, field_meta);
        if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
            bulk_subscript_impl<FloatVector>(field_id,
                                             field_meta.get_sizeof(),
                                             vec_ptr,
                                             seg_offsets,
                                             count,
                                             result->mutable_vectors()
                                                 ->mutable_float_vector()
                                                 ->mutable_data()
                                                 ->mutable_data());
        } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) {
            bulk_subscript_impl<BinaryVector>(
                field_id,
                field_meta.get_sizeof(),
                vec_ptr,
                seg_offsets,
                count,
                result->mutable_vectors()->mutable_binary_vector()->data());
        } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) {
            bulk_subscript_impl<Float16Vector>(
                field_id,
                field_meta.get_sizeof(),
                vec_ptr,
                seg_offsets,
                count,
                result->mutable_vectors()->mutable_float16_vector()->data());
        } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) {
            bulk_subscript_impl<BFloat16Vector>(
                field_id,
                field_meta.get_sizeof(),
                vec_ptr,
                seg_offsets,
                count,
                result->mutable_vectors()->mutable_bfloat16_vector()->data());
        } else if (field_meta.get_data_type() ==
                   DataType::VECTOR_SPARSE_U32_F32) {
            bulk_subscript_sparse_float_vector_impl(
                field_id,
                (const ConcurrentVector<SparseFloatVector>*)vec_ptr,
                seg_offsets,
                count,
                result->mutable_vectors()->mutable_sparse_float_vector());
            result->mutable_vectors()->set_dim(
                result->vectors().sparse_float_vector().dim());
        } else if (field_meta.get_data_type() == DataType::VECTOR_INT8) {
            bulk_subscript_impl<Int8Vector>(
                field_id,
                field_meta.get_sizeof(),
                vec_ptr,
                seg_offsets,
                count,
                result->mutable_vectors()->mutable_int8_vector()->data());
        } else if (field_meta.get_data_type() == DataType::VECTOR_ARRAY) {
            bulk_subscript_vector_array_impl(*vec_ptr,
                                             seg_offsets,
                                             count,
                                             result->mutable_vectors()
                                                 ->mutable_vector_array()
                                                 ->mutable_data());
        } else {
            ThrowInfo(DataTypeInvalid, "logical error");
        }
        return result;
    }

    AssertInfo(!field_meta.is_vector(),
               "Scalar field meta type is vector type");
    auto result = CreateEmptyScalarDataArray(count, field_meta);
    if (field_meta.is_nullable()) {
        auto valid_data_ptr = insert_record_.get_valid_data(field_id);
        auto res = result->mutable_valid_data()->mutable_data();
        for (int64_t i = 0; i < count; ++i) {
            auto offset = seg_offsets[i];
            res[i] = valid_data_ptr->is_valid(offset);
        }
    }
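    // Copy the scalar column into the protobuf payload matching its data
    // type; note that INT8/INT16 values are widened into the int32-based
    // int_data payload.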
    switch (field_meta.get_data_type()) {
        case DataType::BOOL: {
            bulk_subscript_impl<bool>(vec_ptr,
                                      seg_offsets,
                                      count,
                                      result->mutable_scalars()
                                          ->mutable_bool_data()
                                          ->mutable_data()
                                          ->mutable_data());
            break;
        }
        case DataType::INT8: {
            bulk_subscript_impl<int8_t>(vec_ptr,
                                        seg_offsets,
                                        count,
                                        result->mutable_scalars()
                                            ->mutable_int_data()
                                            ->mutable_data()
                                            ->mutable_data());
            break;
        }
        case DataType::INT16: {
            bulk_subscript_impl<int16_t>(vec_ptr,
                                         seg_offsets,
                                         count,
                                         result->mutable_scalars()
                                             ->mutable_int_data()
                                             ->mutable_data()
                                             ->mutable_data());
            break;
        }
        case DataType::INT32: {
            bulk_subscript_impl<int32_t>(vec_ptr,
                                         seg_offsets,
                                         count,
                                         result->mutable_scalars()
                                             ->mutable_int_data()
                                             ->mutable_data()
                                             ->mutable_data());
            break;
        }
        case DataType::INT64: {
            bulk_subscript_impl<int64_t>(vec_ptr,
                                         seg_offsets,
                                         count,
                                         result->mutable_scalars()
                                             ->mutable_long_data()
                                             ->mutable_data()
                                             ->mutable_data());
            break;
        }
        case DataType::FLOAT: {
            bulk_subscript_impl<float>(vec_ptr,
                                       seg_offsets,
                                       count,
                                       result->mutable_scalars()
                                           ->mutable_float_data()
                                           ->mutable_data()
                                           ->mutable_data());
            break;
        }
        case DataType::DOUBLE: {
            bulk_subscript_impl<double>(vec_ptr,
                                        seg_offsets,
                                        count,
                                        result->mutable_scalars()
                                            ->mutable_double_data()
                                            ->mutable_data()
                                            ->mutable_data());
            break;
        }
        case DataType::TIMESTAMPTZ: {
            bulk_subscript_impl<int64_t>(vec_ptr,
                                         seg_offsets,
                                         count,
                                         result->mutable_scalars()
                                             ->mutable_timestamptz_data()
                                             ->mutable_data()
                                             ->mutable_data());
            break;
        }
        case DataType::VARCHAR:
        case DataType::TEXT: {
            bulk_subscript_ptr_impl<std::string>(vec_ptr,
                                                 seg_offsets,
                                                 count,
                                                 result->mutable_scalars()
                                                     ->mutable_string_data()
                                                     ->mutable_data());
            break;
        }
        case DataType::JSON: {
            bulk_subscript_ptr_impl<Json>(
                vec_ptr,
                seg_offsets,
                count,
                result->mutable_scalars()->mutable_json_data()->mutable_data());
            break;
        }
        case DataType::ARRAY: {
            bulk_subscript_array_impl(*vec_ptr,
                                      seg_offsets,
                                      count,
                                      result->mutable_scalars()
                                          ->mutable_array_data()
                                          ->mutable_data());
            break;
        }
        default: {
            ThrowInfo(
                DataTypeInvalid,
                fmt::format("unsupported type {}", field_meta.get_data_type()));
        }
    }
    return result;
}

void
SegmentGrowingImpl::bulk_subscript_sparse_float_vector_impl(
    FieldId field_id,
    const ConcurrentVector<SparseFloatVector>* vec_raw,
    const int64_t* seg_offsets,
    int64_t count,
    milvus::proto::schema::SparseFloatArray* output) const {
    AssertInfo(HasRawData(field_id.get()), "Growing segment lost raw data");

    // if index has finished building, grab from index without any
    // synchronization operations.
    if (indexing_record_.SyncDataWithIndex(field_id)) {
        indexing_record_.GetDataFromIndex(
            field_id, seg_offsets, count, 0, output);
        return;
    }
    {
        std::lock_guard guard(chunk_mutex_);
        // check again after lock to make sure: if index has finished building
        // after the above check but before we grabbed the lock, we should grab
        // from index as the data in chunk may have been removed in
        // try_remove_chunks.
        if (!indexing_record_.SyncDataWithIndex(field_id)) {
            // copy from raw data
            SparseRowsToProto(
                [&](size_t i) {
                    auto offset = seg_offsets[i];
                    return offset != INVALID_SEG_OFFSET
                               ? vec_raw->get_element(offset)
                               : nullptr;
                },
                count,
                output);
            return;
        }
        // else: release lock and copy from index
    }
    indexing_record_.GetDataFromIndex(field_id, seg_offsets, count, 0, output);
}
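// Variable-length values may reside in mmap-backed chunks; in that case the
// element is materialized from a zero-copy view, otherwise it is copied from
// the in-memory chunk directly.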
template <typename T>
void
SegmentGrowingImpl::bulk_subscript_ptr_impl(
    const VectorBase* vec_raw,
    const int64_t* seg_offsets,
    int64_t count,
    google::protobuf::RepeatedPtrField<std::string>* dst) const {
    auto vec = dynamic_cast<const ConcurrentVector<T>*>(vec_raw);
    auto& src = *vec;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        if (IsVariableTypeSupportInChunk<T> && src.is_mmap()) {
            dst->at(i) = std::move(std::string(src.view_element(offset)));
        } else {
            dst->at(i) = std::move(std::string(src[offset]));
        }
    }
}

template <typename S>
void
SegmentGrowingImpl::bulk_subscript_impl(FieldId field_id,
                                        int64_t element_sizeof,
                                        const VectorBase* vec_raw,
                                        const int64_t* seg_offsets,
                                        int64_t count,
                                        void* output_raw) const {
    static_assert(IsVector<S>);
    auto vec_ptr = dynamic_cast<const ConcurrentVector<S>*>(vec_raw);
    AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
    auto& vec = *vec_ptr;

    // HasRawData interface guarantees that data can be fetched from growing
    // segment
    AssertInfo(HasRawData(field_id.get()), "Growing segment lost raw data");

    // if index has finished building, grab from index without any
    // synchronization operations.
    if (indexing_record_.HasRawData(field_id)) {
        indexing_record_.GetDataFromIndex(
            field_id, seg_offsets, count, element_sizeof, output_raw);
        return;
    }
    {
        std::lock_guard guard(chunk_mutex_);
        // check again after lock to make sure: if index has finished building
        // after the above check but before we grabbed the lock, we should grab
        // from index as the data in chunk may have been removed in
        // try_remove_chunks.
        if (!indexing_record_.HasRawData(field_id)) {
            auto output_base = reinterpret_cast<char*>(output_raw);
            for (int i = 0; i < count; ++i) {
                auto dst = output_base + i * element_sizeof;
                auto offset = seg_offsets[i];
                if (offset == INVALID_SEG_OFFSET) {
                    memset(dst, 0, element_sizeof);
                } else {
                    auto src = (const uint8_t*)vec.get_element(offset);
                    memcpy(dst, src, element_sizeof);
                }
            }
            return;
        }
        // else: release lock and copy from index
    }
    indexing_record_.GetDataFromIndex(
        field_id, seg_offsets, count, element_sizeof, output_raw);
}

template <typename S, typename T>
void
SegmentGrowingImpl::bulk_subscript_impl(const VectorBase* vec_raw,
                                        const int64_t* seg_offsets,
                                        int64_t count,
                                        T* output) const {
    static_assert(IsScalar<S>);
    auto vec_ptr = dynamic_cast<const ConcurrentVector<S>*>(vec_raw);
    AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
    auto& vec = *vec_ptr;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        output[i] = vec[offset];
    }
}

template <typename T>
void
SegmentGrowingImpl::bulk_subscript_array_impl(
    const VectorBase& vec_raw,
    const int64_t* seg_offsets,
    int64_t count,
    google::protobuf::RepeatedPtrField<T>* dst) const {
    auto vec_ptr = dynamic_cast<const ConcurrentVector<Array>*>(&vec_raw);
    AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
    auto& vec = *vec_ptr;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        if (offset != INVALID_SEG_OFFSET) {
            dst->at(i) = vec[offset].output_data();
        }
    }
}

template <typename T>
void
SegmentGrowingImpl::bulk_subscript_vector_array_impl(
    const VectorBase& vec_raw,
    const int64_t* seg_offsets,
    int64_t count,
    google::protobuf::RepeatedPtrField<T>* dst) const {
    auto vec_ptr = dynamic_cast<const ConcurrentVector<VectorArray>*>(&vec_raw);
    AssertInfo(vec_ptr, "Pointer of vec_raw is nullptr");
    auto& vec = *vec_ptr;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        if (offset != INVALID_SEG_OFFSET) {
            dst->at(i) = vec[offset].output_data();
        }
    }
}

void
SegmentGrowingImpl::bulk_subscript(SystemFieldType system_type,
                                   const int64_t* seg_offsets,
                                   int64_t count,
                                   void* output) const {
    switch (system_type) {
        case SystemFieldType::Timestamp:
            bulk_subscript_impl<Timestamp>(&this->insert_record_.timestamps_,
                                           seg_offsets,
                                           count,
                                           static_cast<Timestamp*>(output));
            break;
        case SystemFieldType::RowId:
            ThrowInfo(ErrorCode::Unsupported,
                      "RowId retrieve is not supported");
            break;
        default:
            ThrowInfo(DataTypeInvalid, "unknown subscript fields");
    }
}

std::vector<SegOffset>
SegmentGrowingImpl::search_ids(const IdArray& id_array,
                               Timestamp timestamp) const {
    auto field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
    AssertInfo(field_id.get() != -1, "Primary key is -1");
    auto& field_meta = schema_->operator[](field_id);
    auto data_type = field_meta.get_data_type();
    auto ids_size = GetSizeOfIdArray(id_array);
    std::vector<PkType> pks(ids_size);
    ParsePksFromIDs(pks, data_type, id_array);

    std::vector<SegOffset> res_offsets;
    res_offsets.reserve(pks.size());
    for (auto& pk : pks) {
        auto segOffsets = insert_record_.search_pk(pk, timestamp);
        for (auto offset : segOffsets) {
            res_offsets.push_back(offset);
        }
    }
    return res_offsets;
}

std::string
SegmentGrowingImpl::debug() const {
    return "Growing\n";
}

int64_t
SegmentGrowingImpl::get_active_count(Timestamp ts) const {
    auto row_count = this->get_row_count();
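    // Timestamps are appended in non-decreasing order, so binary-search for
    // the first row whose timestamp is greater than ts; its index equals the
    // number of rows visible at ts.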
    auto& ts_vec = this->get_insert_record().timestamps_;
    auto iter = std::upper_bound(
        boost::make_counting_iterator(static_cast<int64_t>(0)),
        boost::make_counting_iterator(row_count),
        ts,
        [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
    return *iter;
}

void
SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
                                         Timestamp timestamp,
                                         Timestamp collection_ttl) const {
    if (collection_ttl > 0) {
        auto& timestamps = get_timestamps();
        auto size = bitset_chunk.size();
        if (timestamps[size - 1] <= collection_ttl) {
            bitset_chunk.set();
            return;
        }
        auto pilot = upper_bound(timestamps, 0, size, collection_ttl);
        BitsetType bitset;
        bitset.reserve(size);
        bitset.resize(pilot, true);
        bitset.resize(size, false);
        bitset_chunk |= bitset;
    }
}

void
SegmentGrowingImpl::CreateTextIndex(FieldId field_id) {
    std::unique_lock lock(mutex_);
    const auto& field_meta = schema_->operator[](field_id);
    AssertInfo(IsStringDataType(field_meta.get_data_type()),
               "cannot create text index on non-string type");
    std::string unique_id = GetUniqueFieldId(field_meta.get_id().get());
    // todo: make this (200) configurable.
    auto index = std::make_unique<index::TextMatchIndex>(
        200,
        unique_id.c_str(),
        "milvus_tokenizer",
        field_meta.get_analyzer_params().c_str());
    index->Commit();
    index->CreateReader(milvus::index::SetBitsetGrowing);
    index->RegisterTokenizer("milvus_tokenizer",
                             field_meta.get_analyzer_params().c_str());
    text_indexes_[field_id] = std::move(index);
}

void
SegmentGrowingImpl::CreateTextIndexes() {
    for (auto [field_id, field_meta] : schema_->get_fields()) {
        if (IsStringDataType(field_meta.get_data_type()) &&
            field_meta.enable_match()) {
            CreateTextIndex(FieldId(field_id));
        }
    }
}

void
SegmentGrowingImpl::AddTexts(milvus::FieldId field_id,
                             const std::string* texts,
                             const bool* texts_valid_data,
                             size_t n,
                             int64_t offset_begin) {
    std::unique_lock lock(mutex_);
    auto iter = text_indexes_.find(field_id);
    if (iter == text_indexes_.end()) {
        throw SegcoreError(
            ErrorCode::TextIndexNotFound,
            fmt::format("text index not found for field {}", field_id.get()));
    }
    iter->second->AddTextsGrowing(n, texts, texts_valid_data, offset_begin);
}

void
SegmentGrowingImpl::BulkGetJsonData(
    FieldId field_id,
    std::function<void(milvus::Json, size_t, bool)> fn,
    const int64_t* offsets,
    int64_t count) const {
    auto vec_ptr = dynamic_cast<const ConcurrentVector<Json>*>(
        insert_record_.get_data_base(field_id));
    auto& src = *vec_ptr;
    auto& field_meta = schema_->operator[](field_id);
    if (field_meta.is_nullable()) {
        auto valid_data_ptr = insert_record_.get_valid_data(field_id);
        for (int64_t i = 0; i < count; ++i) {
            auto offset = offsets[i];
            fn(src[offset], i, valid_data_ptr->is_valid(offset));
        }
    } else {
        for (int64_t i = 0; i < count; ++i) {
            auto offset = offsets[i];
            fn(src[offset], i, true);
        }
    }
}

void
SegmentGrowingImpl::LazyCheckSchema(SchemaPtr sch) {
    if (sch->get_schema_version() > schema_->get_schema_version()) {
        LOG_INFO(
            "lazy check schema segment {} found newer schema version, "
            "current schema version {}, new schema version {}",
            id_,
            schema_->get_schema_version(),
            sch->get_schema_version());
        Reopen(sch);
    }
}

void
SegmentGrowingImpl::Reopen(SchemaPtr sch) {
    std::unique_lock lck(sch_mutex_);
    // double check condition, avoid multiple assignment
    if (sch->get_schema_version() > schema_->get_schema_version()) {
        auto absent_fields = sch->AbsentFields(*schema_);
        for (const auto& field_meta : *absent_fields) {
            fill_empty_field(field_meta);
        }
        schema_ = sch;
    }
}

void
SegmentGrowingImpl::FinishLoad() {
    for (const auto& [field_id, field_meta] : schema_->get_fields()) {
        if (field_id.get() < START_USER_FIELDID) {
            continue;
        }
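        // Vector fields are skipped here; only scalar fields are backfilled
        // with null/default values.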
        // append_data was called according to the schema earlier,
        // so we must check here whether the field actually received data
        if (!IsVectorDataType(field_meta.get_data_type()) &&
            insert_record_.get_data_base(field_id)->empty()) {
            fill_empty_field(field_meta);
        }
    }
}

void
SegmentGrowingImpl::fill_empty_field(const FieldMeta& field_meta) {
    auto field_id = field_meta.get_id();
    LOG_INFO("start fill empty field {} (data type {}) for growing segment {}",
             field_id.get(),
             field_meta.get_data_type(),
             id_);
    // appending the meta is only needed when the segment's schema is old;
    // loading an old segment with a new schema will have the meta appended
    if (!insert_record_.is_data_exist(field_id)) {
        insert_record_.append_field_meta(
            field_id, field_meta, size_per_chunk(), mmap_descriptor_);
    }
    auto total_row_num = insert_record_.size();
    auto data = bulk_subscript_not_exist_field(field_meta, total_row_num);
    insert_record_.get_valid_data(field_id)->set_data_raw(
        total_row_num, data.get(), field_meta);
    insert_record_.get_data_base(field_id)->set_data_raw(
        0, total_row_num, data.get(), field_meta);
    LOG_INFO("fill empty field {} (data type {}) for growing segment {} done",
             field_id.get(),
             field_meta.get_data_type(),
             id_);
}

}  // namespace milvus::segcore