From 4110e396d31d5604d65a2984c955e57ee6a037e2 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Mon, 21 Aug 2023 14:32:22 +0800 Subject: [PATCH] Fix serialize string index failed when greater than 2G (#26393) Signed-off-by: xige-16 --- internal/core/src/index/Exception.h | 34 +++++++++++++++++++ internal/core/src/index/StringIndexMarisa.cpp | 16 ++------- internal/core/src/index/Utils.cpp | 16 +++++++++ internal/core/src/index/Utils.h | 4 +++ internal/core/unittest/test_utils.cpp | 27 +++++++++++++++ 5 files changed, 84 insertions(+), 13 deletions(-) create mode 100644 internal/core/src/index/Exception.h diff --git a/internal/core/src/index/Exception.h b/internal/core/src/index/Exception.h new file mode 100644 index 0000000000..dde9a9e24b --- /dev/null +++ b/internal/core/src/index/Exception.h @@ -0,0 +1,34 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace milvus::index { + +class UnistdException : public std::runtime_error { + public: + explicit UnistdException(const std::string& msg) : std::runtime_error(msg) { + } + + virtual ~UnistdException() { + } +}; + +} // namespace milvus::index diff --git a/internal/core/src/index/StringIndexMarisa.cpp b/internal/core/src/index/StringIndexMarisa.cpp index 3ac3071d1e..c118208964 100644 --- a/internal/core/src/index/StringIndexMarisa.cpp +++ b/internal/core/src/index/StringIndexMarisa.cpp @@ -24,6 +24,7 @@ #include "index/StringIndexMarisa.h" #include "index/Utils.h" +#include "index/Exception.h" #include "index/Index.h" #include "common/Utils.h" #include "common/Slice.h" @@ -32,14 +33,6 @@ namespace milvus::index { #if defined(__linux__) || defined(__APPLE__) -class UnistdException : public std::runtime_error { - public: - explicit UnistdException(const std::string& msg) : std::runtime_error(msg) { - } - virtual ~UnistdException() { - } -}; - StringIndexMarisa::StringIndexMarisa(storage::FileManagerImplPtr file_manager) { if (file_manager != nullptr) { file_manager_ = std::dynamic_pointer_cast(file_manager); @@ -124,18 +117,15 @@ StringIndexMarisa::Serialize(const Config& config) { auto file = std::string("/tmp/") + uuid_string; auto fd = open(file.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IXUSR); + AssertInfo(fd != -1, "open file failed"); trie_.write(fd); auto size = get_file_size(fd); auto index_data = std::shared_ptr(new uint8_t[size]); + ReadDataFromFD(fd, index_data.get(), size); - lseek(fd, 0, SEEK_SET); - auto status = read(fd, index_data.get(), size); close(fd); remove(file.c_str()); - if (status != size) { - throw UnistdException("read index from fd error, errorCode is " + std::to_string(status)); - } auto str_ids_len = str_ids_.size() * sizeof(size_t); std::shared_ptr str_ids(new uint8_t[str_ids_len]); diff --git a/internal/core/src/index/Utils.cpp b/internal/core/src/index/Utils.cpp index 791373f5c3..5e63b07209 100644 --- a/internal/core/src/index/Utils.cpp +++ b/internal/core/src/index/Utils.cpp @@ -22,6 +22,7 @@ #include "index/Utils.h" #include "index/Meta.h" +#include "index/Exception.h" #include #include "exceptions/EasyAssert.h" #include "common/Slice.h" @@ -231,4 +232,19 @@ AssembleIndexDatas(std::map& index_datas) { } } +void +ReadDataFromFD(int fd, void* buf, size_t size, size_t chunk_size) { + lseek(fd, 0, SEEK_SET); + while (size != 0) { + const size_t count = (size < chunk_size) ? size : chunk_size; + const ssize_t size_read = read(fd, buf, count); + if (size_read != count) { + throw UnistdException("read data from fd error, returned read size is " + std::to_string(size_read)); + } + + buf = static_cast(buf) + size_read; + size -= static_cast(size_read); + } +} + } // namespace milvus::index diff --git a/internal/core/src/index/Utils.h b/internal/core/src/index/Utils.h index 3a14de9055..afb2a3e6da 100644 --- a/internal/core/src/index/Utils.h +++ b/internal/core/src/index/Utils.h @@ -127,4 +127,8 @@ ParseConfigFromIndexParams(const std::map& index_param void AssembleIndexDatas(std::map& index_datas); +// On Linux, read() (and similar system calls) will transfer at most 0x7ffff000 (2,147,479,552) bytes once +void +ReadDataFromFD(int fd, void* buf, size_t size, size_t chunk_size = 0x7ffff000); + } // namespace milvus::index diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 386a16dcd4..08aa84d56d 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -11,10 +11,14 @@ #include #include +#include +#include +#include #include "common/Utils.h" #include "query/Utils.h" #include "test_utils/DataGen.h" +#include "index/Exception.h" TEST(Util, StringMatch) { using namespace milvus; @@ -99,3 +103,26 @@ TEST(Util, OutOfRange) { ASSERT_TRUE(out_of_range(static_cast(std::numeric_limits::max()) + 1)); ASSERT_TRUE(out_of_range(static_cast(std::numeric_limits::min()) - 1)); } + +TEST(Util, read_from_fd) { + auto uuid = boost::uuids::random_generator()(); + auto uuid_string = boost::uuids::to_string(uuid); + auto file = std::string("/tmp/") + uuid_string; + + auto fd = open(file.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IXUSR); + ASSERT_NE(fd, -1); + size_t data_size = 100 * 1024 * 1024; // 100M + auto index_data = std::shared_ptr(new uint8_t[data_size]); + auto max_loop = size_t(INT_MAX) / data_size + 1; // insert data > 2G + for (int i = 0; i < max_loop; ++i) { + auto size_write = write(fd, index_data.get(), data_size); + ASSERT_GE(size_write, 0); + } + + auto read_buf = std::shared_ptr(new uint8_t[data_size * max_loop]); + EXPECT_NO_THROW(milvus::index::ReadDataFromFD(fd, read_buf.get(), data_size * max_loop)); + + // On Linux, read() (and similar system calls) will transfer at most 0x7ffff000 (2,147,479,552) bytes once + EXPECT_THROW(milvus::index::ReadDataFromFD(fd, read_buf.get(), data_size * max_loop, INT_MAX), + milvus::index::UnistdException); +}