// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include "common/Utils.h" #include "common/Span.h" #include "mmap/ChunkData.h" #include "storage/MmapManager.h" #include "segcore/SegcoreConfig.h" #include "common/TypeTraits.h" namespace milvus { template class ChunkVectorBase { public: virtual ~ChunkVectorBase() = default; virtual void emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) = 0; virtual void copy_to_chunk(int64_t chunk_id, int64_t offest, const Type* data, int64_t length, const std::optional& check_data_valid) = 0; virtual void* get_chunk_data(int64_t index) = 0; virtual int64_t get_chunk_size(int64_t index) = 0; virtual int64_t get_element_size() = 0; virtual int64_t get_element_offset(int64_t index) = 0; virtual ChunkViewType view_element(int64_t chunk_id, int64_t chunk_offset) = 0; int64_t size() const { return counter_; } virtual void clear() = 0; virtual SpanBase get_span(int64_t chunk_id) = 0; virtual bool is_mmap() const = 0; protected: std::atomic counter_ = 0; }; template using ChunkVectorPtr = std::unique_ptr>; template , bool IsMmap = false> class ThreadSafeChunkVector : public ChunkVectorBase { public: ThreadSafeChunkVector( storage::MmapChunkDescriptorPtr descriptor = nullptr) { mmap_descriptor_ = descriptor; } void emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) override { std::unique_lock lck(this->mutex_); if (chunk_num <= this->counter_) { return; } while (vec_.size() < chunk_num) { if constexpr (IsMmap) { vec_.emplace_back(chunk_size, mmap_descriptor_); } else { vec_.emplace_back(chunk_size); } ++this->counter_; } } void copy_to_chunk( int64_t chunk_id, int64_t offset, const Type* data, int64_t length, const std::optional& check_data_valid) override { std::unique_lock lck(mutex_); AssertInfo(chunk_id < this->counter_, fmt::format("index out of range, index={}, counter_={}", chunk_id, this->counter_)); if constexpr (!IsMmap || !IsVariableType) { auto ptr = (Type*)vec_[chunk_id].data(); AssertInfo( offset + length <= vec_[chunk_id].size(), fmt::format( "index out of chunk range, offset={}, length={}, size={}", offset, length, vec_[chunk_id].size())); std::copy_n(data, length, ptr + offset); } else { vec_[chunk_id].set(data, offset, length, check_data_valid); } } ChunkViewType view_element(int64_t chunk_id, int64_t chunk_offset) override { std::shared_lock lck(mutex_); auto& chunk = vec_[chunk_id]; if constexpr (IsMmap) { return chunk.view(chunk_offset); } else if constexpr (std::is_same_v) { return std::string_view(chunk[chunk_offset].data(), chunk[chunk_offset].size()); } else if constexpr (std::is_same_v) { auto& src = chunk[chunk_offset]; return ArrayView(const_cast(src.data()), src.length(), src.byte_size(), src.get_element_type(), src.get_offsets_data()); } else if constexpr (std::is_same_v) { auto& src = chunk[chunk_offset]; return VectorArrayView(const_cast(src.data()), src.dim(), src.length(), src.byte_size(), src.get_element_type()); } else { return chunk[chunk_offset]; } } void* get_chunk_data(int64_t index) override { std::shared_lock lck(mutex_); AssertInfo(index < this->counter_, fmt::format("index out of range, index={}, counter_={}", index, this->counter_)); return vec_[index].data(); } int64_t get_chunk_size(int64_t index) override { std::shared_lock lck(mutex_); AssertInfo(index < this->counter_, fmt::format("index out of range, index={}, counter_={}", index, this->counter_)); return vec_[index].size(); } void clear() override { std::unique_lock lck(mutex_); this->counter_ = 0; vec_.clear(); } int64_t get_element_size() override { std::shared_lock lck(mutex_); if constexpr (IsMmap && std::is_same_v) { return sizeof(ChunkViewType); } return sizeof(Type); } int64_t get_element_offset(int64_t index) override { std::shared_lock lck(mutex_); int64_t offset = 0; for (int i = 0; i < index; i++) { offset += vec_[i].size(); } return offset; } SpanBase get_span(int64_t chunk_id) override { std::shared_lock lck(mutex_); if constexpr (IsMmap && std::is_same_v) { return SpanBase(get_chunk_data(chunk_id), get_chunk_size(chunk_id), sizeof(ChunkViewType)); } else { return SpanBase(get_chunk_data(chunk_id), get_chunk_size(chunk_id), sizeof(Type)); } } bool is_mmap() const override { return mmap_descriptor_ != nullptr; } private: mutable std::shared_mutex mutex_; storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; std::deque vec_; }; template ChunkVectorPtr SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) { if constexpr (!IsVariableType) { if (mmap_descriptor != nullptr) { return std::make_unique< ThreadSafeChunkVector, true>>( mmap_descriptor); } else { return std::make_unique>(); } } else if constexpr (IsVariableTypeSupportInChunk) { if (mmap_descriptor != nullptr) { return std::make_unique< ThreadSafeChunkVector, true>>( mmap_descriptor); } else { return std::make_unique>(); } } else { return std::make_unique>(); } } } // namespace milvus