From 647c2bca2d0f28ed1ffa1ab2ff816b036c43e85d Mon Sep 17 00:00:00 2001 From: foxspy Date: Fri, 15 Aug 2025 23:41:43 +0800 Subject: [PATCH] enhance: Support streaming read and write of vector index files (#43824) issue: #42032 Signed-off-by: xianliang.li --- internal/core/src/index/VectorDiskIndex.cpp | 10 +- internal/core/src/segcore/DeletedRecord.h | 1 - internal/core/src/storage/ChunkManager.h | 7 ++ .../core/src/storage/DiskFileManagerImpl.cpp | 55 ++++++++++ .../core/src/storage/DiskFileManagerImpl.h | 20 ++++ internal/core/src/storage/FileManager.h | 32 +++++- internal/core/src/storage/LocalChunkManager.h | 5 + .../core/src/storage/MemFileManagerImpl.cpp | 15 +++ .../core/src/storage/MemFileManagerImpl.h | 9 ++ internal/core/src/storage/MinioChunkManager.h | 4 +- .../core/src/storage/RemoteInputStream.cpp | 66 +++++++++++ internal/core/src/storage/RemoteInputStream.h | 49 +++++++++ .../core/src/storage/RemoteOutputStream.cpp | 47 ++++++++ .../core/src/storage/RemoteOutputStream.h | 39 +++++++ internal/core/src/storage/ThreadPool.h | 1 + .../src/storage/azure/AzureChunkManager.h | 4 +- .../GcpNativeChunkManager.h | 4 +- .../src/storage/opendal/OpenDALChunkManager.h | 5 + internal/core/thirdparty/CMakeLists.txt | 3 +- .../core/thirdparty/knowhere/CMakeLists.txt | 2 +- .../thirdparty/milvus-common/CMakeLists.txt | 4 +- .../unittest/test_disk_file_manager_test.cpp | 103 ++++++++++++++++++ internal/core/unittest/test_exec.cpp | 25 +++-- 23 files changed, 480 insertions(+), 30 deletions(-) create mode 100644 internal/core/src/storage/RemoteInputStream.cpp create mode 100644 internal/core/src/storage/RemoteInputStream.h create mode 100644 internal/core/src/storage/RemoteOutputStream.cpp create mode 100644 internal/core/src/storage/RemoteOutputStream.h diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index cbff507e43..fad6409dac 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ 
b/internal/core/src/index/VectorDiskIndex.cpp @@ -27,6 +27,7 @@ #include "common/Consts.h" #include "common/RangeSearchHelper.h" #include "indexbuilder/types.h" +#include "filemanager/FileManager.h" namespace milvus::index { @@ -59,7 +60,7 @@ VectorDiskAnnIndex::VectorDiskAnnIndex( CheckCompatible(version); local_chunk_manager->CreateDir(local_index_path_prefix); auto diskann_index_pack = - knowhere::Pack(std::shared_ptr(file_manager_)); + knowhere::Pack(std::shared_ptr(file_manager_)); auto get_index_obj = knowhere::IndexFactory::Instance().Create( GetIndexType(), version, diskann_index_pack); if (get_index_obj.has_value()) { @@ -96,8 +97,11 @@ VectorDiskAnnIndex::Load(milvus::tracer::TraceContext ctx, GetValueFromConfig>(config, "index_files"); AssertInfo(index_files.has_value(), "index file paths is empty when load disk ann index data"); - file_manager_->CacheIndexToDisk(index_files.value(), - config[milvus::LOAD_PRIORITY]); + // If index is loaded with stream, we don't need to cache index to disk + if (!index_.LoadIndexWithStream()) { + file_manager_->CacheIndexToDisk(index_files.value(), + config[milvus::LOAD_PRIORITY]); + } read_file_span->End(); } diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index aeee4b3b59..26d16a5250 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -191,7 +191,6 @@ class DeletedRecord { } it++; } - } size_t diff --git a/internal/core/src/storage/ChunkManager.h b/internal/core/src/storage/ChunkManager.h index fa4a39b2ec..e671b668cd 100644 --- a/internal/core/src/storage/ChunkManager.h +++ b/internal/core/src/storage/ChunkManager.h @@ -121,6 +121,13 @@ class ChunkManager { */ virtual std::string GetRootPath() const = 0; + + /** + * @brief Get the Bucket Name + * @return std::string + */ + virtual std::string + GetBucketName() const = 0; }; using ChunkManagerPtr = std::shared_ptr; diff --git 
a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index c658049601..8fc57b7c63 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -47,6 +47,9 @@ #include "storage/Util.h" #include "storage/FileWriter.h" +#include "storage/RemoteOutputStream.h" +#include "storage/RemoteInputStream.h" + namespace milvus::storage { DiskFileManagerImpl::DiskFileManagerImpl( const FileManagerContext& fileManagerContext) @@ -76,6 +79,12 @@ DiskFileManagerImpl::GetRemoteIndexPath(const std::string& file_name, return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num); } +std::string +DiskFileManagerImpl::GetRemoteIndexPathV2(const std::string& file_name) const { + std::string remote_prefix = GetRemoteIndexFilePrefixV2(); + return remote_prefix + "/" + file_name; +} + std::string DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const { @@ -145,6 +154,38 @@ DiskFileManagerImpl::AddFileInternal( return true; } // namespace knowhere +std::shared_ptr +DiskFileManagerImpl::OpenInputStream(const std::string& filename) { + auto local_file_name = GetFileName(filename); + auto remote_file_path = GetRemoteIndexPathV2(local_file_name); + + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + + auto remote_file = fs->OpenInputFile(remote_file_path); + AssertInfo(remote_file.ok(), "failed to open remote file"); + return std::static_pointer_cast( + std::make_shared( + std::move(remote_file.ValueOrDie()))); +} + +std::shared_ptr +DiskFileManagerImpl::OpenOutputStream(const std::string& filename) { + auto local_file_name = GetFileName(filename); + auto remote_file_path = GetRemoteIndexPathV2(local_file_name); + + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + + auto remote_stream = fs->OpenOutputStream(remote_file_path); + 
AssertInfo(remote_stream.ok(), + "failed to open remote stream, reason: {}", + remote_stream.status().ToString()); + + return std::make_shared( + std::move(remote_stream.ValueOrDie())); +} + bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { return AddFileInternal(file, @@ -153,6 +194,15 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { }); } +bool +DiskFileManagerImpl::AddFileMeta(const FileMeta& file_meta) { + auto local_file_name = GetFileName(file_meta.file_path); + auto remote_file_path = GetRemoteIndexPathV2(local_file_name); + + remote_paths_to_size_[remote_file_path] = file_meta.file_size; + return true; +} + bool DiskFileManagerImpl::AddJsonKeyIndexLog(const std::string& file) noexcept { return AddFileInternal( @@ -883,4 +933,9 @@ DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config); +std::string +DiskFileManagerImpl::GetRemoteIndexFilePrefixV2() const { + return FileManagerImpl::GetRemoteIndexFilePrefixV2(); +} + } // namespace milvus::storage diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index a11aed586e..39db9fc7fd 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -43,12 +43,21 @@ class DiskFileManagerImpl : public FileManagerImpl { bool AddFile(const std::string& filename) noexcept override; + bool + AddFileMeta(const FileMeta& file_meta) override; + std::optional IsExisted(const std::string& filename) noexcept override; bool RemoveFile(const std::string& filename) noexcept override; + std::shared_ptr + OpenInputStream(const std::string& filename) override; + + std::shared_ptr + OpenOutputStream(const std::string& filename) override; + public: bool AddTextLog(const std::string& filename) noexcept; @@ -164,6 +173,9 @@ class DiskFileManagerImpl : public FileManagerImpl { std::string 
GetFileName(const std::string& localfile); + std::string + GetRemoteIndexFilePrefixV2() const override; + private: int64_t GetIndexBuildId() { @@ -173,6 +185,14 @@ class DiskFileManagerImpl : public FileManagerImpl { std::string GetRemoteIndexPath(const std::string& file_name, int64_t slice_num) const; + /** + * @brief Get the Remote Index Path V2 + * @param file_name; v2 will not split the file with slice_num + * @return std::string + */ + std::string + GetRemoteIndexPathV2(const std::string& file_name) const; + std::string GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const; diff --git a/internal/core/src/storage/FileManager.h b/internal/core/src/storage/FileManager.h index eeaa11fc4c..43d6b7a6fc 100644 --- a/internal/core/src/storage/FileManager.h +++ b/internal/core/src/storage/FileManager.h @@ -22,11 +22,11 @@ #include "common/Consts.h" #include "boost/filesystem/path.hpp" -#include "knowhere/file_manager.h" #include "log/Log.h" #include "storage/ChunkManager.h" #include "storage/Types.h" #include "milvus-storage/filesystem/fs.h" +#include "filemanager/FileManager.h" namespace milvus::storage { @@ -75,7 +75,7 @@ struct FileManagerContext { return false; #define FILEMANAGER_END } -class FileManagerImpl : public knowhere::FileManager { +class FileManagerImpl : public milvus::FileManager { public: explicit FileManagerImpl(const FieldDataMeta& field_mata, IndexMeta index_meta) @@ -90,7 +90,7 @@ class FileManagerImpl : public knowhere::FileManager { * @return false if any error, or return true. */ virtual bool - LoadFile(const std::string& filename) noexcept = 0; + LoadFile(const std::string& filename) override = 0; /** * @brief Add file to FileManager to manipulate it. @@ -99,7 +99,7 @@ class FileManagerImpl : public knowhere::FileManager { * @return false if any error, or return true. 
*/ virtual bool - AddFile(const std::string& filename) noexcept = 0; + AddFile(const std::string& filename) override = 0; /** * @brief Check if a file exists. @@ -108,7 +108,7 @@ class FileManagerImpl : public knowhere::FileManager { * @return std::nullopt if any error, or return if the file exists. */ virtual std::optional - IsExisted(const std::string& filename) noexcept = 0; + IsExisted(const std::string& filename) override = 0; /** * @brief Delete a file from FileManager. @@ -117,7 +117,16 @@ class FileManagerImpl : public knowhere::FileManager { * @return false if any error, or return true. */ virtual bool - RemoveFile(const std::string& filename) noexcept = 0; + RemoveFile(const std::string& filename) override = 0; + + virtual bool + AddFileMeta(const FileMeta& file_meta) override = 0; + + virtual std::shared_ptr + OpenInputStream(const std::string& filename) override = 0; + + virtual std::shared_ptr + OpenOutputStream(const std::string& filename) override = 0; public: virtual std::string @@ -159,6 +168,17 @@ class FileManagerImpl : public knowhere::FileManager { std::to_string(field_meta_.segment_id); } + virtual std::string + GetRemoteIndexFilePrefixV2() const { + boost::filesystem::path bucket = rcm_->GetBucketName(); + std::string v1_prefix = GetRemoteIndexObjectPrefix(); + if (bucket.empty()) { + return v1_prefix; + } else { + return (bucket / v1_prefix).string(); + } + } + virtual std::string GetRemoteTextLogPrefix() const { boost::filesystem::path prefix = rcm_->GetRootPath(); diff --git a/internal/core/src/storage/LocalChunkManager.h b/internal/core/src/storage/LocalChunkManager.h index de49906853..649bdb8013 100644 --- a/internal/core/src/storage/LocalChunkManager.h +++ b/internal/core/src/storage/LocalChunkManager.h @@ -133,6 +133,11 @@ class LocalChunkManager : public ChunkManager { int64_t GetSizeOfDir(const std::string& dir); + virtual std::string + GetBucketName() const { + return ""; + } + private: std::string path_prefix_; }; diff --git 
a/internal/core/src/storage/MemFileManagerImpl.cpp b/internal/core/src/storage/MemFileManagerImpl.cpp index 131003a9b4..997e7f9c38 100644 --- a/internal/core/src/storage/MemFileManagerImpl.cpp +++ b/internal/core/src/storage/MemFileManagerImpl.cpp @@ -84,6 +84,21 @@ MemFileManagerImpl::AddBinarySet(const BinarySet& binary_set, return true; } +std::shared_ptr +MemFileManagerImpl::OpenInputStream(const std::string& filename) { + return nullptr; +} + +std::shared_ptr +MemFileManagerImpl::OpenOutputStream(const std::string& filename) { + return nullptr; +} + +bool +MemFileManagerImpl::AddFileMeta(const FileMeta& file_meta) { + return true; +} + bool MemFileManagerImpl::AddFile(const BinarySet& binary_set) { return AddBinarySet(binary_set, GetRemoteIndexObjectPrefix()); diff --git a/internal/core/src/storage/MemFileManagerImpl.h b/internal/core/src/storage/MemFileManagerImpl.h index d554df6b98..5290671fb1 100644 --- a/internal/core/src/storage/MemFileManagerImpl.h +++ b/internal/core/src/storage/MemFileManagerImpl.h @@ -47,6 +47,15 @@ class MemFileManagerImpl : public FileManagerImpl { virtual bool RemoveFile(const std::string& filename) noexcept; + virtual bool + AddFileMeta(const FileMeta& file_meta) override; + + virtual std::shared_ptr + OpenInputStream(const std::string& filename) override; + + virtual std::shared_ptr + OpenOutputStream(const std::string& filename) override; + public: virtual std::string GetName() const { diff --git a/internal/core/src/storage/MinioChunkManager.h b/internal/core/src/storage/MinioChunkManager.h index 5f17026158..223e4b6aaf 100644 --- a/internal/core/src/storage/MinioChunkManager.h +++ b/internal/core/src/storage/MinioChunkManager.h @@ -163,8 +163,8 @@ class MinioChunkManager : public ChunkManager { return remote_root_path_; } - inline std::string - GetBucketName() { + virtual std::string + GetBucketName() const { return default_bucket_name_; } diff --git a/internal/core/src/storage/RemoteInputStream.cpp 
b/internal/core/src/storage/RemoteInputStream.cpp
new file mode 100644
index 0000000000..f9be7781da
--- /dev/null
+++ b/internal/core/src/storage/RemoteInputStream.cpp
@@ -0,0 +1,108 @@
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <vector>
+
+#include "RemoteInputStream.h"
+#include "common/Consts.h"
+#include "common/EasyAssert.h"
+
+namespace milvus::storage {
+
+// Takes ownership of an already opened remote file and caches its size,
+// which Size() and Eof() report later.
+RemoteInputStream::RemoteInputStream(
+    std::shared_ptr<arrow::io::RandomAccessFile>&& remote_file)
+    : remote_file_(std::move(remote_file)) {
+    auto status = remote_file_->GetSize();
+    AssertInfo(status.ok(),
+               "Failed to get size of remote file, reason: {}",
+               status.status().ToString());
+    file_size_ = static_cast<size_t>(status.ValueOrDie());
+}
+
+// Reads up to `size` bytes from the current position into `data` and returns
+// the number of bytes the underlying file reported as read.
+size_t
+RemoteInputStream::Read(void* data, size_t size) {
+    auto status = remote_file_->Read(size, data);
+    AssertInfo(status.ok(),
+               "Failed to read from input stream, reason: {}",
+               status.status().ToString());
+    return static_cast<size_t>(status.ValueOrDie());
+}
+
+// Copies exactly `size` bytes from the current position of the remote file to
+// the descriptor `fd`, in batches of at most DEFAULT_INDEX_FILE_SLICE_SIZE
+// bytes. Asserts if the remote file ends early or the local write fails, so a
+// successful return always means `size` bytes were written.
+size_t
+RemoteInputStream::Read(int fd, size_t size) {
+    if (size == 0) {
+        return 0;
+    }
+    const size_t batch_size =
+        std::min(size, static_cast<size_t>(DEFAULT_INDEX_FILE_SLICE_SIZE));
+    std::vector<uint8_t> buffer(batch_size);
+    size_t remaining = size;
+
+    while (remaining > 0) {
+        const size_t want = std::min(remaining, batch_size);
+        auto status = remote_file_->Read(want, buffer.data());
+        AssertInfo(status.ok(),
+                   "Failed to read from input stream, reason: {}",
+                   status.status().ToString());
+        // Forward only the bytes actually read; a zero-byte read means the
+        // remote file ended before `size` bytes were available.
+        const auto got = static_cast<size_t>(status.ValueOrDie());
+        AssertInfo(got > 0, "Unexpected end of input stream");
+
+        // write(2) may be partial or interrupted, so loop until drained.
+        size_t written = 0;
+        while (written < got) {
+            const ssize_t ret =
+                ::write(fd, buffer.data() + written, got - written);
+            if (ret < 0 && errno == EINTR) {
+                continue;
+            }
+            AssertInfo(ret > 0, "Failed to write to file");
+            written += static_cast<size_t>(ret);
+        }
+        remaining -= got;
+    }
+    // Flush once for the whole transfer rather than after every batch; the
+    // result is ignored because the flush is best effort.
+    ::fsync(fd);
+    return size;
+}
+
+// Returns the current read position of the remote file.
+size_t
+RemoteInputStream::Tell() const {
+    auto status = remote_file_->Tell();
+    AssertInfo(status.ok(), "Failed to tell input stream");
+    return static_cast<size_t>(status.ValueOrDie());
+}
+
+// True once the read position has reached the size cached at construction.
+bool
+RemoteInputStream::Eof() const {
+    return Tell() >= file_size_;
+}
+
+// Moves the read position to `offset`; returns false if the seek fails.
+bool
+RemoteInputStream::Seek(int64_t offset) {
+    auto status = remote_file_->Seek(offset);
+    return status.ok();
+}
+
+// Returns the file size cached at construction.
+size_t
+RemoteInputStream::Size() const {
+    return file_size_;
+}
+
+} // namespace milvus::storage
\ No newline at end of file
diff --git a/internal/core/src/storage/RemoteInputStream.h
b/internal/core/src/storage/RemoteInputStream.h
new file mode 100644
index 0000000000..159956607a
--- /dev/null
+++ b/internal/core/src/storage/RemoteInputStream.h
@@ -0,0 +1,51 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#pragma once
+
+#include "filemanager/InputStream.h"
+#include "milvus-storage/filesystem/fs.h"
+
+namespace milvus::storage {
+
+// InputStream implementation backed by a remote file; the file size is
+// captured once when the stream is constructed.
+class RemoteInputStream : public milvus::InputStream {
+ public:
+    explicit RemoteInputStream(
+        std::shared_ptr<arrow::io::RandomAccessFile>&& remote_file);
+
+    ~RemoteInputStream() override = default;
+
+    size_t
+    Size() const override;
+
+    size_t
+    Read(void* data, size_t size) override;
+
+    size_t
+    Read(int fd, size_t size) override;
+
+    size_t
+    Tell() const override;
+
+    bool
+    Eof() const override;
+
+    bool
+    Seek(int64_t offset) override;
+
+ private:
+    size_t file_size_;
+    std::shared_ptr<arrow::io::RandomAccessFile> remote_file_;
+};
+
+} // namespace milvus::storage
\ No newline at end of file
diff --git a/internal/core/src/storage/RemoteOutputStream.cpp b/internal/core/src/storage/RemoteOutputStream.cpp
new file mode 100644
index 0000000000..f1c3fc171e
--- /dev/null
+++ b/internal/core/src/storage/RemoteOutputStream.cpp
@@ -0,0 +1,66 @@
+#include "RemoteOutputStream.h"
+
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <vector>
+
+#include "common/Consts.h"
+#include "common/EasyAssert.h"
+
+namespace milvus::storage {
+
+// Takes ownership of an already opened remote output stream.
+RemoteOutputStream::RemoteOutputStream(
+    std::shared_ptr<arrow::io::OutputStream>&& output_stream)
+    : output_stream_(std::move(output_stream)) {
+}
+
+// Returns the current write position reported by the underlying stream.
+size_t
+RemoteOutputStream::Tell() const {
+    auto status = output_stream_->Tell();
+    AssertInfo(status.ok(), "Failed to tell output stream");
+    return static_cast<size_t>(status.ValueOrDie());
+}
+
+// Writes `size` bytes from `data`; asserts on failure and returns `size`.
+size_t
+RemoteOutputStream::Write(const void* data, size_t size) {
+    auto status = output_stream_->Write(data, size);
+    AssertInfo(status.ok(), "Failed to write to output stream");
+    return size;
+}
+
+// Copies exactly `size` bytes from the descriptor `fd` to the remote stream,
+// in batches of at most DEFAULT_INDEX_FILE_SLICE_SIZE bytes. Asserts if `fd`
+// reaches end-of-file or fails before `size` bytes were read.
+size_t
+RemoteOutputStream::Write(int fd, size_t size) {
+    if (size == 0) {
+        return 0;
+    }
+    const size_t batch_size =
+        std::min(size, static_cast<size_t>(DEFAULT_INDEX_FILE_SLICE_SIZE));
+    std::vector<uint8_t> buffer(batch_size);
+    size_t remaining = size;
+
+    while (remaining > 0) {
+        const size_t want = std::min(remaining, batch_size);
+        // read(2) may return fewer bytes than requested or be interrupted;
+        // only an error or a premature end-of-file is treated as a failure.
+        const ssize_t got = ::read(fd, buffer.data(), want);
+        if (got < 0 && errno == EINTR) {
+            continue;
+        }
+        AssertInfo(got > 0, "Failed to read from file");
+        auto write_status = output_stream_->Write(buffer.data(), got);
+        AssertInfo(write_status.ok(), "Failed to write to output stream");
+        remaining -= static_cast<size_t>(got);
+    }
+
+    return size;
+}
+} // namespace milvus::storage
\ No newline at end of file
diff --git a/internal/core/src/storage/RemoteOutputStream.h b/internal/core/src/storage/RemoteOutputStream.h
new file mode 100644
index 0000000000..528bd81b97
--- /dev/null
+++ b/internal/core/src/storage/RemoteOutputStream.h
@@ -0,0 +1,39 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied.
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include "filemanager/OutputStream.h" +#include "milvus-storage/filesystem/fs.h" + +namespace milvus::storage { + +class RemoteOutputStream : public milvus::OutputStream { + public: + explicit RemoteOutputStream( + std::shared_ptr&& output_stream); + + ~RemoteOutputStream() override = default; + + size_t + Tell() const override; + + size_t + Write(const void* data, size_t size) override; + + size_t + Write(int fd, size_t size) override; + + private: + std::shared_ptr output_stream_; +}; + +} // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/ThreadPool.h b/internal/core/src/storage/ThreadPool.h index 19b10faddf..bce32f29bb 100644 --- a/internal/core/src/storage/ThreadPool.h +++ b/internal/core/src/storage/ThreadPool.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "SafeQueue.h" #include "log/Log.h" diff --git a/internal/core/src/storage/azure/AzureChunkManager.h b/internal/core/src/storage/azure/AzureChunkManager.h index 34a03567e1..ace89e528a 100644 --- a/internal/core/src/storage/azure/AzureChunkManager.h +++ b/internal/core/src/storage/azure/AzureChunkManager.h @@ -116,8 +116,8 @@ class AzureChunkManager : public ChunkManager { return path_prefix_; } - inline std::string - GetBucketName() { + virtual std::string + GetBucketName() const { return default_bucket_name_; } diff --git a/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h index 87333f49a0..6689bc84b5 100644 --- a/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h +++ b/internal/core/src/storage/gcp-native-storage/GcpNativeChunkManager.h @@ -108,8 +108,8 @@ class GcpNativeChunkManager : public ChunkManager { } public: - inline std::string - GetBucketName() { + virtual std::string + GetBucketName() const { return 
default_bucket_name_; } diff --git a/internal/core/src/storage/opendal/OpenDALChunkManager.h b/internal/core/src/storage/opendal/OpenDALChunkManager.h index 9c1db9eae7..84059dfe1c 100644 --- a/internal/core/src/storage/opendal/OpenDALChunkManager.h +++ b/internal/core/src/storage/opendal/OpenDALChunkManager.h @@ -79,6 +79,11 @@ class OpenDALChunkManager : public ChunkManager { return remote_root_path_; } + virtual std::string + GetBucketName() const { + return default_bucket_name_; + } + private: std::string default_bucket_name_; std::string remote_root_path_; diff --git a/internal/core/thirdparty/CMakeLists.txt b/internal/core/thirdparty/CMakeLists.txt index 75948087cb..5f14072cc3 100644 --- a/internal/core/thirdparty/CMakeLists.txt +++ b/internal/core/thirdparty/CMakeLists.txt @@ -29,6 +29,7 @@ set(FETCHCONTENT_QUIET OFF) # Find pthreads set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) +add_subdirectory(milvus-common) add_subdirectory(knowhere) @@ -46,4 +47,4 @@ if (LINUX) endif() add_subdirectory(milvus-storage) -add_subdirectory(milvus-common) + diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index d1d442c491..5ceef2096a 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -14,7 +14,7 @@ # Update KNOWHERE_VERSION for the first occurrence milvus_add_pkg_config("knowhere") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( KNOWHERE_VERSION v2.6.0 ) +set( KNOWHERE_VERSION v2.6.1-rc ) set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git") message(STATUS "Knowhere repo: ${GIT_REPOSITORY}") diff --git a/internal/core/thirdparty/milvus-common/CMakeLists.txt b/internal/core/thirdparty/milvus-common/CMakeLists.txt index 5bf4754609..b6f6de3c71 100644 --- a/internal/core/thirdparty/milvus-common/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-common/CMakeLists.txt @@ 
-13,8 +13,8 @@ milvus_add_pkg_config("milvus-common") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( MILVUS-COMMON-VERSION be8f319 ) -set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git") +set( MILVUS-COMMON-VERSION 41fa9b1 ) +set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git") message(STATUS "milvus-common repo: ${GIT_REPOSITORY}") message(STATUS "milvus-common version: ${MILVUS-COMMON-VERSION}") diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index 9a8e9b25ba..00a7701324 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -118,6 +118,109 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) { } } +TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) { + auto conf = milvus_storage::ArrowFileSystemConfig(); + conf.storage_type = "local"; + conf.root_path = "/tmp"; + milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); + + auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + std::string small_index_file_path = + "/tmp/diskann/index_files/1000/small_index_file"; + std::string large_index_file_path = + "/tmp/diskann/index_files/1000/large_index_file"; + auto exist = lcm->Exist(large_index_file_path); + + std::string index_file_path = "/tmp/diskann/index_files/1000/index_file"; + boost::filesystem::path localPath(index_file_path); + auto local_file_name = localPath.filename().string(); + + EXPECT_EQ(exist, false); + uint64_t large_index_size = 50 << 20; + lcm->CreateFile(large_index_file_path); + std::vector large_data(large_index_size); + for (size_t i = 0; i < large_index_size; i++) { + large_data[i] = i % 255; + } + lcm->Write(large_index_file_path, large_data.data(), large_index_size); + + uint64_t small_index_size = 10 << 20; + lcm->CreateFile(small_index_file_path); + std::vector 
small_data(small_index_size); + for (size_t i = 0; i < small_index_size; i++) { + small_data[i] = i % 255; + } + lcm->Write(small_index_file_path, small_data.data(), small_index_size); + + // collection_id: 1, partition_id: 2, segment_id: 3 + // field_id: 100, index_build_id: 1000, index_version: 1 + FieldDataMeta filed_data_meta = {1, 2, 3, 100}; + IndexMeta index_meta = {3, 100, 1000, 1, "index"}; + + auto diskAnnFileManager = std::make_shared( + storage::FileManagerContext(filed_data_meta, index_meta, cm_)); + + auto os = diskAnnFileManager->OpenOutputStream(index_file_path); + size_t write_offset = 0; + os->Write(large_index_size); + write_offset += sizeof(large_index_size); + EXPECT_EQ(os->Tell(), write_offset); + os->Write(large_data.data(), large_index_size); + write_offset += large_index_size; + EXPECT_EQ(os->Tell(), write_offset); + os->Write(small_index_size); + write_offset += sizeof(small_index_size); + EXPECT_EQ(os->Tell(), write_offset); + int fd = open(small_index_file_path.c_str(), O_RDONLY); + ASSERT_NE(fd, -1); + os->Write(fd, small_index_size); + write_offset += small_index_size; + close(fd); + EXPECT_EQ(os->Tell(), write_offset); + + auto is = diskAnnFileManager->OpenInputStream(index_file_path); + size_t read_offset = 0; + size_t read_large_index_size; + is->Read(read_large_index_size); + read_offset += sizeof(read_large_index_size); + EXPECT_EQ(read_large_index_size, large_index_size); + EXPECT_EQ(is->Tell(), read_offset); + std::vector read_large_data(read_large_index_size); + is->Read(read_large_data.data(), read_large_index_size); + EXPECT_EQ( + memcmp( + read_large_data.data(), large_data.data(), read_large_index_size), + 0); + read_offset += read_large_index_size; + EXPECT_EQ(is->Tell(), read_offset); + size_t read_small_index_size; + is->Read(read_small_index_size); + read_offset += sizeof(read_small_index_size); + EXPECT_EQ(read_small_index_size, small_index_size); + EXPECT_EQ(is->Tell(), read_offset); + std::string 
small_index_file_path_read = + "/tmp/diskann/index_files/1000/small_index_file_read"; + lcm->CreateFile(small_index_file_path_read); + int fd_read = open(small_index_file_path_read.c_str(), O_WRONLY); + ASSERT_NE(fd_read, -1); + is->Read(fd_read, small_index_size); + close(fd_read); + std::vector read_small_data(read_small_index_size); + lcm->Read(small_index_file_path_read, + read_small_data.data(), + read_small_index_size); + EXPECT_EQ( + memcmp( + read_small_data.data(), small_data.data(), read_small_index_size), + 0); + read_offset += read_small_index_size; + EXPECT_EQ(is->Tell(), read_offset); + + lcm->Remove(small_index_file_path_read); + lcm->Remove(large_index_file_path); + lcm->Remove(small_index_file_path); +} + int test_worker(string s) { std::cout << s << std::endl; diff --git a/internal/core/unittest/test_exec.cpp b/internal/core/unittest/test_exec.cpp index ca5e40f448..893a4b4908 100644 --- a/internal/core/unittest/test_exec.cpp +++ b/internal/core/unittest/test_exec.cpp @@ -742,7 +742,8 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) { EXPECT_EQ(inputs.size(), 1); EXPECT_STREQ(inputs[0]->name().c_str(), "PhyLogicalUnaryExpr"); EXPECT_EQ(inputs[0]->GetInputsRef().size(), 1); - EXPECT_STREQ(inputs[0]->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr"); + EXPECT_STREQ(inputs[0]->GetInputsRef()[0]->name().c_str(), + "PhyTermFilterExpr"); } { @@ -852,12 +853,15 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) { EXPECT_EQ(inputs.size(), 2); EXPECT_STREQ(inputs[0]->name().c_str(), "PhyConjunctFilterExpr"); EXPECT_STREQ(inputs[1]->name().c_str(), "PhyLogicalUnaryExpr"); - auto phy_expr1 = std::static_pointer_cast( - inputs[1]); + auto phy_expr1 = + std::static_pointer_cast( + inputs[1]); EXPECT_EQ(phy_expr1->GetInputsRef().size(), 1); - EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr"); - phy_expr = std::static_pointer_cast( - inputs[0]); + EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), + "PhyTermFilterExpr"); + 
phy_expr = + std::static_pointer_cast( + inputs[0]); inputs = phy_expr->GetInputsRef(); EXPECT_EQ(inputs.size(), 2); } @@ -908,12 +912,13 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) { auto inputs = phy_expr->GetInputsRef(); EXPECT_EQ(inputs.size(), 1); EXPECT_STREQ(inputs[0]->name().c_str(), "PhyLogicalUnaryExpr"); - auto phy_expr1 = std::static_pointer_cast( - inputs[0]); + auto phy_expr1 = + std::static_pointer_cast( + inputs[0]); EXPECT_EQ(phy_expr1->GetInputsRef().size(), 1); - EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr"); + EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), + "PhyTermFilterExpr"); } - } TEST_P(TaskTest, Test_MultiInConvert) {