diff --git a/internal/core/src/common/File.h b/internal/core/src/common/File.h index 4d9b765be3..4015df78e6 100644 --- a/internal/core/src/common/File.h +++ b/internal/core/src/common/File.h @@ -21,6 +21,13 @@ #include namespace milvus { + +#define THROW_FILE_WRITE_ERROR \ + PanicInfo(ErrorCode::FileWriteFailed, \ + fmt::format("write data to file {} failed, error code {}", \ + file_.Path(), \ + strerror(errno))); + class File { public: File(const File& file) = delete; @@ -36,12 +43,26 @@ class File { static File Open(const std::string_view filepath, int flags) { + // using default buf size = 4096 + return Open(filepath, flags, 4096); + } + + static File + Open(const std::string_view filepath, int flags, size_t buf_size) { int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR); AssertInfo(fd != -1, "failed to create mmap file {}: {}", filepath, strerror(errno)); - return File(fd, std::string(filepath)); + FILE* fs = fdopen(fd, "wb+"); + AssertInfo(fs != nullptr, + "failed to open file {}: {}", + filepath, + strerror(errno)); + auto f = File(fd, fs, std::string(filepath)); + // setup buffer size file stream will use + setvbuf(f.fs_, nullptr, _IOFBF, buf_size); + return f; } int @@ -94,16 +115,71 @@ class File { } private: - explicit File(int fd, const std::string& filepath) - : fd_(fd), filepath_(filepath) { - fs_ = fdopen(fd_, "wb+"); - AssertInfo(fs_ != nullptr, - "failed to open file {}: {}", - filepath, - strerror(errno)); + explicit File(int fd, FILE* fs, const std::string& filepath) + : fd_(fd), filepath_(filepath), fs_(fs) { } int fd_{-1}; FILE* fs_; std::string filepath_; }; + +class BufferedWriter { + public: + // Constructor: Initialize with the file pointer and the buffer size (default 4KB). + explicit BufferedWriter(File& file, size_t buffer_size = 4096) + : file_(file), + buffer_size_(buffer_size), + buffer_(new char[buffer_size]) { + } + + ~BufferedWriter() { + // Ensure the buffer is flushed when the object is destroyed + flush(); + delete[] buffer_; + } + + // Write method to handle data larger than the buffer + void + Write(const void* data, size_t size) { + if (size > buffer_size_) { + flush(); + ssize_t written_data_size = file_.FWrite(data, size); + if (written_data_size != size) { + THROW_FILE_WRITE_ERROR + } + return; + } + + if (buffer_pos_ + size > buffer_size_) { + flush(); + } + + std::memcpy(buffer_ + buffer_pos_, data, size); + buffer_pos_ += size; + } + + template , int> = 0> + void + WriteInt(T value) { + Write(&value, sizeof(value)); + } + + // Flush method: Write the contents of the buffer to the file + void + flush() { + if (buffer_pos_ > 0) { + ssize_t written_data_size = file_.FWrite(buffer_, buffer_pos_); + if (written_data_size != buffer_pos_) { + THROW_FILE_WRITE_ERROR + } + buffer_pos_ = 0; + } + } + + private: + File& file_; // File pointer + size_t buffer_size_; // Size of the internal buffer + char* buffer_; // The buffer itself + size_t buffer_pos_{0}; // Current position in the buffer +}; } // namespace milvus diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index dbf815eb49..c93165df31 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -93,6 +93,9 @@ WriteFieldData(File& file, std::vector>& element_indices, FixedVector& valid_data) { if (IsVariableDataType(data_type)) { + // use buffered writer to reduce fwrite/write syscall + // buffer size = 1024*1024 = 1MB + BufferedWriter bw = BufferedWriter(file, 1048576); switch (data_type) { case DataType::VARCHAR: case DataType::STRING: { @@ -101,17 +104,10 @@ WriteFieldData(File& file, indices.push_back(total_written); auto str = static_cast(data->RawValue(i)); - ssize_t written_data_size = - file.FWriteInt(uint32_t(str->size())); - if (written_data_size != sizeof(uint32_t)) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data_size; - auto written_data = file.FWrite(str->data(), str->size()); - if (written_data < str->size()) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data; + bw.WriteInt(static_cast(str->size())); + total_written += sizeof(uint32_t); + bw.Write(str->data(), str->size()); + total_written += str->size(); } break; } @@ -121,18 +117,11 @@ WriteFieldData(File& file, indices.push_back(total_written); auto padded_string = static_cast(data->RawValue(i))->data(); - ssize_t written_data_size = file.FWriteInt( - uint32_t(padded_string.size())); - if (written_data_size != sizeof(uint32_t)) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data_size; - ssize_t written_data = - file.FWrite(padded_string.data(), padded_string.size()); - if (written_data < padded_string.size()) { - THROW_FILE_WRITE_ERROR - } - total_written += written_data; + bw.WriteInt( + static_cast(padded_string.size())); + total_written += padded_string.size(); + bw.Write(padded_string.data(), padded_string.size()); + total_written += padded_string.size(); } break; } @@ -141,13 +130,9 @@ WriteFieldData(File& file, for (size_t i = 0; i < data->get_num_rows(); ++i) { indices.push_back(total_written); auto array = static_cast(data->RawValue(i)); - ssize_t written = - file.FWrite(array->data(), array->byte_size()); - if (written < array->byte_size()) { - THROW_FILE_WRITE_ERROR - } + bw.Write(array->data(), array->byte_size()); element_indices.emplace_back(array->get_offsets()); - total_written += written; + total_written += array->byte_size(); } break; } @@ -157,12 +142,8 @@ WriteFieldData(File& file, auto vec = static_cast*>( data->RawValue(i)); - ssize_t written = - file.FWrite(vec->data(), vec->data_byte_size()); - if (written < vec->data_byte_size()) { - break; - } - total_written += written; + bw.Write(vec->data(), vec->data_byte_size()); + total_written += vec->data_byte_size(); } break; } @@ -171,6 +152,7 @@ WriteFieldData(File& file, "not supported data type {}", GetDataTypeName(data_type)); } + bw.flush(); } else { // write as: data|data|data|data|data|data...... size_t written = file.FWrite(data->Data(), data->DataSize());