mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
issue: #43040 This patch introduces a disk file writer that supports Direct IO. Currently, it is exclusively utilized during the QueryNode load process. Below is its parameters: 1. `common.diskWriteMode` This parameter controls the write mode of the local disk, which is used to write temporary data downloaded from remote storage. Currently, only QueryNode uses 'common.diskWrite*' parameters. Support for other components will be added in the future. The options include 'direct' and 'buffered'. The default value is 'buffered'. 2. `common.diskWriteBufferSizeKb` Disk write buffer size in KB, only used when disk write mode is 'direct', default is 64KB. Current valid range is [4, 65536]. If the value is not aligned to 4KB, it will be rounded up to the nearest multiple of 4KB. 3. `common.diskWriteNumThreads` This parameter controls the number of writer threads used for disk write operations. The valid range is [0, hardware_concurrency]. It is designed to limit the maximum concurrency of disk write operations to reduce the impact on disk read performance. For example, if you want to limit the maximum concurrency of disk write operations to 1, you can set this parameter to 1. The default value is 0, which means the caller will perform write operations directly without using an additional writer thread pool. In this case, the maximum concurrency of disk write operations is determined by the caller's thread pool size. Both parameters can be updated during runtime. --------- Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
927 lines
33 KiB
C++
927 lines
33 KiB
C++
// Copyright (C) 2019-2025 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <gmock/gmock.h>
|
|
#include <filesystem>
|
|
#include <fstream>
|
|
#include <random>
|
|
#include <thread>
|
|
|
|
#include "storage/FileWriter.h"
|
|
|
|
using namespace milvus;
|
|
using namespace milvus::storage;
|
|
|
|
class FileWriterTest : public testing::Test {
|
|
protected:
|
|
void
|
|
SetUp() override {
|
|
test_dir_ = std::filesystem::temp_directory_path() / "file_writer_test";
|
|
std::filesystem::create_directories(test_dir_);
|
|
}
|
|
|
|
void
|
|
TearDown() override {
|
|
std::filesystem::remove_all(test_dir_);
|
|
}
|
|
|
|
std::filesystem::path test_dir_;
|
|
const size_t kBufferSize = 4096; // 4KB buffer size
|
|
};
|
|
|
|
// Test basic file writing functionality with buffered IO
|
|
TEST_F(FileWriterTest, BasicWriteWithBufferedIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
std::string filename = (test_dir_ / "basic_write.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::string test_data = "Hello, World!";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, test_data);
|
|
}
|
|
|
|
// Test basic file writing functionality with direct IO
|
|
TEST_F(FileWriterTest, BasicWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "basic_write.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::string test_data = "Hello, World!";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, test_data);
|
|
}
|
|
|
|
// Test writing data with size exactly equal to buffer size
|
|
TEST_F(FileWriterTest, ExactBufferSizeWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "exact_buffer.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> exact_buffer_data(kBufferSize);
|
|
std::generate(
|
|
exact_buffer_data.begin(), exact_buffer_data.end(), std::rand);
|
|
|
|
writer.Write(exact_buffer_data.data(), exact_buffer_data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, exact_buffer_data);
|
|
}
|
|
|
|
// Test writing data size with multiple of buffer size
|
|
TEST_F(FileWriterTest, MultipleOfBufferSizeWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "multiple_of_buffer_size.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> data(kBufferSize * 5);
|
|
std::generate(data.begin(), data.end(), std::rand);
|
|
writer.Write(data.data(), data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, std::vector<char>(data.begin(), data.end()));
|
|
}
|
|
|
|
// Test writing data size with unaligned to buffer size
|
|
TEST_F(FileWriterTest, UnalignedToBufferSizeWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename =
|
|
(test_dir_ / "unaligned_to_buffer_size.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> large_data(kBufferSize * 2 + 17);
|
|
std::generate(large_data.begin(), large_data.end(), std::rand);
|
|
writer.Write(large_data.data(), large_data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, std::vector<char>(large_data.begin(), large_data.end()));
|
|
}
|
|
|
|
// Test writing data with direct IO without finishing
|
|
TEST_F(FileWriterTest, WriteWithoutFinishWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::vector<char> data(kBufferSize * 2 + 10);
|
|
std::generate(data.begin(), data.end(), std::rand);
|
|
std::string filename = (test_dir_ / "write_without_finish.txt").string();
|
|
{
|
|
FileWriter writer(filename);
|
|
writer.Write(data.data(), data.size());
|
|
}
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
EXPECT_NE(content.size(), data.size());
|
|
EXPECT_EQ(content.size(), kBufferSize * 2);
|
|
EXPECT_NE(content, std::vector<char>(data.begin(), data.end()));
|
|
EXPECT_EQ(content, std::vector<char>(data.begin(), data.begin() + kBufferSize * 2));
|
|
}
|
|
|
|
// Test writing data with size slightly less than buffer size
|
|
TEST_F(FileWriterTest, SlightlyLessThanBufferSizeWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "slightly_less.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> data(kBufferSize - 1);
|
|
std::generate(data.begin(), data.end(), std::rand);
|
|
|
|
writer.Write(data.data(), data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, data);
|
|
}
|
|
|
|
// Test writing data with multiple small chunks with direct IO
|
|
TEST_F(FileWriterTest, MultipleSmallChunksWriteWithBufferedIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
std::string filename =
|
|
(test_dir_ / "multiple_small_chunks_buffered.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
const int num_chunks = 100;
|
|
const size_t chunk_size = 10; // 10 bytes per chunk
|
|
std::vector<std::vector<char>> chunks(num_chunks,
|
|
std::vector<char>(chunk_size));
|
|
|
|
for (auto& chunk : chunks) {
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test writing data with multiple small chunks with direct IO
|
|
TEST_F(FileWriterTest, MultipleSmallChunksWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename =
|
|
(test_dir_ / "multiple_small_chunks_direct.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
const int num_chunks = 100;
|
|
const size_t chunk_size = 10; // 10 bytes per chunk
|
|
std::vector<std::vector<char>> chunks(num_chunks,
|
|
std::vector<char>(chunk_size));
|
|
|
|
for (auto& chunk : chunks) {
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test writing memory address aligned data
|
|
TEST_F(FileWriterTest, MemoryAddressAlignedDataWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "aligned_write.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
// Create 4KB aligned data using posix_memalign
|
|
void* aligned_data = nullptr;
|
|
if (posix_memalign(&aligned_data, 4096, kBufferSize) != 0) {
|
|
throw std::runtime_error("Failed to allocate aligned memory");
|
|
}
|
|
std::generate(static_cast<char*>(aligned_data),
|
|
static_cast<char*>(aligned_data) + kBufferSize,
|
|
std::rand);
|
|
|
|
writer.Write(aligned_data, kBufferSize);
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(
|
|
read_data,
|
|
std::vector<char>(static_cast<char*>(aligned_data),
|
|
static_cast<char*>(aligned_data) + kBufferSize));
|
|
|
|
// Clean up aligned memory
|
|
free(aligned_data);
|
|
}
|
|
|
|
// Test writing empty data
|
|
TEST_F(FileWriterTest, EmptyDataWriteWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "empty_write.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
writer.Write(nullptr, 0);
|
|
writer.Finish();
|
|
|
|
// Verify file is empty
|
|
std::ifstream file(filename, std::ios::binary);
|
|
EXPECT_EQ(file.tellg(), 0);
|
|
}
|
|
|
|
// Test concurrent writes to different files
|
|
TEST_F(FileWriterTest, ConcurrentWritesWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
const int num_threads = 4;
|
|
std::vector<std::thread> threads;
|
|
std::vector<std::string> filenames;
|
|
|
|
filenames.reserve(num_threads);
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
filenames.emplace_back(
|
|
(test_dir_ / ("concurrent_" + std::to_string(i) + ".txt"))
|
|
.string());
|
|
}
|
|
|
|
threads.reserve(num_threads);
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
threads.emplace_back([&, i]() {
|
|
FileWriter writer(filenames[i]);
|
|
std::string test_data = "Thread " + std::to_string(i) + " data";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
});
|
|
}
|
|
|
|
for (auto& thread : threads) {
|
|
thread.join();
|
|
}
|
|
|
|
// Verify all files
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
std::ifstream file(filenames[i], std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, "Thread " + std::to_string(i) + " data");
|
|
}
|
|
}
|
|
|
|
// Test error handling for invalid file path
|
|
TEST_F(FileWriterTest, InvalidFilePathWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string invalid_path = "/invalid/path/file.txt";
|
|
EXPECT_THROW(FileWriter writer(invalid_path), std::runtime_error);
|
|
}
|
|
|
|
// Test writing to a file that already exists
|
|
TEST_F(FileWriterTest, ExistingFileWithDirectIO) {
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "existing.txt").string();
|
|
|
|
// Create initial file
|
|
{
|
|
FileWriter writer(filename);
|
|
std::string initial_data = "Initial data";
|
|
writer.Write(initial_data.data(), initial_data.size());
|
|
writer.Finish();
|
|
}
|
|
|
|
// Write to the same file again
|
|
FileWriter writer(filename);
|
|
std::string new_data = "New data";
|
|
writer.Write(new_data.data(), new_data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contains new data
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, new_data);
|
|
}
|
|
|
|
// Test config FileWriterConfig with very small buffer size
|
|
TEST_F(FileWriterTest, SmallBufferSizeWriteWithDirectIO) {
|
|
const size_t small_buffer_size = 64; // 64 bytes
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(small_buffer_size);
|
|
auto real_buffer_size = FileWriter::GetBufferSize();
|
|
EXPECT_EQ(real_buffer_size, kBufferSize);
|
|
}
|
|
|
|
// Test config FileWriterConfig with unaligned buffer size
|
|
TEST_F(FileWriterTest, UnalignedBufferSizeWriteWithDirectIO) {
|
|
const size_t unaligned_buffer_size = kBufferSize + 1; // Not aligned to 4KB
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(unaligned_buffer_size);
|
|
auto real_buffer_size = FileWriter::GetBufferSize();
|
|
EXPECT_EQ(real_buffer_size, 2 * kBufferSize);
|
|
}
|
|
|
|
// Test config FileWriterConfig with zero buffer size
|
|
TEST_F(FileWriterTest, ZeroBufferSizeWriteWithDirectIO) {
|
|
const size_t zero_buffer_size = 0;
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(zero_buffer_size);
|
|
auto real_buffer_size = FileWriter::GetBufferSize();
|
|
EXPECT_EQ(real_buffer_size, kBufferSize);
|
|
}
|
|
|
|
// Test config FileWriterConfig with very large buffer size
|
|
TEST_F(FileWriterTest, LargeBufferSizeWriteWithDirectIO) {
|
|
const size_t large_buffer_size = 1024 * 1024; // 1MB
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(large_buffer_size);
|
|
|
|
std::string filename = (test_dir_ / "large_buffer.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> data(2 * large_buffer_size + 1);
|
|
std::generate(data.begin(), data.end(), std::rand);
|
|
writer.Write(data.data(), data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, std::vector<char>(data.begin(), data.end()));
|
|
}
|
|
|
|
// Tese config FileWriterConfig with unknown mode
|
|
TEST_F(FileWriterTest, UnknownModeWriteWithDirectIO) {
|
|
uint8_t mode = 2;
|
|
EXPECT_NO_THROW(
|
|
{
|
|
FileWriter::SetMode(
|
|
static_cast<FileWriter::WriteMode>(mode));
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
});
|
|
}
|
|
|
|
TEST_F(FileWriterTest, HalfAlignedDataWriteWithDirectIO) {
|
|
const size_t aligned_buffer_size = 2 * kBufferSize;
|
|
std::string filename = (test_dir_ / "half_aligned_buffer.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
char* aligned_buffer = nullptr;
|
|
int ret = posix_memalign(reinterpret_cast<void**>(&aligned_buffer),
|
|
kBufferSize,
|
|
aligned_buffer_size);
|
|
ASSERT_EQ(ret, 0);
|
|
|
|
const size_t first_half_size = kBufferSize / 2;
|
|
const size_t rest_size = aligned_buffer_size - first_half_size;
|
|
writer.Write(aligned_buffer, first_half_size);
|
|
writer.Write(aligned_buffer + first_half_size, rest_size);
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data,
|
|
std::vector<char>(aligned_buffer,
|
|
aligned_buffer + aligned_buffer_size));
|
|
|
|
free(aligned_buffer);
|
|
}
|
|
|
|
// Test writing data with alternating large and small chunks
|
|
TEST_F(FileWriterTest, AlternatingChunksWriteWithDirectIO) {
|
|
std::string filename = (test_dir_ / "alternating_chunks.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
const int num_chunks = 10;
|
|
std::vector<std::vector<char>> chunks;
|
|
|
|
for (int i = 0; i < num_chunks; ++i) {
|
|
size_t chunk_size = (i % 2 == 0) ? kBufferSize * 2 : 10;
|
|
std::vector<char> chunk(chunk_size);
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
chunks.push_back(chunk);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test writing data with very large file size
|
|
TEST_F(FileWriterTest, VeryLargeFileWriteWithDirectIO) {
|
|
std::string filename = (test_dir_ / "very_large_file.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
const size_t large_size = 100 * 1024 * 1024; // 100MB
|
|
const size_t alignment = 4096; // 4KB alignment
|
|
char* aligned_data = nullptr;
|
|
ASSERT_EQ(posix_memalign((void**)&aligned_data, alignment, large_size), 0);
|
|
std::generate(aligned_data, aligned_data + large_size, std::rand);
|
|
|
|
writer.Write(aligned_data, large_size);
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data.size(), large_size);
|
|
EXPECT_EQ(std::memcmp(read_data.data(), aligned_data, large_size), 0);
|
|
|
|
free(aligned_data);
|
|
}
|
|
|
|
// Test writing data with different buffer sizes in the same file
|
|
TEST_F(FileWriterTest, MixedBufferSizesWriteWithDirectIO) {
|
|
std::string filename = (test_dir_ / "mixed_buffer_sizes.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<size_t> chunk_sizes = {
|
|
10, // Very small
|
|
kBufferSize - 1, // Slightly less than buffer
|
|
kBufferSize, // Exact buffer size
|
|
kBufferSize + 1, // Slightly more than buffer
|
|
kBufferSize * 2, // Double buffer size
|
|
kBufferSize * 10 // Much larger than buffer
|
|
};
|
|
|
|
std::vector<std::vector<char>> chunks;
|
|
for (size_t size : chunk_sizes) {
|
|
std::vector<char> chunk(size);
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
chunks.push_back(chunk);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test multi-threaded writing to different files
|
|
TEST_F(FileWriterTest, MultiThreadedWriteWithDirectIO) {
|
|
const int num_threads = 4;
|
|
const size_t data_size_per_thread = 50 * 1024 * 1024; // 50MB per thread
|
|
std::vector<std::thread> threads;
|
|
std::vector<std::string> filenames;
|
|
std::vector<std::vector<char>> test_data;
|
|
|
|
// Prepare filenames and test data
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
filenames.push_back(
|
|
(test_dir_ / ("multi_thread_" + std::to_string(i) + ".txt"))
|
|
.string());
|
|
test_data.emplace_back(data_size_per_thread);
|
|
std::generate(test_data[i].begin(), test_data[i].end(), std::rand);
|
|
}
|
|
|
|
// Launch threads
|
|
threads.reserve(num_threads);
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
threads.emplace_back([&, i]() {
|
|
FileWriter writer(filenames[i]);
|
|
writer.Write(test_data[i].data(), test_data[i].size());
|
|
writer.Finish();
|
|
});
|
|
}
|
|
|
|
// Wait for all threads to complete
|
|
for (auto& thread : threads) {
|
|
thread.join();
|
|
}
|
|
|
|
// Verify all files
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
std::ifstream file(filenames[i], std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data.size(), data_size_per_thread);
|
|
EXPECT_EQ(read_data, test_data[i]);
|
|
}
|
|
}
|
|
|
|
// Test executor-based asynchronous writes
|
|
TEST_F(FileWriterTest, ExecutorBasedAsyncWrites) {
|
|
// Set up executor with 2 threads
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
std::string filename = (test_dir_ / "executor_async.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
// Write multiple chunks
|
|
const int num_chunks = 10;
|
|
const size_t chunk_size = 1024;
|
|
std::vector<std::vector<char>> chunks(num_chunks,
|
|
std::vector<char>(chunk_size));
|
|
|
|
for (auto& chunk : chunks) {
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test executor-based asynchronous writes with direct IO
|
|
TEST_F(FileWriterTest, ExecutorBasedAsyncWritesWithDirectIO) {
|
|
// Set up executor with 2 threads
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
|
|
std::string filename = (test_dir_ / "executor_async_direct.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
// Write multiple chunks asynchronously
|
|
const int num_chunks = 10;
|
|
const size_t chunk_size = kBufferSize;
|
|
std::vector<std::vector<char>> chunks(num_chunks,
|
|
std::vector<char>(chunk_size));
|
|
|
|
for (auto& chunk : chunks) {
|
|
std::generate(chunk.begin(), chunk.end(), std::rand);
|
|
writer.Write(chunk.data(), chunk.size());
|
|
}
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
|
|
std::vector<char> expected_data;
|
|
for (const auto& chunk : chunks) {
|
|
expected_data.insert(expected_data.end(), chunk.begin(), chunk.end());
|
|
}
|
|
EXPECT_EQ(read_data, expected_data);
|
|
}
|
|
|
|
// Test concurrent writes using executor
|
|
TEST_F(FileWriterTest, ConcurrentWritesWithExecutor) {
|
|
// Set up executor with 4 threads
|
|
FileWriteWorkerPool::GetInstance().Configure(4);
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
const int num_files = 8;
|
|
std::vector<std::string> filenames;
|
|
std::vector<std::vector<char>> test_data;
|
|
|
|
// Prepare filenames and test data
|
|
for (int i = 0; i < num_files; ++i) {
|
|
filenames.push_back(
|
|
(test_dir_ / ("concurrent_executor_" + std::to_string(i) + ".txt"))
|
|
.string());
|
|
test_data.emplace_back(1024 * 1024); // 1MB per file
|
|
std::generate(test_data[i].begin(), test_data[i].end(), std::rand);
|
|
}
|
|
|
|
// Write to all files concurrently
|
|
std::vector<std::thread> threads;
|
|
threads.reserve(num_files);
|
|
for (int i = 0; i < num_files; ++i) {
|
|
threads.emplace_back([&, i]() {
|
|
FileWriter writer(filenames[i]);
|
|
writer.Write(test_data[i].data(), test_data[i].size());
|
|
writer.Finish();
|
|
});
|
|
}
|
|
|
|
// Wait for all threads to complete
|
|
for (auto& thread : threads) {
|
|
thread.join();
|
|
}
|
|
|
|
// Verify all files
|
|
for (int i = 0; i < num_files; ++i) {
|
|
std::ifstream file(filenames[i], std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, test_data[i]);
|
|
}
|
|
}
|
|
|
|
// Test executor configuration with invalid number of threads
|
|
TEST_F(FileWriterTest, InvalidExecutorConfiguration) {
|
|
// Test with zero threads
|
|
EXPECT_NO_THROW(FileWriteWorkerPool::GetInstance().Configure(0));
|
|
|
|
// Test with negative number of threads
|
|
EXPECT_NO_THROW(FileWriteWorkerPool::GetInstance().Configure(-1));
|
|
}
|
|
|
|
// Test executor configuration changes
|
|
TEST_F(FileWriterTest, ExecutorConfigurationChanges) {
|
|
// Set initial executor
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
|
|
// Change executor configuration
|
|
FileWriteWorkerPool::GetInstance().Configure(4);
|
|
|
|
// Verify the change doesn't break functionality
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
std::string filename = (test_dir_ / "executor_change.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::string test_data = "Test data for executor change";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
// Verify file contents
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, test_data);
|
|
}
|
|
|
|
// Test mixed buffered and direct IO with executor
|
|
TEST_F(FileWriterTest, MixedIOWithExecutor) {
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
|
|
// Test buffered IO with executor
|
|
{
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
std::string filename = (test_dir_ / "mixed_buffered.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::string test_data = "Buffered IO test data";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, test_data);
|
|
}
|
|
|
|
// Test direct IO with executor
|
|
{
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(kBufferSize);
|
|
std::string filename = (test_dir_ / "mixed_direct.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
std::string test_data = "Direct IO test data";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::string content((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content, test_data);
|
|
}
|
|
}
|
|
|
|
// Test large data writes with executor
|
|
TEST_F(FileWriterTest, LargeDataWritesWithExecutor) {
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
std::string filename = (test_dir_ / "large_data_executor.txt").string();
|
|
FileWriter writer(filename);
|
|
|
|
const size_t large_size = 10 * 1024 * 1024; // 10MB
|
|
std::vector<char> large_data(large_size);
|
|
std::generate(large_data.begin(), large_data.end(), std::rand);
|
|
|
|
writer.Write(large_data.data(), large_data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, large_data);
|
|
}
|
|
|
|
// Test executor with different buffer sizes
|
|
TEST_F(FileWriterTest, ExecutorWithDifferentBufferSizes) {
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
|
|
std::vector<size_t> buffer_sizes = {4096, 8192, 16384, 32768};
|
|
|
|
for (size_t buffer_size : buffer_sizes) {
|
|
FileWriter::SetBufferSize(buffer_size);
|
|
|
|
std::string filename =
|
|
(test_dir_ /
|
|
("buffer_size_" + std::to_string(buffer_size) + ".txt"))
|
|
.string();
|
|
FileWriter writer(filename);
|
|
|
|
std::vector<char> test_data(buffer_size * 2);
|
|
std::generate(test_data.begin(), test_data.end(), std::rand);
|
|
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
std::ifstream file(filename, std::ios::binary);
|
|
std::vector<char> read_data((std::istreambuf_iterator<char>(file)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(read_data, test_data);
|
|
}
|
|
}
|
|
|
|
// Test error handling in async operations
|
|
TEST_F(FileWriterTest, ErrorHandlingInAsyncOperations) {
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
|
|
// Test with invalid file path in async context
|
|
std::string invalid_path = "/invalid/path/async_test.txt";
|
|
|
|
// This should throw an exception even in async context
|
|
EXPECT_THROW(
|
|
{
|
|
FileWriter writer(invalid_path);
|
|
std::string test_data = "Test data";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
},
|
|
std::runtime_error);
|
|
}
|
|
|
|
// Test concurrent access to FileWriterConfig
|
|
TEST_F(FileWriterTest, ConcurrentAccessToFileWriterConfig) {
|
|
const int num_threads = std::thread::hardware_concurrency();
|
|
std::vector<std::thread> threads;
|
|
|
|
threads.reserve(num_threads);
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
threads.emplace_back([i]() {
|
|
// Each thread sets different executor configurations
|
|
FileWriteWorkerPool::GetInstance().Configure(i + 1);
|
|
FileWriter::SetMode(
|
|
i % 2 == 0 ? FileWriter::WriteMode::BUFFERED
|
|
: FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(4096 * (i + 1));
|
|
});
|
|
}
|
|
|
|
// Wait for all threads to complete
|
|
for (auto& thread : threads) {
|
|
thread.join();
|
|
}
|
|
|
|
// Verify that the configuration is still valid
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
std::string filename =
|
|
(std::filesystem::temp_directory_path() / "concurrent_config_test.txt")
|
|
.string();
|
|
|
|
FileWriter writer(filename);
|
|
std::string test_data = "Concurrent config test";
|
|
writer.Write(test_data.data(), test_data.size());
|
|
writer.Finish();
|
|
|
|
// Clean up
|
|
std::filesystem::remove(filename);
|
|
}
|
|
// Test that changing FileWriterConfig during FileWriter operations doesn't affect existing instances
|
|
TEST_F(FileWriterTest, ConfigChangeDuringFileWriterOperations) {
|
|
// Start with buffered mode
|
|
FileWriter::SetMode(FileWriter::WriteMode::BUFFERED);
|
|
FileWriteWorkerPool::GetInstance().Configure(2);
|
|
|
|
std::string filename1 =
|
|
(std::filesystem::temp_directory_path() / "config_change_test1.txt")
|
|
.string();
|
|
std::string filename2 =
|
|
(std::filesystem::temp_directory_path() / "config_change_test2.txt")
|
|
.string();
|
|
|
|
// Create first FileWriter
|
|
FileWriter writer1(filename1);
|
|
|
|
// Start writing some data with first writer
|
|
std::string test_data1 = "First writer data";
|
|
writer1.Write(test_data1.data(), test_data1.size());
|
|
|
|
// Change configuration while first writer is still active
|
|
FileWriter::SetMode(FileWriter::WriteMode::DIRECT);
|
|
FileWriter::SetBufferSize(8192);
|
|
FileWriteWorkerPool::GetInstance().Configure(4);
|
|
|
|
// Create second FileWriter with new configuration
|
|
FileWriter writer2(filename2);
|
|
|
|
// Continue writing with both writers
|
|
std::string test_data2 = "Second writer data";
|
|
writer2.Write(test_data2.data(), test_data2.size());
|
|
|
|
std::string more_data1 = "More data for first writer";
|
|
writer1.Write(more_data1.data(), more_data1.size());
|
|
|
|
// Finish both writers
|
|
size_t size1 = writer1.Finish();
|
|
size_t size2 = writer2.Finish();
|
|
|
|
// Verify both files were written correctly
|
|
EXPECT_EQ(size1, test_data1.size() + more_data1.size());
|
|
EXPECT_EQ(size2, test_data2.size());
|
|
|
|
// Read back and verify content
|
|
std::ifstream file1(filename1);
|
|
std::string content1((std::istreambuf_iterator<char>(file1)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content1, test_data1 + more_data1);
|
|
|
|
std::ifstream file2(filename2);
|
|
std::string content2((std::istreambuf_iterator<char>(file2)),
|
|
std::istreambuf_iterator<char>());
|
|
EXPECT_EQ(content2, test_data2);
|
|
|
|
// Clean up
|
|
std::filesystem::remove(filename1);
|
|
std::filesystem::remove(filename2);
|
|
}
|