enhance: unify the aligned buffer for both buffered and direct I/O (#45323)

issue: #43040

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
This commit is contained in:
sparknack 2025-11-06 10:53:33 +08:00 committed by GitHub
parent 1cf00c6d32
commit 9032bb7668
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 63 additions and 98 deletions

View File

@ -932,7 +932,7 @@ common:
# Currently, only QueryNode uses 'common.diskWrite*' parameters. Support for other components will be added in the future. # Currently, only QueryNode uses 'common.diskWrite*' parameters. Support for other components will be added in the future.
# The options include 'direct' and 'buffered'. The default value is 'buffered'. # The options include 'direct' and 'buffered'. The default value is 'buffered'.
diskWriteMode: buffered diskWriteMode: buffered
# Disk write buffer size in KB, only used when disk write mode is 'direct', default is 64KB. # Disk write buffer size in KB, used for both 'direct' and 'buffered' modes, default is 64KB.
# Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB. # Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB.
diskWriteBufferSizeKb: 64 diskWriteBufferSizeKb: 64
# This parameter controls the number of writer threads used for disk write operations. The valid range is [0, hardware_concurrency]. # This parameter controls the number of writer threads used for disk write operations. The valid range is [0, hardware_concurrency].

View File

@ -300,22 +300,4 @@ DEFINE_PROMETHEUS_GAUGE(internal_cgo_executing_task_total_all,
internal_cgo_executing_task_total, internal_cgo_executing_task_total,
{}); {});
// --- file writer metrics ---
std::map<std::string, std::string> diskWriteModeBufferedLabel = {
{"mode", "buffered"}};
std::map<std::string, std::string> diskWriteModeDirectLabel = {
{"mode", "direct"}};
DEFINE_PROMETHEUS_COUNTER_FAMILY(disk_write_total_bytes,
"[cpp]disk write total bytes");
DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_buffered,
disk_write_total_bytes,
diskWriteModeBufferedLabel);
DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_direct,
disk_write_total_bytes,
diskWriteModeDirectLabel);
// --- file writer metrics end ---
} // namespace milvus::monitor } // namespace milvus::monitor

View File

@ -102,12 +102,4 @@ DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_shredding);
DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_shared); DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_shared);
DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_load); DECLARE_PROMETHEUS_HISTOGRAM(internal_json_stats_latency_load);
// --- file writer metrics ---
DECLARE_PROMETHEUS_COUNTER_FAMILY(disk_write_total_bytes);
DECLARE_PROMETHEUS_COUNTER(disk_write_total_bytes_buffered);
DECLARE_PROMETHEUS_COUNTER(disk_write_total_bytes_direct);
// --- file writer metrics end ---
} // namespace milvus::monitor } // namespace milvus::monitor

View File

@ -15,8 +15,8 @@
// limitations under the License. // limitations under the License.
#include <utility> #include <utility>
#include <cstdlib>
#include "folly/futures/Future.h" #include "folly/futures/Future.h"
#include "monitor/Monitor.h"
#include "storage/FileWriter.h" #include "storage/FileWriter.h"
namespace milvus::storage { namespace milvus::storage {
@ -27,26 +27,26 @@ FileWriter::FileWriter(std::string filename, io::Priority priority)
rate_limiter_(io::WriteRateLimiter::GetInstance()) { rate_limiter_(io::WriteRateLimiter::GetInstance()) {
auto mode = GetMode(); auto mode = GetMode();
use_direct_io_ = mode == WriteMode::DIRECT; use_direct_io_ = mode == WriteMode::DIRECT;
use_writer_pool_ = FileWriteWorkerPool::GetInstance().HasPool();
// allocate an internal aligned buffer for both modes to batch writes
size_t buf_size = GetBufferSize();
AssertInfo(
buf_size != 0 && (buf_size % ALIGNMENT_BYTES) == 0,
"Buffer size must be greater than 0 and aligned to the alignment "
"size, buf_size: {}, alignment size: {}",
buf_size,
ALIGNMENT_BYTES);
capacity_ = buf_size;
aligned_buf_ = aligned_alloc(ALIGNMENT_BYTES, capacity_);
if (aligned_buf_ == nullptr) {
ThrowInfo(ErrorCode::MemAllocateFailed,
"Failed to allocate aligned buffer of size {}",
capacity_);
}
auto open_flags = O_CREAT | O_RDWR | O_TRUNC; auto open_flags = O_CREAT | O_RDWR | O_TRUNC;
if (use_direct_io_) { if (use_direct_io_) {
// check if the file is aligned to the alignment size
size_t buf_size = GetBufferSize();
AssertInfo(
buf_size != 0 && (buf_size % ALIGNMENT_BYTES) == 0,
"Buffer size must be greater than 0 and aligned to the alignment "
"size, buf_size: {}, alignment size: {}, error: {}",
buf_size,
ALIGNMENT_BYTES,
strerror(errno));
capacity_ = buf_size;
auto err = posix_memalign(&aligned_buf_, ALIGNMENT_BYTES, capacity_);
if (err != 0) {
aligned_buf_ = nullptr;
ThrowInfo(
ErrorCode::MemAllocateFailed,
"Failed to allocate aligned buffer for direct io, error: {}",
strerror(err));
}
#ifndef __APPLE__ #ifndef __APPLE__
open_flags |= O_DIRECT; open_flags |= O_DIRECT;
#endif #endif
@ -85,7 +85,7 @@ FileWriter::Cleanup() noexcept {
close(fd_); close(fd_);
fd_ = -1; fd_ = -1;
} }
if (use_direct_io_) { if (aligned_buf_ != nullptr) {
free(aligned_buf_); free(aligned_buf_);
aligned_buf_ = nullptr; aligned_buf_ = nullptr;
} }
@ -129,7 +129,7 @@ FileWriter::PositionedWriteWithCheck(const void* data,
++empty_loops; ++empty_loops;
// if the empty loops is too large or the total wait time is too long, we should write the data directly // if the empty loops is too large or the total wait time is too long, we should write the data directly
if (empty_loops > MAX_EMPTY_LOOPS || total_wait_us > MAX_WAIT_US) { if (empty_loops > MAX_EMPTY_LOOPS || total_wait_us > MAX_WAIT_US) {
allowed_bytes = rate_limiter_.GetBytesPerPeriod(); allowed_bytes = bytes_to_write;
empty_loops = 0; empty_loops = 0;
total_wait_us = 0; total_wait_us = 0;
} else { } else {
@ -154,14 +154,8 @@ FileWriter::PositionedWriteWithCheck(const void* data,
} }
void void
FileWriter::WriteWithDirectIO(const void* data, size_t nbyte) { FileWriter::WriteInternal(const void* data, size_t nbyte) {
const char* src = static_cast<const char*>(data); const char* src = static_cast<const char*>(data);
// if the data can fit in the aligned buffer, we can just copy it to the aligned buffer
if (offset_ + nbyte <= capacity_) {
memcpy(static_cast<char*>(aligned_buf_) + offset_, src, nbyte);
offset_ += nbyte;
return;
}
size_t left_size = nbyte; size_t left_size = nbyte;
// we should fill and handle the cached aligned buffer first // we should fill and handle the cached aligned buffer first
@ -208,21 +202,20 @@ FileWriter::WriteWithDirectIO(const void* data, size_t nbyte) {
} }
assert(src == static_cast<const char*>(data) + nbyte); assert(src == static_cast<const char*>(data) + nbyte);
milvus::monitor::disk_write_total_bytes_direct.Increment(nbyte);
}
void
FileWriter::WriteWithBufferedIO(const void* data, size_t nbyte) {
PositionedWriteWithCheck(data, nbyte, file_size_);
file_size_ += nbyte;
milvus::monitor::disk_write_total_bytes_buffered.Increment(nbyte);
} }
void void
FileWriter::Write(const void* data, size_t nbyte) { FileWriter::Write(const void* data, size_t nbyte) {
AssertInfo(fd_ != -1, "FileWriter is not initialized or finished"); // if the data can fit in the aligned buffer, we can just copy it to the aligned buffer
if (nbyte == 0) { if (nbyte <= capacity_ - offset_) {
const char* src = static_cast<const char*>(data);
memcpy(static_cast<char*>(aligned_buf_) + offset_, src, nbyte);
offset_ += nbyte;
return;
}
if (!use_writer_pool_) {
WriteInternal(data, nbyte);
return; return;
} }
@ -230,11 +223,7 @@ FileWriter::Write(const void* data, size_t nbyte) {
auto future = promise->getFuture(); auto future = promise->getFuture();
auto task = [this, data, nbyte, promise]() { auto task = [this, data, nbyte, promise]() {
try { try {
if (use_direct_io_) { WriteInternal(data, nbyte);
WriteWithDirectIO(data, nbyte);
} else {
WriteWithBufferedIO(data, nbyte);
}
promise->setValue(folly::Unit{}); promise->setValue(folly::Unit{});
} catch (...) { } catch (...) {
promise->setException( promise->setException(
@ -242,6 +231,8 @@ FileWriter::Write(const void* data, size_t nbyte) {
} }
}; };
// try to add the task to the writer pool
// fallback to write the data directly if the task cannot be added to the pool
if (FileWriteWorkerPool::GetInstance().AddTask(task)) { if (FileWriteWorkerPool::GetInstance().AddTask(task)) {
try { try {
future.wait(); future.wait();
@ -253,11 +244,7 @@ FileWriter::Write(const void* data, size_t nbyte) {
e.what()); e.what());
} }
} else { } else {
if (use_direct_io_) { WriteInternal(data, nbyte);
WriteWithDirectIO(data, nbyte);
} else {
WriteWithBufferedIO(data, nbyte);
}
} }
} }
@ -270,7 +257,6 @@ FileWriter::FlushWithDirectIO() {
nearest_aligned_offset - offset_); nearest_aligned_offset - offset_);
PositionedWriteWithCheck(aligned_buf_, nearest_aligned_offset, file_size_); PositionedWriteWithCheck(aligned_buf_, nearest_aligned_offset, file_size_);
file_size_ += offset_; file_size_ += offset_;
milvus::monitor::disk_write_total_bytes_direct.Increment(offset_);
// truncate the file to the actual size since the file written by the aligned buffer may be larger than the actual size // truncate the file to the actual size since the file written by the aligned buffer may be larger than the actual size
if (ftruncate(fd_, file_size_) != 0) { if (ftruncate(fd_, file_size_) != 0) {
Cleanup(); Cleanup();
@ -282,10 +268,18 @@ FileWriter::FlushWithDirectIO() {
offset_ = 0; offset_ = 0;
} }
// Flush the bytes currently staged in the internal buffer when running in
// buffered mode. Exactly offset_ bytes are written at position file_size_;
// unlike the direct I/O flush there is no padding up to the alignment and
// no ftruncate afterwards. Does nothing when the buffer is empty.
void
FileWriter::FlushWithBufferedIO() {
    if (offset_ != 0) {
        // append the staged bytes at the current end of the file
        PositionedWriteWithCheck(aligned_buf_, offset_, file_size_);
        // account for the flushed bytes, then mark the buffer as empty
        file_size_ += offset_;
        offset_ = 0;
    }
}
size_t size_t
FileWriter::Finish() { FileWriter::Finish() {
AssertInfo(fd_ != -1, "FileWriter is not initialized or finished");
// if the aligned buffer is not empty, we should flush it to the file // if the aligned buffer is not empty, we should flush it to the file
if (offset_ != 0) { if (offset_ != 0) {
auto promise = std::make_shared<folly::Promise<folly::Unit>>(); auto promise = std::make_shared<folly::Promise<folly::Unit>>();
@ -294,6 +288,8 @@ FileWriter::Finish() {
try { try {
if (use_direct_io_) { if (use_direct_io_) {
FlushWithDirectIO(); FlushWithDirectIO();
} else {
FlushWithBufferedIO();
} }
promise->setValue(folly::Unit{}); promise->setValue(folly::Unit{});
} catch (...) { } catch (...) {
@ -315,6 +311,8 @@ FileWriter::Finish() {
} else { } else {
if (use_direct_io_) { if (use_direct_io_) {
FlushWithDirectIO(); FlushWithDirectIO();
} else {
FlushWithBufferedIO();
} }
} }
} }

View File

@ -24,7 +24,6 @@
#include <fcntl.h> #include <fcntl.h>
#include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/executors/CPUThreadPoolExecutor.h>
#include <mutex> #include <mutex>
#include <stdexcept>
#include <string> #include <string>
#include <sys/mman.h> #include <sys/mman.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -34,7 +33,6 @@
#include "common/EasyAssert.h" #include "common/EasyAssert.h"
#include "log/Log.h" #include "log/Log.h"
#include "pb/common.pb.h" #include "pb/common.pb.h"
#include "storage/ThreadPools.h"
namespace milvus::storage { namespace milvus::storage {
@ -108,20 +106,10 @@ class WriteRateLimiter {
Acquire(size_t bytes, Acquire(size_t bytes,
size_t alignment_bytes = 1, size_t alignment_bytes = 1,
Priority priority = Priority::MIDDLE) { Priority priority = Priority::MIDDLE) {
if (static_cast<int>(priority) >=
static_cast<int>(Priority::NR_PRIORITY)) {
ThrowInfo(ErrorCode::InvalidParameter,
"Invalid priority value: {}",
static_cast<int>(priority));
}
// if priority ratio is <= 0, no rate limit is applied, return the original bytes // if priority ratio is <= 0, no rate limit is applied, return the original bytes
if (priority_ratio_[static_cast<int>(priority)] <= 0) { if (priority_ratio_[static_cast<int>(priority)] <= 0) {
return bytes; return bytes;
} }
AssertInfo(alignment_bytes > 0 && bytes >= alignment_bytes &&
(bytes % alignment_bytes == 0),
"alignment_bytes must be positive and bytes must be "
"divisible by alignment_bytes");
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
// recheck the amplification ratio after taking the lock // recheck the amplification ratio after taking the lock
@ -263,10 +251,7 @@ class FileWriter {
private: private:
void void
WriteWithDirectIO(const void* data, size_t nbyte); WriteInternal(const void* data, size_t nbyte);
void
WriteWithBufferedIO(const void* data, size_t nbyte);
void void
FlushWithDirectIO(); FlushWithDirectIO();
@ -289,6 +274,8 @@ class FileWriter {
std::string filename_{""}; std::string filename_{""};
size_t file_size_{0}; size_t file_size_{0};
bool use_writer_pool_{false};
// for direct io // for direct io
bool use_direct_io_{false}; bool use_direct_io_{false};
void* aligned_buf_{nullptr}; void* aligned_buf_{nullptr};
@ -298,8 +285,7 @@ class FileWriter {
// for global configuration // for global configuration
static WriteMode static WriteMode
mode_; // The write mode, which can be 'buffered' (default) or 'direct'. mode_; // The write mode, which can be 'buffered' (default) or 'direct'.
static size_t static size_t buffer_size_;
buffer_size_; // The buffer size used for direct I/O, which is only used when the write mode is 'direct'.
// for rate limiter // for rate limiter
io::Priority priority_; io::Priority priority_;
@ -367,6 +353,13 @@ class FileWriteWorkerPool {
return true; return true;
} }
// Report whether a worker executor is currently installed.
// executor_ is read here without taking executor_mutex_, so the answer is
// only a hint and may be stale by the time the caller acts on it. That is
// tolerated because writes can still be performed without the pool.
// NOTE(review): if executor_ can be replaced concurrently, this unlocked
// read may be a data race - confirm against the setter.
bool
HasPool() const {
    const bool pool_available = (executor_ != nullptr);
    return pool_available;
}
~FileWriteWorkerPool() { ~FileWriteWorkerPool() {
std::lock_guard<std::mutex> lock(executor_mutex_); std::lock_guard<std::mutex> lock(executor_mutex_);
if (executor_ != nullptr) { if (executor_ != nullptr) {

View File

@ -700,7 +700,7 @@ The options include 'direct' and 'buffered'. The default value is 'buffered'.`,
Key: "common.diskWriteBufferSizeKb", Key: "common.diskWriteBufferSizeKb",
Version: "2.6.0", Version: "2.6.0",
DefaultValue: "64", DefaultValue: "64",
Doc: `Disk write buffer size in KB, only used when disk write mode is 'direct', default is 64KB. Doc: `Disk write buffer size in KB, used for both 'direct' and 'buffered' modes, default is 64KB.
Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB.`, Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB.`,
Export: true, Export: true,
} }