From 7ccfa5b9f09931104a7ff7340001bf08321126fc Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 2 Apr 2020 22:44:45 +0800 Subject: [PATCH] Caiyd 1627 move rw index (#1833) * #1627 move read/write index into codec Signed-off-by: yudong.cai * fix clang-format Signed-off-by: yudong.cai * update changelog Signed-off-by: yudong.cai * update unittest Signed-off-by: yudong.cai * code optimize Signed-off-by: yudong.cai * update WriteVectorIndex Signed-off-by: yudong.cai * fix clang-format Signed-off-by: yudong.cai --- CHANGELOG.md | 1 + core/src/codecs/Codec.h | 8 +- ...ctorsIndexFormat.h => VectorIndexFormat.h} | 22 ++- core/src/codecs/default/DefaultCodec.cpp | 7 + core/src/codecs/default/DefaultCodec.h | 4 + .../default/DefaultVectorIndexFormat.cpp | 178 ++++++++++++++++++ .../codecs/default/DefaultVectorIndexFormat.h | 58 ++++++ core/src/db/engine/ExecutionEngineImpl.cpp | 18 +- core/src/index/archive/VecIndex.cpp | 173 ----------------- core/src/index/archive/VecIndex.h | 37 ---- .../knowhere/index/vector_index/VecIndex.h | 9 +- core/src/segment/SegmentReader.cpp | 15 ++ core/src/segment/SegmentReader.h | 3 + core/src/segment/SegmentWriter.cpp | 20 ++ core/src/segment/SegmentWriter.h | 6 + core/src/segment/Types.h | 2 + core/src/segment/VectorIndex.h | 31 ++- core/src/storage/disk/DiskIOReader.h | 3 + core/src/storage/disk/DiskIOWriter.h | 3 + core/src/storage/s3/S3IOReader.h | 3 + core/src/storage/s3/S3IOWriter.h | 3 + core/unittest/metrics/CMakeLists.txt | 1 + core/unittest/metrics/test_metrics.cpp | 2 - 23 files changed, 369 insertions(+), 238 deletions(-) rename core/src/codecs/{VectorsIndexFormat.h => VectorIndexFormat.h} (67%) create mode 100644 core/src/codecs/default/DefaultVectorIndexFormat.cpp create mode 100644 core/src/codecs/default/DefaultVectorIndexFormat.h delete mode 100644 core/src/index/archive/VecIndex.cpp delete mode 100644 core/src/index/archive/VecIndex.h diff --git a/CHANGELOG.md b/CHANGELOG.md index c6b8d5e388..3bc8066305 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub - \#1849 NSG support deleted vectors searching ## Improvement +- \#1627 Move read/write index APIs into codec - \#1784 Add Substructure and Superstructure in http module ## Task diff --git a/core/src/codecs/Codec.h b/core/src/codecs/Codec.h index 1371f1e7c0..4242585547 100644 --- a/core/src/codecs/Codec.h +++ b/core/src/codecs/Codec.h @@ -22,8 +22,8 @@ #include "DeletedDocsFormat.h" #include "IdBloomFilterFormat.h" #include "IdIndexFormat.h" +#include "VectorIndexFormat.h" #include "VectorsFormat.h" -#include "VectorsIndexFormat.h" namespace milvus { namespace codec { @@ -33,6 +33,9 @@ class Codec { virtual VectorsFormatPtr GetVectorsFormat() = 0; + virtual VectorIndexFormatPtr + GetVectorIndexFormat() = 0; + virtual DeletedDocsFormatPtr GetDeletedDocsFormat() = 0; @@ -44,9 +47,6 @@ class Codec { virtual AttrsFormat GetAttrsFormat() = 0; - virtual VectorsIndexFormat - GetVectorsIndexFormat() = 0; - virtual AttrsIndexFormat GetAttrsIndexFormat() = 0; diff --git a/core/src/codecs/VectorsIndexFormat.h b/core/src/codecs/VectorIndexFormat.h similarity index 67% rename from core/src/codecs/VectorsIndexFormat.h rename to core/src/codecs/VectorIndexFormat.h index 1c0f03f3d5..c3b113cb06 100644 --- a/core/src/codecs/VectorsIndexFormat.h +++ b/core/src/codecs/VectorIndexFormat.h @@ -17,17 +17,25 @@ #pragma once +#include +#include + +#include "segment/VectorIndex.h" +#include "storage/FSHandler.h" + namespace milvus { namespace codec { -class VectorsIndexFormat { - // public: - // virtual VectorsIndex - // read() = 0; - // - // virtual void - // write(VectorsIndex vectors_index) = 0; +class VectorIndexFormat { + public: + virtual void + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) = 0; + + virtual void + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) = 0; }; +using VectorIndexFormatPtr = std::shared_ptr; + } // namespace codec } // namespace milvus diff --git a/core/src/codecs/default/DefaultCodec.cpp b/core/src/codecs/default/DefaultCodec.cpp index e7611baf0d..71ceacd348 100644 --- a/core/src/codecs/default/DefaultCodec.cpp +++ b/core/src/codecs/default/DefaultCodec.cpp @@ -21,6 +21,7 @@ #include "DefaultDeletedDocsFormat.h" #include "DefaultIdBloomFilterFormat.h" +#include "DefaultVectorIndexFormat.h" #include "DefaultVectorsFormat.h" namespace milvus { @@ -28,6 +29,7 @@ namespace codec { DefaultCodec::DefaultCodec() { vectors_format_ptr_ = std::make_shared(); + vector_index_format_ptr_ = std::make_shared(); deleted_docs_format_ptr_ = std::make_shared(); id_bloom_filter_format_ptr_ = std::make_shared(); } @@ -37,6 +39,11 @@ DefaultCodec::GetVectorsFormat() { return vectors_format_ptr_; } +VectorIndexFormatPtr +DefaultCodec::GetVectorIndexFormat() { + return vector_index_format_ptr_; +} + DeletedDocsFormatPtr DefaultCodec::GetDeletedDocsFormat() { return deleted_docs_format_ptr_; diff --git a/core/src/codecs/default/DefaultCodec.h b/core/src/codecs/default/DefaultCodec.h index 4383af30c4..63a25833fb 100644 --- a/core/src/codecs/default/DefaultCodec.h +++ b/core/src/codecs/default/DefaultCodec.h @@ -29,6 +29,9 @@ class DefaultCodec : public Codec { VectorsFormatPtr GetVectorsFormat() override; + VectorIndexFormatPtr + GetVectorIndexFormat() override; + DeletedDocsFormatPtr GetDeletedDocsFormat() override; @@ -37,6 +40,7 @@ class DefaultCodec : public Codec { private: VectorsFormatPtr vectors_format_ptr_; + VectorIndexFormatPtr vector_index_format_ptr_; DeletedDocsFormatPtr deleted_docs_format_ptr_; IdBloomFilterFormatPtr id_bloom_filter_format_ptr_; }; diff --git a/core/src/codecs/default/DefaultVectorIndexFormat.cpp b/core/src/codecs/default/DefaultVectorIndexFormat.cpp new file mode 100644 index 0000000000..3993d503b3 --- /dev/null +++ b/core/src/codecs/default/DefaultVectorIndexFormat.cpp @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "codecs/default/DefaultVectorIndexFormat.h" +#include "knowhere/common/BinarySet.h" +#include "knowhere/index/vector_index/VecIndex.h" +#include "knowhere/index/vector_index/VecIndexFactory.h" +#include "segment/VectorIndex.h" +#include "utils/Exception.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" + +namespace milvus { +namespace codec { + +knowhere::VecIndexPtr +DefaultVectorIndexFormat::read_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& path) { + milvus::TimeRecorder recorder("read_index"); + knowhere::BinarySet load_data_list; + + recorder.RecordSection("Start"); + fs_ptr->reader_ptr_->open(path); + + size_t length = fs_ptr->reader_ptr_->length(); + if (length <= 0) { + ENGINE_LOG_ERROR << "Invalid vector index length: " << path; + return nullptr; + } + + size_t rp = 0; + fs_ptr->reader_ptr_->seekg(0); + + int32_t current_type = 0; + fs_ptr->reader_ptr_->read(¤t_type, sizeof(current_type)); + rp += sizeof(current_type); + fs_ptr->reader_ptr_->seekg(rp); + + ENGINE_LOG_DEBUG << "Start to read_index(" << path << ") length: " << length << " bytes"; + while (rp < length) { + size_t meta_length; + fs_ptr->reader_ptr_->read(&meta_length, sizeof(meta_length)); + rp += sizeof(meta_length); + fs_ptr->reader_ptr_->seekg(rp); + + auto meta = new char[meta_length]; + fs_ptr->reader_ptr_->read(meta, meta_length); + rp += meta_length; + fs_ptr->reader_ptr_->seekg(rp); + + size_t bin_length; + fs_ptr->reader_ptr_->read(&bin_length, sizeof(bin_length)); + rp += sizeof(bin_length); + fs_ptr->reader_ptr_->seekg(rp); + + auto bin = new uint8_t[bin_length]; + fs_ptr->reader_ptr_->read(bin, bin_length); + rp += bin_length; + fs_ptr->reader_ptr_->seekg(rp); + + std::shared_ptr binptr(bin); + load_data_list.Append(std::string(meta, meta_length), binptr, bin_length); + delete[] meta; + } + fs_ptr->reader_ptr_->close(); + + double span = recorder.RecordSection("End"); + double rate = length * 1000000.0 / span / 1024 / 1024; + ENGINE_LOG_DEBUG << "read_index(" << path << ") rate " << rate << "MB/s"; + + knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance(); + auto index = + vec_index_factory.CreateVecIndex(knowhere::OldIndexTypeToStr(current_type), knowhere::IndexMode::MODE_CPU); + if (index != nullptr) { + index->Load(load_data_list); + index->SetIndexSize(length); + } else { + ENGINE_LOG_ERROR << "Fail to create vector index: " << path; + } + + return index; +} + +void +DefaultVectorIndexFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) { + const std::lock_guard lock(mutex_); + + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); + if (!boost::filesystem::is_directory(dir_path)) { + std::string err_msg = "Directory: " + dir_path + "does not exist"; + ENGINE_LOG_ERROR << err_msg; + throw Exception(SERVER_INVALID_ARGUMENT, err_msg); + } + + boost::filesystem::path target_path(dir_path); + typedef boost::filesystem::directory_iterator d_it; + d_it it_end; + d_it it(target_path); + + for (; it != it_end; ++it) { + const auto& path = it->path(); + + // if (path.extension().string() == vector_index_extension_) { + /* tmp solution, should be replaced when use .idx as index extension name */ + const std::string& location = path.string(); + if (location.substr(location.length() - 3) == "000") { + knowhere::VecIndexPtr index = read_internal(fs_ptr, location); + vector_index->SetVectorIndex(index); + vector_index->SetName(path.stem().string()); + return; + } + } +} + +std::string +GenerateFileName() { + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); + return std::to_string(micros * 1000); +} + +void +DefaultVectorIndexFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) { + const std::lock_guard lock(mutex_); + + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); + + const std::string index_file_path = dir_path + "/" + GenerateFileName(); + // const std::string index_file_path = dir_path + "/" + vector_index->GetName() + vector_index_extension_; + + milvus::TimeRecorder recorder("write_index"); + + knowhere::VecIndexPtr index = vector_index->GetVectorIndex(); + + auto binaryset = index->Serialize(knowhere::Config()); + int32_t index_type = knowhere::StrToOldIndexType(index->index_type()); + + recorder.RecordSection("Start"); + fs_ptr->writer_ptr_->open(index_file_path); + + fs_ptr->writer_ptr_->write(&index_type, sizeof(index_type)); + + for (auto& iter : binaryset.binary_map_) { + auto meta = iter.first.c_str(); + size_t meta_length = iter.first.length(); + fs_ptr->writer_ptr_->write(&meta_length, sizeof(meta_length)); + fs_ptr->writer_ptr_->write((void*)meta, meta_length); + + auto binary = iter.second; + int64_t binary_length = binary->size; + fs_ptr->writer_ptr_->write(&binary_length, sizeof(binary_length)); + fs_ptr->writer_ptr_->write((void*)binary->data.get(), binary_length); + } + fs_ptr->writer_ptr_->close(); + + double span = recorder.RecordSection("End"); + double rate = fs_ptr->writer_ptr_->length() * 1000000.0 / span / 1024 / 1024; + ENGINE_LOG_DEBUG << "write_index(" << index_file_path << ") rate " << rate << "MB/s"; +} + +} // namespace codec +} // namespace milvus diff --git a/core/src/codecs/default/DefaultVectorIndexFormat.h b/core/src/codecs/default/DefaultVectorIndexFormat.h new file mode 100644 index 0000000000..58c8b39e96 --- /dev/null +++ b/core/src/codecs/default/DefaultVectorIndexFormat.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "codecs/VectorIndexFormat.h" + +namespace milvus { +namespace codec { + +class DefaultVectorIndexFormat : public VectorIndexFormat { + public: + DefaultVectorIndexFormat() = default; + + void + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorIndexPtr& vector_index) override; + + void + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorIndexPtr& vector_index) override; + + // No copy and move + DefaultVectorIndexFormat(const DefaultVectorIndexFormat&) = delete; + DefaultVectorIndexFormat(DefaultVectorIndexFormat&&) = delete; + + DefaultVectorIndexFormat& + operator=(const DefaultVectorIndexFormat&) = delete; + DefaultVectorIndexFormat& + operator=(DefaultVectorIndexFormat&&) = delete; + + private: + knowhere::VecIndexPtr + read_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& path); + + private: + std::mutex mutex_; + + const std::string vector_index_extension_ = ""; +}; + +} // namespace codec +} // namespace milvus diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 8be6ec8898..f0f3a40429 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -22,7 +22,6 @@ #include "cache/GpuCacheMgr.h" #include "config/Config.h" #include "db/Utils.h" -#include "index/archive/VecIndex.h" #include "knowhere/common/Config.h" #include "knowhere/index/vector_index/ConfAdapter.h" #include "knowhere/index/vector_index/ConfAdapterMgr.h" @@ -40,6 +39,8 @@ #include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "metrics/Metrics.h" #include "scheduler/Utils.h" +#include "segment/SegmentReader.h" +#include "segment/SegmentWriter.h" #include "utils/CommonUtil.h" #include "utils/Error.h" #include "utils/Exception.h" @@ -353,7 +354,11 @@ ExecutionEngineImpl::Size() const { Status ExecutionEngineImpl::Serialize() { - auto status = write_index(index_, location_); + std::string segment_dir; + utils::GetParentPath(location_, segment_dir); + auto segment_writer_ptr = std::make_shared(segment_dir); + segment_writer_ptr->SetVectorIndex(index_); + segment_writer_ptr->WriteVectorIndex(); // here we reset index size by file size, // since some index type(such as SQ8) data size become smaller after serialized @@ -362,10 +367,10 @@ ExecutionEngineImpl::Serialize() { if (index_->Size() == 0) { std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory"; - status = Status(DB_ERROR, msg); + return Status(DB_ERROR, msg); } - return status; + return Status::OK(); } Status @@ -436,7 +441,10 @@ ExecutionEngineImpl::Load(bool to_cache) { ENGINE_LOG_DEBUG << "Finished loading raw data from segment " << segment_dir; } else { try { - index_ = read_index(location_); + segment::SegmentPtr segment_ptr; + segment_reader_ptr->GetSegment(segment_ptr); + auto status = segment_reader_ptr->LoadVectorIndex(segment_ptr->vector_index_ptr_); + index_ = segment_ptr->vector_index_ptr_->GetVectorIndex(); if (index_ == nullptr) { std::string msg = "Failed to load index from " + location_; diff --git a/core/src/index/archive/VecIndex.cpp b/core/src/index/archive/VecIndex.cpp deleted file mode 100644 index 1e7a9e10a0..0000000000 --- a/core/src/index/archive/VecIndex.cpp +++ /dev/null @@ -1,173 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include "config/Config.h" -#include "index/archive/VecIndex.h" -#include "knowhere/common/Exception.h" -#include "knowhere/index/vector_index/IndexType.h" -#include "knowhere/index/vector_index/VecIndex.h" -#include "knowhere/index/vector_index/VecIndexFactory.h" -#include "storage/disk/DiskIOReader.h" -#include "storage/disk/DiskIOWriter.h" -#include "storage/s3/S3IOReader.h" -#include "storage/s3/S3IOWriter.h" -#include "utils/Log.h" -#include "utils/TimeRecorder.h" - -namespace milvus { -namespace engine { - -knowhere::VecIndexPtr -LoadVecIndex(const knowhere::IndexType& type, const knowhere::BinarySet& index_binary, int64_t size) { - knowhere::VecIndexFactory& vec_index_factory = knowhere::VecIndexFactory::GetInstance(); - auto index = vec_index_factory.CreateVecIndex(type, knowhere::IndexMode::MODE_CPU); - if (index == nullptr) - return nullptr; - // else - index->Load(index_binary); - index->SetIndexSize(size); - return index; -} - -knowhere::VecIndexPtr -read_index(const std::string& location) { - milvus::TimeRecorder recorder("read_index"); - knowhere::BinarySet load_data_list; - - bool s3_enable = false; - milvus::server::Config& config = milvus::server::Config::GetInstance(); - config.GetStorageConfigS3Enable(s3_enable); - - std::shared_ptr reader_ptr; - if (s3_enable) { - reader_ptr = std::make_shared(); - } else { - reader_ptr = std::make_shared(); - } - - recorder.RecordSection("Start"); - reader_ptr->open(location); - - size_t length = reader_ptr->length(); - if (length <= 0) { - STORAGE_LOG_DEBUG << "read_index(" << location << ") failed!"; - return nullptr; - } - - size_t rp = 0; - reader_ptr->seekg(0); - - int32_t current_type = 0; - reader_ptr->read(¤t_type, sizeof(current_type)); - rp += sizeof(current_type); - reader_ptr->seekg(rp); - - STORAGE_LOG_DEBUG << "Start to read_index(" << location << ") length: " << length << " bytes"; - while (rp < length) { - size_t meta_length; - reader_ptr->read(&meta_length, sizeof(meta_length)); - rp += sizeof(meta_length); - reader_ptr->seekg(rp); - - auto meta = new char[meta_length]; - reader_ptr->read(meta, meta_length); - rp += meta_length; - reader_ptr->seekg(rp); - - size_t bin_length; - reader_ptr->read(&bin_length, sizeof(bin_length)); - rp += sizeof(bin_length); - reader_ptr->seekg(rp); - - auto bin = new uint8_t[bin_length]; - reader_ptr->read(bin, bin_length); - rp += bin_length; - reader_ptr->seekg(rp); - - std::shared_ptr binptr(bin); - load_data_list.Append(std::string(meta, meta_length), binptr, bin_length); - delete[] meta; - } - reader_ptr->close(); - - double span = recorder.RecordSection("End"); - double rate = length * 1000000.0 / span / 1024 / 1024; - STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s"; - - return LoadVecIndex(knowhere::OldIndexTypeToStr(current_type), load_data_list, length); -} - -milvus::Status -write_index(knowhere::VecIndexPtr index, const std::string& location) { - try { - milvus::TimeRecorder recorder("write_index"); - - auto binaryset = index->Serialize(knowhere::Config()); - int32_t index_type = knowhere::StrToOldIndexType(index->index_type()); - - bool s3_enable = false; - milvus::server::Config& config = milvus::server::Config::GetInstance(); - config.GetStorageConfigS3Enable(s3_enable); - - std::shared_ptr writer_ptr; - if (s3_enable) { - writer_ptr = std::make_shared(); - } else { - writer_ptr = std::make_shared(); - } - - recorder.RecordSection("Start"); - writer_ptr->open(location); - - writer_ptr->write(&index_type, sizeof(index_type)); - - for (auto& iter : binaryset.binary_map_) { - auto meta = iter.first.c_str(); - size_t meta_length = iter.first.length(); - writer_ptr->write(&meta_length, sizeof(meta_length)); - writer_ptr->write((void*)meta, meta_length); - - auto binary = iter.second; - int64_t binary_length = binary->size; - writer_ptr->write(&binary_length, sizeof(binary_length)); - writer_ptr->write((void*)binary->data.get(), binary_length); - } - writer_ptr->close(); - - double span = recorder.RecordSection("End"); - double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024; - STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s"; - } catch (knowhere::KnowhereException& e) { - WRAPPER_LOG_ERROR << e.what(); - return milvus::Status(milvus::KNOWHERE_UNEXPECTED_ERROR, e.what()); - } catch (std::exception& e) { - WRAPPER_LOG_ERROR << e.what(); - std::string estring(e.what()); - if (estring.find("No space left on device") != estring.npos) { - WRAPPER_LOG_ERROR << "No space left on the device"; - return milvus::Status(milvus::KNOWHERE_NO_SPACE, "No space left on the device"); - } else { - return milvus::Status(milvus::KNOWHERE_ERROR, e.what()); - } - } - return milvus::Status::OK(); -} - -} // namespace engine -} // namespace milvus diff --git a/core/src/index/archive/VecIndex.h b/core/src/index/archive/VecIndex.h deleted file mode 100644 index 92037019f1..0000000000 --- a/core/src/index/archive/VecIndex.h +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include "cache/DataObj.h" -#include "knowhere/common/BinarySet.h" -#include "knowhere/index/vector_index/VecIndex.h" -#include "utils/Status.h" - -namespace milvus { -namespace engine { - -extern milvus::Status -write_index(knowhere::VecIndexPtr index, const std::string& location); - -extern knowhere::VecIndexPtr -read_index(const std::string& location); - -} // namespace engine -} // namespace milvus diff --git a/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h b/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h index 3a0d064f03..7eb1218325 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/VecIndex.h @@ -21,7 +21,6 @@ #include "knowhere/common/Typedef.h" #include "knowhere/index/Index.h" #include "knowhere/index/vector_index/IndexType.h" -#include "segment/Types.h" namespace milvus { namespace knowhere { @@ -88,13 +87,13 @@ class VecIndex : public Index { bitset_ = std::move(bitset_ptr); } - const std::vector& + const std::vector& GetUids() const { return uids_; } void - SetUids(std::vector& uids) { + SetUids(std::vector& uids) { uids_.clear(); uids_.swap(uids); } @@ -110,7 +109,7 @@ class VecIndex : public Index { size_t UidsSize() { - return uids_.size() * sizeof(segment::doc_id_t); + return uids_.size() * sizeof(IDType); } virtual int64_t @@ -135,7 +134,7 @@ class VecIndex : public Index { IndexType index_type_ = ""; IndexMode index_mode_ = IndexMode::MODE_CPU; faiss::ConcurrentBitsetPtr bitset_ = nullptr; - std::vector uids_; + std::vector uids_; int64_t index_size_ = -1; }; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 7a311a1307..eddc72be40 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -50,6 +50,7 @@ SegmentReader::Load() { try { fs_ptr_->operation_ptr_->CreateDirectory(); default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_); + // default_codec.GetVectorIndexFormat()->read(fs_ptr_, segment_ptr_->vector_index_ptr_); default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_); } catch (std::exception& e) { return Status(DB_ERROR, e.what()); @@ -91,6 +92,20 @@ SegmentReader::GetSegment(SegmentPtr& segment_ptr) { return Status::OK(); } +Status +SegmentReader::LoadVectorIndex(segment::VectorIndexPtr& vector_index_ptr) { + codec::DefaultCodec default_codec; + try { + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorIndexFormat()->read(fs_ptr_, vector_index_ptr); + } catch (std::exception& e) { + std::string err_msg = "Failed to load vector index: " + std::string(e.what()); + ENGINE_LOG_ERROR << err_msg; + return Status(DB_ERROR, err_msg); + } + return Status::OK(); +} + Status SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 949c714852..816542b62f 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -45,6 +45,9 @@ class SegmentReader { Status LoadUids(std::vector& uids); + Status + LoadVectorIndex(segment::VectorIndexPtr& vector_index_ptr); + Status LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr); diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 8c2766075e..37e498d968 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -49,6 +49,12 @@ SegmentWriter::AddVectors(const std::string& name, const std::vector& d return Status::OK(); } +Status +SegmentWriter::SetVectorIndex(const milvus::knowhere::VecIndexPtr& index) { + segment_ptr_->vector_index_ptr_->SetVectorIndex(index); + return Status::OK(); +} + Status SegmentWriter::Serialize() { auto start = std::chrono::high_resolution_clock::now(); @@ -99,6 +105,20 @@ SegmentWriter::WriteVectors() { return Status::OK(); } +Status +SegmentWriter::WriteVectorIndex() { + codec::DefaultCodec default_codec; + try { + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorIndexFormat()->write(fs_ptr_, segment_ptr_->vector_index_ptr_); + } catch (std::exception& e) { + std::string err_msg = "Failed to write vector index: " + std::string(e.what()); + ENGINE_LOG_ERROR << err_msg; + return Status(SERVER_WRITE_ERROR, err_msg); + } + return Status::OK(); +} + Status SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index b5d42761a1..d150aceaa2 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -35,6 +35,9 @@ class SegmentWriter { Status AddVectors(const std::string& name, const std::vector& data, const std::vector& uids); + Status + SetVectorIndex(const knowhere::VecIndexPtr& index); + Status WriteBloomFilter(const IdBloomFilterPtr& bloom_filter_ptr); @@ -59,6 +62,9 @@ class SegmentWriter { size_t VectorCount(); + Status + WriteVectorIndex(); + private: Status WriteVectors(); diff --git a/core/src/segment/Types.h b/core/src/segment/Types.h index b65db7c866..10b3bd3dd7 100644 --- a/core/src/segment/Types.h +++ b/core/src/segment/Types.h @@ -21,6 +21,7 @@ #include "segment/DeletedDocs.h" #include "segment/IdBloomFilter.h" +#include "segment/VectorIndex.h" #include "segment/Vectors.h" namespace milvus { @@ -30,6 +31,7 @@ typedef int64_t doc_id_t; struct Segment { VectorsPtr vectors_ptr_ = std::make_shared(); + VectorIndexPtr vector_index_ptr_ = std::make_shared(); DeletedDocsPtr deleted_docs_ptr_ = nullptr; IdBloomFilterPtr id_bloom_filter_ptr_ = nullptr; }; diff --git a/core/src/segment/VectorIndex.h b/core/src/segment/VectorIndex.h index a266078ddf..1fb388e151 100644 --- a/core/src/segment/VectorIndex.h +++ b/core/src/segment/VectorIndex.h @@ -18,18 +18,38 @@ #pragma once #include - -#include "index/knowhere/knowhere/index/Index.h" +#include +#include "knowhere/index/vector_index/VecIndex.h" namespace milvus { namespace segment { class VectorIndex { public: - explicit VectorIndex(knowhere::IndexPtr index_ptr); + explicit VectorIndex(knowhere::VecIndexPtr index_ptr) : index_ptr_(index_ptr) { + } + + VectorIndex() = default; + + knowhere::VecIndexPtr + GetVectorIndex() const { + return index_ptr_; + } void - Get(knowhere::IndexPtr& index_ptr); + SetVectorIndex(const knowhere::VecIndexPtr& index_ptr) { + index_ptr_ = index_ptr; + } + + void + SetName(const std::string& name) { + name_ = name; + } + + const std::string& + GetName() const { + return name_; + } // No copy and move VectorIndex(const VectorIndex&) = delete; @@ -41,7 +61,8 @@ class VectorIndex { operator=(VectorIndex&&) = delete; private: - knowhere::IndexPtr index_ptr_; + knowhere::VecIndexPtr index_ptr_ = nullptr; + std::string name_; }; using VectorIndexPtr = std::shared_ptr; diff --git a/core/src/storage/disk/DiskIOReader.h b/core/src/storage/disk/DiskIOReader.h index 2fcf52457d..09d2e27955 100644 --- a/core/src/storage/disk/DiskIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include "storage/IOReader.h" @@ -52,5 +53,7 @@ class DiskIOReader : public IOReader { std::fstream fs_; }; +using DiskIOReaderPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOWriter.h b/core/src/storage/disk/DiskIOWriter.h index 39c9b5ca68..d803c9dd39 100644 --- a/core/src/storage/disk/DiskIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include "storage/IOWriter.h" @@ -50,5 +51,7 @@ class DiskIOWriter : public IOWriter { std::fstream fs_; }; +using DiskIOWriterPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOReader.h b/core/src/storage/s3/S3IOReader.h index e69a64f969..3311ff1297 100644 --- a/core/src/storage/s3/S3IOReader.h +++ b/core/src/storage/s3/S3IOReader.h @@ -11,6 +11,7 @@ #pragma once +#include #include #include "storage/IOReader.h" @@ -52,5 +53,7 @@ class S3IOReader : public IOReader { size_t pos_; }; +using S3IOReaderPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOWriter.h b/core/src/storage/s3/S3IOWriter.h index 0b5240d7b1..712e74b722 100644 --- a/core/src/storage/s3/S3IOWriter.h +++ b/core/src/storage/s3/S3IOWriter.h @@ -11,6 +11,7 @@ #pragma once +#include #include #include "storage/IOWriter.h" @@ -49,5 +50,7 @@ class S3IOWriter : public IOWriter { std::string buffer_; }; +using S3IOWriterPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/unittest/metrics/CMakeLists.txt b/core/unittest/metrics/CMakeLists.txt index 0092be8a16..2e37dca64a 100644 --- a/core/unittest/metrics/CMakeLists.txt +++ b/core/unittest/metrics/CMakeLists.txt @@ -22,6 +22,7 @@ if (MILVUS_WITH_PROMETHEUS) test_prometheus.cpp) endif () +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control") add_executable(test_metrics ${common_files} ${test_files} diff --git a/core/unittest/metrics/test_metrics.cpp b/core/unittest/metrics/test_metrics.cpp index cb3fd93fe2..c6f6a05785 100644 --- a/core/unittest/metrics/test_metrics.cpp +++ b/core/unittest/metrics/test_metrics.cpp @@ -18,8 +18,6 @@ #include #include -#define private public - #include "cache/CpuCacheMgr.h" #include "config/Config.h" #include "metrics/utils.h"