From 19052ef3e5b06ce80cf10f2ad82268e3cf8a0597 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 27 Dec 2024 12:20:50 +0800 Subject: [PATCH] enhance: Add buffered writer to reduce fwrite syscall (#38570) Related to previous PR #38157 If mmapped row is too small, frequent fwrite call still cost too much cpu time for context switching. This PR add buffered write to avoid this bad case with extra buffer per variable field. --------- Signed-off-by: Congqi Xia --- internal/core/src/common/File.h | 92 ++++++++++++++++++++++++++++++--- internal/core/src/mmap/Utils.h | 52 ++++++------------- 2 files changed, 101 insertions(+), 43 deletions(-) diff --git a/internal/core/src/common/File.h b/internal/core/src/common/File.h index 4d9b765be3..4015df78e6 100644 --- a/internal/core/src/common/File.h +++ b/internal/core/src/common/File.h @@ -21,6 +21,13 @@ #include namespace milvus { + +#define THROW_FILE_WRITE_ERROR \ + PanicInfo(ErrorCode::FileWriteFailed, \ + fmt::format("write data to file {} failed, error code {}", \ + file_.Path(), \ + strerror(errno))); + class File { public: File(const File& file) = delete; @@ -36,12 +43,26 @@ class File { static File Open(const std::string_view filepath, int flags) { + // using default buf size = 4096 + return Open(filepath, flags, 4096); + } + + static File + Open(const std::string_view filepath, int flags, size_t buf_size) { int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR); AssertInfo(fd != -1, "failed to create mmap file {}: {}", filepath, strerror(errno)); - return File(fd, std::string(filepath)); + FILE* fs = fdopen(fd, "wb+"); + AssertInfo(fs != nullptr, + "failed to open file {}: {}", + filepath, + strerror(errno)); + auto f = File(fd, fs, std::string(filepath)); + // setup buffer size file stream will use + setvbuf(f.fs_, nullptr, _IOFBF, buf_size); + return f; } int @@ -94,16 +115,71 @@ class File { } private: - explicit File(int fd, const std::string& filepath) - : fd_(fd), filepath_(filepath) { - fs_ = fdopen(fd_, "wb+"); - AssertInfo(fs_ != nullptr, - "failed to open file {}: {}", - filepath, - strerror(errno)); + explicit File(int fd, FILE* fs, const std::string& filepath) + : fd_(fd), filepath_(filepath), fs_(fs) { } int fd_{-1}; FILE* fs_; std::string filepath_; }; + +class BufferedWriter { + public: + // Constructor: Initialize with the file pointer and the buffer size (default 4KB). + explicit BufferedWriter(File& file, size_t buffer_size = 4096) + : file_(file), + buffer_size_(buffer_size), + buffer_(new char[buffer_size]) { + } + + ~BufferedWriter() { + // Ensure the buffer is flushed when the object is destroyed + flush(); + delete[] buffer_; + } + + // Write method to handle data larger than the buffer + void + Write(const void* data, size_t size) { + if (size > buffer_size_) { + flush(); + ssize_t written_data_size = file_.FWrite(data, size); + if (written_data_size != size) { + THROW_FILE_WRITE_ERROR + } + return; + } + + if (buffer_pos_ + size > buffer_size_) { + flush(); + } + + std::memcpy(buffer_ + buffer_pos_, data, size); + buffer_pos_ += size; + } + + template , int> = 0> + void + WriteInt(T value) { + Write(&value, sizeof(value)); + } + + // Flush method: Write the contents of the buffer to the file + void + flush() { + if (buffer_pos_ > 0) { + ssize_t written_data_size = file_.FWrite(buffer_, buffer_pos_); + if (written_data_size != buffer_pos_) { + THROW_FILE_WRITE_ERROR + } + buffer_pos_ = 0; + } + } + + private: + File& file_; // File pointer + size_t buffer_size_; // Size of the internal buffer + char* buffer_; // The buffer itself + size_t buffer_pos_{0}; // Current position in the buffer +}; } // namespace milvus diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index dbf815eb49..c93165df31 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -93,6 +93,9 @@ WriteFieldData(File& file, std::vector>& element_indices, FixedVector& valid_data) { if (IsVariableDataType(data_type)) { + // use buffered writer to reduce fwrite/write syscall + // buffer size = 1024*1024 = 1MB + BufferedWriter bw = BufferedWriter(file, 1048576); switch (data_type) { case DataType::VARCHAR: case DataType::STRING: { @@ -101,17 +104,10 @@ WriteFieldData(File& file, indices.push_back(total_written); auto str = static_cast(data->RawValue(i)); - ssize_t written_data_size = - file.FWriteInt(uint32_t(str->size())); - if (written_data_size != sizeof(uint32_t)) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data_size; - auto written_data = file.FWrite(str->data(), str->size()); - if (written_data < str->size()) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data; + bw.WriteInt(static_cast(str->size())); + total_written += sizeof(uint32_t); + bw.Write(str->data(), str->size()); + total_written += str->size(); } break; } @@ -121,18 +117,11 @@ WriteFieldData(File& file, indices.push_back(total_written); auto padded_string = static_cast(data->RawValue(i))->data(); - ssize_t written_data_size = file.FWriteInt( - uint32_t(padded_string.size())); - if (written_data_size != sizeof(uint32_t)) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data_size; - ssize_t written_data = - file.FWrite(padded_string.data(), padded_string.size()); - if (written_data < padded_string.size()) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data; + bw.WriteInt( + static_cast(padded_string.size())); + total_written += padded_string.size(); + bw.Write(padded_string.data(), padded_string.size()); + total_written += padded_string.size(); } break; } @@ -141,13 +130,9 @@ WriteFieldData(File& file, for (size_t i = 0; i < data->get_num_rows(); ++i) { indices.push_back(total_written); auto array = static_cast(data->RawValue(i)); - ssize_t written = - file.FWrite(array->data(), array->byte_size()); - if (written < array->byte_size()) { - THROW_FILE_WRITE_ERROR - } + bw.Write(array->data(), array->byte_size()); element_indices.emplace_back(array->get_offsets()); - total_written += written; + total_written += array->byte_size(); } break; } @@ -157,12 +142,8 @@ WriteFieldData(File& file, auto vec = static_cast*>( data->RawValue(i)); - ssize_t written = - file.FWrite(vec->data(), vec->data_byte_size()); - if (written < vec->data_byte_size()) { - break; - } - total_written += written; + bw.Write(vec->data(), vec->data_byte_size()); + total_written += vec->data_byte_size(); } break; } @@ -171,6 +152,7 @@ WriteFieldData(File& file, "not supported data type {}", GetDataTypeName(data_type)); } + bw.flush(); } else { // write as: data|data|data|data|data|data...... size_t written = file.FWrite(data->Data(), data->DataSize());