enhance: always use buffered io for high load priority (#45900)

issue: #43040

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
sparknack 2025-11-29 00:03:08 +08:00 committed by GitHub
parent ea4bbbda3a
commit 8ef35de7ca
8 changed files with 42 additions and 16 deletions
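
The change threads the segment's proto::common::LoadPriority from the load path down to storage::FileWriter: create_chunk()/create_group_chunk() gain a load_priority parameter, translate it with storage::io::GetPriorityFromLoadPriority(), and pass the resulting io priority through MmapChunkTarget into storage::FileWriter, which now forces buffered writes when that priority is HIGH. Below is a rough, self-contained sketch of that flow; the enum values, constructor shapes, and output are simplified stand-ins for illustration, not the real Milvus definitions.

// Rough sketch only: LoadPriority, IoPriority, WriteMode, and FileWriter below
// are simplified stand-ins for proto::common::LoadPriority, storage::io::Priority,
// FileWriter's write mode, and storage::FileWriter; they are not the real types.
#include <iostream>
#include <string>
#include <utility>

enum class LoadPriority { HIGH, LOW };
enum class IoPriority { HIGH, LOW };
enum class WriteMode { BUFFERED, DIRECT };

// Stand-in for the globally configured write mode (FileWriter::GetMode()).
WriteMode GetGlobalMode() { return WriteMode::DIRECT; }

// Stand-in for storage::io::GetPriorityFromLoadPriority().
IoPriority GetPriorityFromLoadPriority(LoadPriority p) {
    return p == LoadPriority::HIGH ? IoPriority::HIGH : IoPriority::LOW;
}

// Stand-in FileWriter: the rule added by this commit is that a HIGH-priority
// writer always uses buffered IO, everything else follows the global mode.
struct FileWriter {
    FileWriter(std::string filename, IoPriority prio)
        : filename_(std::move(filename)) {
        mode_ = (prio == IoPriority::HIGH) ? WriteMode::BUFFERED : GetGlobalMode();
    }
    std::string filename_;
    WriteMode mode_;
};

// Stand-in for create_chunk(): the new load_priority parameter is translated
// to an io priority and handed to the mmap target's FileWriter.
FileWriter create_chunk(const std::string& file_path, LoadPriority load_priority) {
    return FileWriter(file_path, GetPriorityFromLoadPriority(load_priority));
}

int main() {
    auto w = create_chunk("/tmp/field_chunk.mmap", LoadPriority::HIGH);
    // Prints "buffered" even though the global mode above is DIRECT.
    std::cout << (w.mode_ == WriteMode::BUFFERED ? "buffered\n" : "direct\n");
}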

View File

@@ -53,9 +53,12 @@ class ChunkTarget {
 class MmapChunkTarget : public ChunkTarget {
  public:
-    explicit MmapChunkTarget(std::string file_path, size_t cap)
+    explicit MmapChunkTarget(std::string file_path,
+                             size_t cap,
+                             storage::io::Priority io_prio)
         : file_path_(std::move(file_path)), cap_(cap) {
-        file_writer_ = std::make_unique<storage::FileWriter>(file_path_);
+        file_writer_ =
+            std::make_unique<storage::FileWriter>(file_path_, io_prio);
     }
 
     void

View File

@@ -23,6 +23,7 @@
 #include "common/EasyAssert.h"
 #include "common/Types.h"
 #include "simdjson/padded_string.h"
+#include "storage/FileWriter.h"
 
 namespace milvus {
@@ -712,7 +713,8 @@ make_chunk(const FieldMeta& field_meta,
 std::unique_ptr<Chunk>
 create_chunk(const FieldMeta& field_meta,
              const arrow::ArrayVector& array_vec,
-             const std::string& file_path) {
+             const std::string& file_path,
+             proto::common::LoadPriority load_priority) {
     auto cw = create_chunk_writer(field_meta);
     auto [size, row_nums] = cw->calculate_size(array_vec);
     size_t aligned_size = (size + ChunkTarget::ALIGNED_SIZE - 1) &
@@ -721,7 +723,9 @@ create_chunk(const FieldMeta& field_meta,
     if (file_path.empty()) {
         target = std::make_shared<MemChunkTarget>(aligned_size);
     } else {
-        target = std::make_shared<MmapChunkTarget>(file_path, aligned_size);
+        auto io_prio = storage::io::GetPriorityFromLoadPriority(load_priority);
+        target =
+            std::make_shared<MmapChunkTarget>(file_path, aligned_size, io_prio);
     }
     cw->write_to_target(array_vec, target);
     auto data = target->release();
@@ -740,7 +744,8 @@ std::unordered_map<FieldId, std::shared_ptr<Chunk>>
 create_group_chunk(const std::vector<FieldId>& field_ids,
                    const std::vector<FieldMeta>& field_metas,
                    const std::vector<arrow::ArrayVector>& array_vec,
-                   const std::string& file_path) {
+                   const std::string& file_path,
+                   proto::common::LoadPriority load_priority) {
     std::vector<std::shared_ptr<ChunkWriterBase>> cws;
     cws.reserve(field_ids.size());
     size_t total_aligned_size = 0, final_row_nums = 0;
@@ -776,8 +781,10 @@ create_group_chunk(const std::vector<FieldId>& field_ids,
     if (file_path.empty()) {
         target = std::make_shared<MemChunkTarget>(total_aligned_size);
     } else {
-        target =
-            std::make_shared<MmapChunkTarget>(file_path, total_aligned_size);
+        target = std::make_shared<MmapChunkTarget>(
+            file_path,
+            total_aligned_size,
+            storage::io::GetPriorityFromLoadPriority(load_priority));
     }
     for (size_t i = 0; i < field_ids.size(); i++) {
         auto start_off = target->tell();

View File

@@ -242,13 +242,17 @@ class SparseFloatVectorChunkWriter : public ChunkWriterBase {
 std::unique_ptr<Chunk>
 create_chunk(const FieldMeta& field_meta,
              const arrow::ArrayVector& array_vec,
-             const std::string& file_path = "");
+             const std::string& file_path = "",
+             proto::common::LoadPriority load_priority =
+                 proto::common::LoadPriority::HIGH);
 
 std::unordered_map<FieldId, std::shared_ptr<Chunk>>
 create_group_chunk(const std::vector<FieldId>& field_ids,
                    const std::vector<FieldMeta>& field_metas,
                    const std::vector<arrow::ArrayVector>& array_vec,
-                   const std::string& file_path = "");
+                   const std::string& file_path = "",
+                   proto::common::LoadPriority load_priority =
+                       proto::common::LoadPriority::HIGH);
 
 arrow::ArrayVector
 read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader);
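
Worth noting: the new parameter defaults to proto::common::LoadPriority::HIGH, so existing call sites that only pass a file path keep compiling and, after this change, write their mmap chunks through buffered IO. A trivial illustration with a hypothetical stand-in (not the real declaration):

#include <iostream>
#include <string>

enum class LoadPriority { HIGH, LOW };  // stand-in for proto::common::LoadPriority

// Mirrors the defaulted parameter above: omitting the argument behaves like HIGH.
void create_chunk(const std::string& file_path,
                  LoadPriority load_priority = LoadPriority::HIGH) {
    std::cout << file_path << " -> "
              << (load_priority == LoadPriority::HIGH ? "HIGH" : "LOW") << "\n";
}

int main() {
    create_chunk("/tmp/chunk_a");                     // unchanged caller: HIGH
    create_chunk("/tmp/chunk_b", LoadPriority::LOW);  // updated caller: LOW
}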

View File

@@ -681,7 +681,8 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
         LOG_INFO("segment {} submits load field {} task to thread pool",
                  this->get_segment_id(),
                  field_id.get());
-        load_system_field_internal(field_id, field_data_info);
+        load_system_field_internal(
+            field_id, field_data_info, load_info.load_priority);
         LOG_INFO("segment {} loads system field {} mmap false done",
                  this->get_segment_id(),
                  field_id.get());
@@ -720,8 +721,10 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
 }
 
 void
-ChunkedSegmentSealedImpl::load_system_field_internal(FieldId field_id,
-                                                     FieldDataInfo& data) {
+ChunkedSegmentSealedImpl::load_system_field_internal(
+    FieldId field_id,
+    FieldDataInfo& data,
+    proto::common::LoadPriority load_priority) {
     SCOPE_CGO_CALL_METRIC();
     auto num_rows = data.row_count;

View File

@@ -40,6 +40,7 @@
 #include "folly/concurrency/ConcurrentHashMap.h"
 #include "index/json_stats/JsonKeyStats.h"
 #include "pb/index_cgo_msg.pb.h"
+#include "pb/common.pb.h"
 #include "milvus-storage/reader.h"
 
 namespace milvus::segcore {
@@ -393,7 +394,10 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
  private:
     void
-    load_system_field_internal(FieldId field_id, FieldDataInfo& data);
+    load_system_field_internal(
+        FieldId field_id,
+        FieldDataInfo& data,
+        milvus::proto::common::LoadPriority load_priority);
 
     template <typename PK>
     void

View File

@@ -201,7 +201,8 @@ ChunkTranslator::get_cells(
             AssertInfo(popped, "failed to pop arrow reader from channel");
             arrow::ArrayVector array_vec =
                 read_single_column_batches(r->reader);
-            chunk = create_chunk(field_meta_, array_vec, filepath.string());
+            chunk = create_chunk(
+                field_meta_, array_vec, filepath.string(), load_priority_);
         }
         cells.emplace_back(cid, std::move(chunk));
     }

View File

@@ -234,7 +234,8 @@ ManifestGroupTranslator::load_group_chunk(
             std::filesystem::create_directories(filepath.parent_path());
 
-            chunk = create_chunk(field_meta, array_vec, filepath.string());
+            chunk = create_chunk(
+                field_meta, array_vec, filepath.string(), load_priority_);
         }
         chunks[fid] = std::move(chunk);

View File

@@ -25,7 +25,10 @@ FileWriter::FileWriter(std::string filename, io::Priority priority)
     : filename_(std::move(filename)),
       priority_(priority),
       rate_limiter_(io::WriteRateLimiter::GetInstance()) {
-    auto mode = GetMode();
+    // high priority always use buffered mode, otherwise use the global mode
+    auto mode =
+        priority_ == io::Priority::HIGH ? WriteMode::BUFFERED : GetMode();
     use_direct_io_ = mode == WriteMode::DIRECT;
     use_writer_pool_ = FileWriteWorkerPool::GetInstance().HasPool();
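
In effect, the globally configured write mode returned by GetMode() (which may select direct IO) now only applies to writers created with a non-HIGH io priority; HIGH-priority writers, including the mmap chunk targets above when a segment is loaded with LoadPriority::HIGH, always go through buffered IO.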