From 9032bb7668cb4e1d214db7ec28f63d48c706eb9f Mon Sep 17 00:00:00 2001 From: sparknack Date: Thu, 6 Nov 2025 10:53:33 +0800 Subject: [PATCH] enhance: unify the aligned buffer for both buffered and direct I/O (#45323) issue: #43040 Signed-off-by: Shawn Wang --- configs/milvus.yaml | 2 +- internal/core/src/monitor/Monitor.cpp | 18 ---- internal/core/src/monitor/Monitor.h | 8 -- internal/core/src/storage/FileWriter.cpp | 102 +++++++++++------------ internal/core/src/storage/FileWriter.h | 29 +++---- pkg/util/paramtable/component_param.go | 2 +- 6 files changed, 63 insertions(+), 98 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index a868c77908..2bce0ec119 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -932,7 +932,7 @@ common: # Currently, only QueryNode uses 'common.diskWrite*' parameters. Support for other components will be added in the future. # The options include 'direct' and 'buffered'. The default value is 'buffered'. diskWriteMode: buffered - # Disk write buffer size in KB, only used when disk write mode is 'direct', default is 64KB. + # Disk write buffer size in KB, used for both 'direct' and 'buffered' modes, default is 64KB. # Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB. diskWriteBufferSizeKb: 64 # This parameter controls the number of writer threads used for disk write operations. The valid range is [0, hardware_concurrency]. diff --git a/internal/core/src/monitor/Monitor.cpp b/internal/core/src/monitor/Monitor.cpp index 8a8a3b2deb..674cfb7b90 100644 --- a/internal/core/src/monitor/Monitor.cpp +++ b/internal/core/src/monitor/Monitor.cpp @@ -300,22 +300,4 @@ DEFINE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all, internal_cgo_executing_task_total, {}); -// --- file writer metrics --- - -std::map diskWriteModeBufferedLabel = { - {"mode", "buffered"}}; -std::map diskWriteModeDirectLabel = { - {"mode", "direct"}}; - -DEFINE_PROMETHEUS_COUNTER_FAMILY(disk_write_total_bytes, - "[cpp]disk write total bytes"); -DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_buffered, - disk_write_total_bytes, - diskWriteModeBufferedLabel); -DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_direct, - disk_write_total_bytes, - diskWriteModeDirectLabel); - -// --- file writer metrics end --- - } // namespace milvus::monitor diff --git a/internal/core/src/monitor/Monitor.h b/internal/core/src/monitor/Monitor.h index b4c3f96890..0366805b9c 100644 --- a/internal/core/src/monitor/Monitor.h +++ b/internal/core/src/monitor/Monitor.h @@ -102,12 +102,4 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_shredding); DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_shared); DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_load); -// --- file writer metrics --- - -DECLARE_PROMETHEUS_COUNTER_FAMILY(disk_write_total_bytes); -DECLARE_PROMETHEUS_COUNTER(disk_write_total_bytes_buffered); -DECLARE_PROMETHEUS_COUNTER(disk_write_total_bytes_direct); - -// --- file writer metrics end --- - } // namespace milvus::monitor diff --git a/internal/core/src/storage/FileWriter.cpp b/internal/core/src/storage/FileWriter.cpp index dd9317765a..d36f81caba 100644 --- a/internal/core/src/storage/FileWriter.cpp +++ b/internal/core/src/storage/FileWriter.cpp @@ -15,8 +15,8 @@ // limitations under the License. #include +#include #include "folly/futures/Future.h" -#include "monitor/Monitor.h" #include "storage/FileWriter.h" namespace milvus::storage { @@ -27,26 +27,26 @@ FileWriter::FileWriter(std::string filename, io::Priority priority) rate_limiter_(io::WriteRateLimiter::GetInstance()) { auto mode = GetMode(); use_direct_io_ = mode == WriteMode::DIRECT; + use_writer_pool_ = FileWriteWorkerPool::GetInstance().HasPool(); + + // allocate an internal aligned buffer for both modes to batch writes + size_t buf_size = GetBufferSize(); + AssertInfo( + buf_size != 0 && (buf_size % ALIGNMENT_BYTES) == 0, + "Buffer size must be greater than 0 and aligned to the alignment " + "size, buf_size: {}, alignment size: {}", + buf_size, + ALIGNMENT_BYTES); + capacity_ = buf_size; + aligned_buf_ = aligned_alloc(ALIGNMENT_BYTES, capacity_); + if (aligned_buf_ == nullptr) { + ThrowInfo(ErrorCode::MemAllocateFailed, + "Failed to allocate aligned buffer of size {}", + capacity_); + } + auto open_flags = O_CREAT | O_RDWR | O_TRUNC; if (use_direct_io_) { - // check if the file is aligned to the alignment size - size_t buf_size = GetBufferSize(); - AssertInfo( - buf_size != 0 && (buf_size % ALIGNMENT_BYTES) == 0, - "Buffer size must be greater than 0 and aligned to the alignment " - "size, buf_size: {}, alignment size: {}, error: {}", - buf_size, - ALIGNMENT_BYTES, - strerror(errno)); - capacity_ = buf_size; - auto err = posix_memalign(&aligned_buf_, ALIGNMENT_BYTES, capacity_); - if (err != 0) { - aligned_buf_ = nullptr; - ThrowInfo( - ErrorCode::MemAllocateFailed, - "Failed to allocate aligned buffer for direct io, error: {}", - strerror(err)); - } #ifndef __APPLE__ open_flags |= O_DIRECT; #endif @@ -85,7 +85,7 @@ FileWriter::Cleanup() noexcept { close(fd_); fd_ = -1; } - if (use_direct_io_) { + if (aligned_buf_ != nullptr) { free(aligned_buf_); aligned_buf_ = nullptr; } @@ -129,7 +129,7 @@ FileWriter::PositionedWriteWithCheck(const void* data, ++empty_loops; // if the empty loops is too large or the total wait time is too long, we should write the data directly if (empty_loops > MAX_EMPTY_LOOPS || total_wait_us > MAX_WAIT_US) { - allowed_bytes = rate_limiter_.GetBytesPerPeriod(); + allowed_bytes = bytes_to_write; empty_loops = 0; total_wait_us = 0; } else { @@ -154,14 +154,8 @@ FileWriter::PositionedWriteWithCheck(const void* data, } void -FileWriter::WriteWithDirectIO(const void* data, size_t nbyte) { +FileWriter::WriteInternal(const void* data, size_t nbyte) { const char* src = static_cast(data); - // if the data can fit in the aligned buffer, we can just copy it to the aligned buffer - if (offset_ + nbyte <= capacity_) { - memcpy(static_cast(aligned_buf_) + offset_, src, nbyte); - offset_ += nbyte; - return; - } size_t left_size = nbyte; // we should fill and handle the cached aligned buffer first @@ -208,21 +202,20 @@ FileWriter::WriteWithDirectIO(const void* data, size_t nbyte) { } assert(src == static_cast(data) + nbyte); - - milvus::monitor::disk_write_total_bytes_direct.Increment(nbyte); -} - -void -FileWriter::WriteWithBufferedIO(const void* data, size_t nbyte) { - PositionedWriteWithCheck(data, nbyte, file_size_); - file_size_ += nbyte; - milvus::monitor::disk_write_total_bytes_buffered.Increment(nbyte); } void FileWriter::Write(const void* data, size_t nbyte) { - AssertInfo(fd_ != -1, "FileWriter is not initialized or finished"); - if (nbyte == 0) { + // if the data can fit in the aligned buffer, we can just copy it to the aligned buffer + if (nbyte <= capacity_ - offset_) { + const char* src = static_cast(data); + memcpy(static_cast(aligned_buf_) + offset_, src, nbyte); + offset_ += nbyte; + return; + } + + if (!use_writer_pool_) { + WriteInternal(data, nbyte); return; } @@ -230,11 +223,7 @@ FileWriter::Write(const void* data, size_t nbyte) { auto future = promise->getFuture(); auto task = [this, data, nbyte, promise]() { try { - if (use_direct_io_) { - WriteWithDirectIO(data, nbyte); - } else { - WriteWithBufferedIO(data, nbyte); - } + WriteInternal(data, nbyte); promise->setValue(folly::Unit{}); } catch (...) { promise->setException( @@ -242,6 +231,8 @@ FileWriter::Write(const void* data, size_t nbyte) { } }; + // try to add the task to the writer pool + // fallback to write the data directly if the task cannot be added to the pool if (FileWriteWorkerPool::GetInstance().AddTask(task)) { try { future.wait(); @@ -253,11 +244,7 @@ FileWriter::Write(const void* data, size_t nbyte) { e.what()); } } else { - if (use_direct_io_) { - WriteWithDirectIO(data, nbyte); - } else { - WriteWithBufferedIO(data, nbyte); - } + WriteInternal(data, nbyte); } } @@ -270,7 +257,6 @@ FileWriter::FlushWithDirectIO() { nearest_aligned_offset - offset_); PositionedWriteWithCheck(aligned_buf_, nearest_aligned_offset, file_size_); file_size_ += offset_; - milvus::monitor::disk_write_total_bytes_direct.Increment(offset_); // truncate the file to the actual size since the file written by the aligned buffer may be larger than the actual size if (ftruncate(fd_, file_size_) != 0) { Cleanup(); @@ -282,10 +268,18 @@ FileWriter::FlushWithDirectIO() { offset_ = 0; } +void +FileWriter::FlushWithBufferedIO() { + if (offset_ == 0) { + return; + } + PositionedWriteWithCheck(aligned_buf_, offset_, file_size_); + file_size_ += offset_; + offset_ = 0; +} + size_t FileWriter::Finish() { - AssertInfo(fd_ != -1, "FileWriter is not initialized or finished"); - // if the aligned buffer is not empty, we should flush it to the file if (offset_ != 0) { auto promise = std::make_shared>(); @@ -294,6 +288,8 @@ FileWriter::Finish() { try { if (use_direct_io_) { FlushWithDirectIO(); + } else { + FlushWithBufferedIO(); } promise->setValue(folly::Unit{}); } catch (...) { @@ -315,6 +311,8 @@ FileWriter::Finish() { } else { if (use_direct_io_) { FlushWithDirectIO(); + } else { + FlushWithBufferedIO(); } } } diff --git a/internal/core/src/storage/FileWriter.h b/internal/core/src/storage/FileWriter.h index 72c68c2e75..0c27447870 100644 --- a/internal/core/src/storage/FileWriter.h +++ b/internal/core/src/storage/FileWriter.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -34,7 +33,6 @@ #include "common/EasyAssert.h" #include "log/Log.h" #include "pb/common.pb.h" -#include "storage/ThreadPools.h" namespace milvus::storage { @@ -108,20 +106,10 @@ class WriteRateLimiter { Acquire(size_t bytes, size_t alignment_bytes = 1, Priority priority = Priority::MIDDLE) { - if (static_cast(priority) >= - static_cast(Priority::NR_PRIORITY)) { - ThrowInfo(ErrorCode::InvalidParameter, - "Invalid priority value: {}", - static_cast(priority)); - } // if priority ratio is <= 0, no rate limit is applied, return the original bytes if (priority_ratio_[static_cast(priority)] <= 0) { return bytes; } - AssertInfo(alignment_bytes > 0 && bytes >= alignment_bytes && - (bytes % alignment_bytes == 0), - "alignment_bytes must be positive and bytes must be " - "divisible by alignment_bytes"); std::unique_lock lock(mutex_); // recheck the amplification ratio after taking the lock @@ -263,10 +251,7 @@ class FileWriter { private: void - WriteWithDirectIO(const void* data, size_t nbyte); - - void - WriteWithBufferedIO(const void* data, size_t nbyte); + WriteInternal(const void* data, size_t nbyte); void FlushWithDirectIO(); @@ -289,6 +274,8 @@ class FileWriter { std::string filename_{""}; size_t file_size_{0}; + bool use_writer_pool_{false}; + // for direct io bool use_direct_io_{false}; void* aligned_buf_{nullptr}; @@ -298,8 +285,7 @@ class FileWriter { // for global configuration static WriteMode mode_; // The write mode, which can be 'buffered' (default) or 'direct'. - static size_t - buffer_size_; // The buffer size used for direct I/O, which is only used when the write mode is 'direct'. + static size_t buffer_size_; // for rate limiter io::Priority priority_; @@ -367,6 +353,13 @@ class FileWriteWorkerPool { return true; } + bool + HasPool() const { + // no lock here, so it's not thread-safe + // but it's ok because we still can write without the pool + return executor_ != nullptr; + } + ~FileWriteWorkerPool() { std::lock_guard lock(executor_mutex_); if (executor_ != nullptr) { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 88683adb6b..c414da5df5 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -700,7 +700,7 @@ The options include 'direct' and 'buffered'. The default value is 'buffered'.`, Key: "common.diskWriteBufferSizeKb", Version: "2.6.0", DefaultValue: "64", - Doc: `Disk write buffer size in KB, only used when disk write mode is 'direct', default is 64KB. + Doc: `Disk write buffer size in KB, used for both 'direct' and 'buffered' modes, default is 64KB. Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB.`, Export: true, }