feat: bump loon version (#46029)

See: #44956

This PR upgrades loon to the latest version and resolves the resulting build
conflicts.

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
Ted Xu 2025-12-04 10:57:12 +08:00 committed by GitHub
parent 8efe9ccac6
commit 20ce9fdc23
22 changed files with 280 additions and 190 deletions

View File

@ -46,6 +46,7 @@ class MilvusConan(ConanFile):
"mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
"geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
"icu/74.2#cd1937b9561b8950a2ae6311284c5813",
"libavrocpp/1.12.1@milvus/dev",
)
generators = ("cmake", "cmake_find_package")

View File

@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test {
auto storage_config = milvus_storage::StorageConfig();
// Create writer
milvus_storage::PackedRecordBatchWriter writer(
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs,
paths,
schema_->ConvertToArrowSchema(),
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
// Generate and write data
int64_t row_count = 0;
@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test {
auto record_batch = arrow::RecordBatch::Make(
schema_->ConvertToArrowSchema(), test_data_count_, arrays);
row_count += test_data_count_;
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
LoadFieldDataInfo load_info;
load_info.field_infos.emplace(
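
For reference, the writer lifecycle these test hunks migrate to looks roughly as follows. This is a minimal sketch assuming the Make()/Result API shown above; WriteBatches and its arguments are hypothetical, and the milvus-storage, Arrow, and Parquet headers are assumed to be included.

// Minimal sketch of the post-upgrade writer flow; WriteBatches is a
// hypothetical helper, not part of the API.
bool WriteBatches(
    std::shared_ptr<arrow::fs::FileSystem> fs,
    const std::vector<std::string>& paths,
    std::shared_ptr<arrow::Schema> schema,
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
    auto storage_config = milvus_storage::StorageConfig();
    auto column_groups = std::vector<std::vector<int>>{{0}};
    // Make() replaces the old public constructor, so creation failures
    // surface as a Status inside the returned Result instead of at
    // construction time.
    auto result = milvus_storage::PackedRecordBatchWriter::Make(
        fs,
        paths,
        schema,
        storage_config,
        column_groups,
        /*buffer_size=*/16 * 1024 * 1024,
        ::parquet::default_writer_properties());
    if (!result.ok()) {
        return false;
    }
    auto writer = result.ValueOrDie();  // now a shared_ptr, hence '->' below
    for (const auto& batch : batches) {
        if (!writer->Write(batch).ok()) {
            return false;
        }
    }
    return writer->Close().ok();
}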

View File

@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
const std::string& file) {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
std::shared_ptr<arrow::Schema> file_schema = file_reader->schema();
LOG_DEBUG("get column schema: [{}] for segment {}",
file_schema->ToString(true),
@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
// get key value metadata from parquet file
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, files[0]);
auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
milvus_storage::FieldIDList field_id_list =
@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
}
for (const auto& file : files) {
auto reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_meta_vector =
reader->file_metadata()->GetRowGroupMetadataVector();
num_rows += row_group_meta_vector.row_num();
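
The reader side follows the same factory pattern. Below is a minimal sketch of counting rows from row-group metadata under the same assumptions; CountRows is a hypothetical helper, and AssertInfo is the Milvus assertion macro used above.

// Minimal sketch, assuming FileRowGroupReader::Make as used above.
int64_t CountRows(std::shared_ptr<arrow::fs::FileSystem> fs,
                  const std::vector<std::string>& files) {
    int64_t num_rows = 0;
    for (const auto& file : files) {
        // The factory reports failures through the Result rather than
        // returning a possibly-invalid reader.
        auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
        AssertInfo(result.ok(),
                   "[StorageV2] Failed to create file row group reader: " +
                       result.status().ToString());
        auto reader = result.ValueOrDie();
        num_rows +=
            reader->file_metadata()->GetRowGroupMetadataVector().row_num();
    }
    return num_rows;
}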

View File

@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) {
schema_ = context.schema;
builders_ = context.builders;
builders_map_ = context.builders_map;
kv_metadata_ = std::move(context.kv_metadata);
kv_metadata_ = context.kv_metadata;
column_groups_ = context.column_groups;
file_paths_ = context.file_paths;
packed_writer_ = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
fs_,
auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_,
file_paths_,
schema_,
storage_config_,
column_groups_,
buffer_size_);
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
packed_writer_ = result.ValueOrDie();
for (const auto& [key, value] : kv_metadata_) {
packed_writer_->AddUserMetadata(key, value);
}

View File

@ -232,7 +232,7 @@ class JsonStatsParquetWriter {
size_t batch_size_;
std::shared_ptr<arrow::fs::FileSystem> fs_;
milvus_storage::StorageConfig storage_config_;
std::unique_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
std::shared_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
std::vector<std::pair<std::string, std::string>> kv_metadata_;
// cache for builders

View File

@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
auto storage_config = milvus_storage::StorageConfig();
// Create writer
milvus_storage::PackedRecordBatchWriter writer(
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs,
paths,
schema->ConvertToArrowSchema(),
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
// Generate and write data
int64_t row_count = 0;
@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
auto record_batch = arrow::RecordBatch::Make(
schema->ConvertToArrowSchema(), test_data_count, arrays);
row_count += test_data_count;
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
LoadFieldDataInfo load_info;
load_info.field_infos.emplace(

View File

@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal(
std::vector<std::vector<int64_t>> row_group_lists;
row_group_lists.reserve(insert_files.size());
for (const auto& file : insert_files) {
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
storage::GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
std::vector<int64_t> all_row_groups(row_group_num);

View File

@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
EXPECT_TRUE(writer->Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
auto schema = std::make_shared<milvus::Schema>();
auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
schema_,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
EXPECT_TRUE(writer->Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
auto channel = std::make_shared<milvus::ArrowReaderChannel>();
int64_t memory_limit = 1024 * 1024 * 1024; // 1GB
uint64_t parallel_degree = 2;
// read all row groups
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, paths[0], schema_);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, paths[0]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
auto status = fr->Close();
AssertInfo(
@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
auto arrow_schema = schema->ConvertToArrowSchema();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
arrow_schema,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
// Load data back from storage v2
LoadFieldDataInfo load_info;

View File

@ -183,32 +183,30 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
memory_limit / blocks.size(), FILE_SLICE_SIZE.load());
for (const auto& block : blocks) {
futures.emplace_back(pool.Submit([block,
fs,
file,
file_idx,
schema,
reader_memory_limit]() {
futures.emplace_back(pool.Submit(
[block, fs, file, file_idx, schema, reader_memory_limit]() {
AssertInfo(fs != nullptr,
"[StorageV2] file system is nullptr");
auto row_group_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
schema,
reader_memory_limit,
milvus::storage::GetReaderProperties());
AssertInfo(row_group_reader != nullptr,
"[StorageV2] row group reader is nullptr");
row_group_reader->SetRowGroupOffsetAndCount(block.offset,
block.count);
LOG_INFO(
"[StorageV2] read row groups from file {} with offset "
"{} and count "
"{}",
file,
block.offset,
block.count);
AssertInfo(
result.ok(),
"[StorageV2] Failed to create row group reader: " +
result.status().ToString());
auto row_group_reader = result.ValueOrDie();
auto status =
row_group_reader->SetRowGroupOffsetAndCount(
block.offset, block.count);
AssertInfo(status.ok(),
"[StorageV2] Failed to set row group offset "
"and count " +
std::to_string(block.offset) + " and " +
std::to_string(block.count) +
" with error " + status.ToString());
auto ret = std::make_shared<ArrowDataWrapper>();
for (int64_t i = 0; i < block.count; ++i) {
std::shared_ptr<arrow::Table> table;
@ -217,15 +215,16 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
AssertInfo(status.ok(),
"[StorageV2] Failed to read row group " +
std::to_string(block.offset + i) +
" from file " + file + " with error " +
status.ToString());
" from file " + file +
" with error " + status.ToString());
ret->arrow_tables.push_back(
{file_idx,
static_cast<size_t>(block.offset + i),
table});
}
auto close_status = row_group_reader->Close();
AssertInfo(close_status.ok(),
AssertInfo(
close_status.ok(),
"[StorageV2] Failed to close row group reader "
"for file " +
file + " with error " +

View File

@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
}
auto writer_properties = builder.build();
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
trueFs,
auto result =
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
truePaths,
trueSchema,
storage_config,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(writer, "[StorageV2] writer pointer is null");
*c_packed_writer = writer.release();
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
auto writer = result.ValueOrDie();
*c_packed_writer =
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
std::move(writer));
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema,
}
auto writer_properties = builder.build();
auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
trueFs,
auto result =
milvus_storage::PackedRecordBatchWriter::Make(trueFs,
truePaths,
trueSchema,
conf,
columnGroups,
buffer_size,
writer_properties);
AssertInfo(writer, "[StorageV2] writer pointer is null");
*c_packed_writer = writer.release();
AssertInfo(result.ok(),
"[StorageV2] Failed to create packed writer: " +
result.status().ToString());
auto writer = result.ValueOrDie();
*c_packed_writer =
new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
std::move(writer));
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);
@ -201,8 +211,8 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
auto packed_writer = *static_cast<
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
c_packed_writer);
auto import_schema = arrow::ImportSchema(schema);
@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
auto packed_writer = static_cast<
std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
c_packed_writer);
auto status = packed_writer->Close();
auto status = (*packed_writer)->Close();
delete packed_writer;
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
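
Since Make() hands back a shared_ptr, the opaque CPackedWriter handle can no longer be a raw writer pointer; it now owns a heap-allocated shared_ptr that CloseWriter() must delete. A sketch of that convention, with hypothetical helper names:

// Sketch only; WrapWriter/DerefWriter/ReleaseWriter are hypothetical.
using WriterHandle = void*;  // stands in for CPackedWriter

WriterHandle
WrapWriter(std::shared_ptr<milvus_storage::PackedRecordBatchWriter> writer) {
    // The handle owns a heap-allocated shared_ptr, keeping the writer
    // alive across the CGO boundary.
    return new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
        std::move(writer));
}

milvus_storage::PackedRecordBatchWriter&
DerefWriter(WriterHandle handle) {
    // First '*' yields the shared_ptr, second yields the writer.
    return **static_cast<
        std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(handle);
}

void
ReleaseWriter(WriterHandle handle) {
    // Deleting the shared_ptr drops the last reference (as CloseWriter
    // does above after calling Close()).
    delete static_cast<
        std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(handle);
}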

View File

@ -109,11 +109,15 @@ GroupChunkTranslator::GroupChunkTranslator(
parquet_file_metadata_.reserve(insert_files_.size());
row_group_meta_list_.reserve(insert_files_.size());
for (const auto& file : insert_files_) {
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
storage::GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
parquet_file_metadata_.push_back(
reader->file_metadata()->GetParquetMetadata());

View File

@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(fs_,
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths_,
arrow_schema_,
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema_, per_batch);
@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
}
protected:
@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
milvus::proto::common::LoadPriority::LOW);
// num cells - get the expected number from the file directly
auto fr =
std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto expected_num_cells =
fr->file_metadata()->GetRowGroupMetadataVector().size();
auto row_group_metadata_vector =
@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
std::vector<std::string> single_file_paths{file_path};
milvus_storage::PackedRecordBatchWriter writer(fs_,
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
single_file_paths,
arrow_schema_,
storage_config,
column_groups,
writer_memory);
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema_, per_batch);
auto record_batch =
ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
// Get the number of row groups in this file
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, file_path);
auto reader_result =
milvus_storage::FileRowGroupReader::Make(fs_, file_path);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
expected_row_groups_per_file.push_back(
fr->file_metadata()->GetRowGroupMetadataVector().size());
auto status = fr->Close();
@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
auto usage = translator->estimated_byte_size_of_cell(i).first;
// Get the expected memory size from the corresponding file
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
auto reader_result = milvus_storage::FileRowGroupReader::Make(
fs_, multi_file_paths[file_idx]);
AssertInfo(reader_result.ok(),
"[StorageV2] Failed to create file row group reader: " +
reader_result.status().ToString());
auto fr = reader_result.ValueOrDie();
auto row_group_metadata_vector =
fr->file_metadata()->GetRowGroupMetadataVector();
auto expected_size = static_cast<int64_t>(

View File

@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
for (auto& column_group_file : remote_chunk_files) {
// get all row groups for each file
std::vector<std::vector<int64_t>> row_group_lists;
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
column_group_file,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto reader = result.ValueOrDie();
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id,
field_id_list.Add(column_group_id.get());
return field_id_list;
}
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
auto result = milvus_storage::FileRowGroupReader::Make(
fs,
filepath,
arrow_schema,
milvus_storage::DEFAULT_READ_BUFFER_SIZE,
GetReaderProperties());
AssertInfo(result.ok(),
"[StorageV2] Failed to create file row group reader: " +
result.status().ToString());
auto file_reader = result.ValueOrDie();
field_id_list =
file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());

View File

@ -22,14 +22,14 @@
#include "monitor/scope_metric.h"
ReaderHandle
createFFIReader(char* manifest,
createFFIReader(ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,
const std::shared_ptr<Properties>& properties) {
ReaderHandle reader_handler = 0;
FFIResult result = reader_new(manifest,
FFIResult result = reader_new(column_groups_handle,
schema,
needed_columns,
needed_columns_size,
@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path,
}
CStatus
NewPackedFFIReaderWithManifest(const char* manifest_content,
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,
@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content,
try {
auto properties =
MakeInternalPropertiesFromStorageConfig(c_storage_config);
// Parse the column groups, the column groups is a JSON string
auto cpp_column_groups =
std::make_shared<milvus_storage::api::ColumnGroups>();
auto des_result =
cpp_column_groups->deserialize(std::string_view(manifest_content));
AssertInfo(des_result.ok(), "failed to deserialize column groups");
auto* cg_ptr = reinterpret_cast<
std::shared_ptr<milvus_storage::api::ColumnGroups>*>(
column_groups_handle);
auto cpp_column_groups = *cg_ptr;
auto reader = GetLoonReader(cpp_column_groups,
schema,
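
A minimal sketch of the unwrap step, assuming (as the cast above implies) that ColumnGroupsHandle is an opaque pointer to a heap-allocated shared_ptr<milvus_storage::api::ColumnGroups>; UnwrapColumnGroups is a hypothetical helper:

// Sketch only: assumes the handle layout implied by the cast above.
std::shared_ptr<milvus_storage::api::ColumnGroups>
UnwrapColumnGroups(ColumnGroupsHandle handle) {
    auto* cg_ptr = reinterpret_cast<
        std::shared_ptr<milvus_storage::api::ColumnGroups>*>(handle);
    return *cg_ptr;  // copies the shared_ptr; the handle keeps its own ref
}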

View File

@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path,
* be freed after this call returns.
*/
CStatus
NewPackedFFIReaderWithManifest(const char* manifest_content,
NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
struct ArrowSchema* schema,
char** needed_columns,
int64_t needed_columns_size,

View File

@ -257,12 +257,10 @@ GetManifest(const std::string& path,
// Parse the JSON string
json j = json::parse(path);
// Extract base_path and ver fields
// Extract base_path
std::string base_path = j.at("base_path").get<std::string>();
int64_t ver = j.at("ver").get<int64_t>();
// return std::make_pair(base_path, ver);
char* out_column_groups = nullptr;
ColumnGroupsHandle out_column_groups = 0;
int64_t out_read_version = 0;
FFIResult result = get_latest_column_groups(base_path.c_str(),
properties.get(),
@ -298,9 +296,8 @@ GetColumnGroups(
// Parse the JSON string
json j = json::parse(path);
// Extract base_path and ver fields
// Extract base_path
std::string base_path = j.at("base_path").get<std::string>();
int64_t ver = j.at("ver").get<int64_t>();
// TODO fetch manifest based on version after api supported
auto transaction =
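
Both helpers treat the incoming path as a small JSON document rather than a filesystem path. Roughly, with hypothetical values:

// Sketch of the manifest-path convention parsed above; the concrete
// values are hypothetical.
json j = json::parse(R"({"base_path": "s3://bucket/collection", "ver": 3})");
std::string base_path = j.at("base_path").get<std::string>();
int64_t ver = j.at("ver").get<int64_t>();  // version-pinned fetch is still a TODO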

View File

@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION ba7df7b)
set( milvus-storage_VERSION 5fff4f5)
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
auto arrow_schema = schema->ConvertToArrowSchema();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
auto result = milvus_storage::PackedRecordBatchWriter::Make(
fs_,
paths,
arrow_schema,
storage_config,
column_groups,
writer_memory,
::parquet::default_writer_properties());
EXPECT_TRUE(result.ok());
auto writer = result.ValueOrDie();
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
EXPECT_TRUE(writer->Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
EXPECT_TRUE(writer->Close().ok());
{
// test memory file manager

View File

@ -244,13 +244,7 @@ func NewManifestReader(manifest string,
}
func (mr *ManifestReader) init() error {
// TODO add needed column option
manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
if err != nil {
return err
}
reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
if err != nil {
return err
}

View File

@ -31,6 +31,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/cdata"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -38,9 +39,11 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cManifest := C.CString(manifest)
defer C.free(unsafe.Pointer(cManifest))
func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to get manifest")
}
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
@ -103,7 +106,7 @@ func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []s
cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
cNumColumns := C.int64_t(len(neededColumns))
status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
} else {
return nil, fmt.Errorf("storageConfig is required")
}
@ -184,30 +187,29 @@ func (r *FFIPackedReader) Release() {
r.Close()
}
func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) {
var cColumnGroups C.ColumnGroupsHandle
basePath, version, err := UnmarshalManfestPath(manifestPath)
if err != nil {
return "", err
return cColumnGroups, err
}
log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))
cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil)
if err != nil {
return "", err
return cColumnGroups, err
}
cBasePath := C.CString(basePath)
defer C.free(unsafe.Pointer(cBasePath))
var cManifest *C.char
var cVersion C.int64_t
result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion)
result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion)
err = HandleFFIResult(result)
if err != nil {
return "", err
return cColumnGroups, err
}
manifest = C.GoString(cManifest)
return manifest, nil
return cColumnGroups, nil
}
// Ensure FFIPackedReader implements array.RecordReader interface

View File

@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
}
func (pw *FFIPackedWriter) Close() (string, error) {
var manifest *C.char
var cColumnGroups C.ColumnGroupsHandle
result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest)
result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups)
if err := HandleFFIResult(result); err != nil {
return "", err
}
@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) {
cBasePath := C.CString(pw.basePath)
defer C.free(unsafe.Pointer(cBasePath))
var transationHandle C.TransactionHandle
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle)
// TODO pass version
// use -1 as latest
result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1))
if err := HandleFFIResult(result); err != nil {
return "", err
}
@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) {
// #define LOON_TRANSACTION_RESOLVE_MERGE 1
// #define LOON_TRANSACTION_RESOLVE_MAX 2
var commitResult C.bool
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult)
var commitResult C.TransactionCommitResult
result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult)
if err := HandleFFIResult(result); err != nil {
return "", err
}
var readVersion C.int64_t
// TODO: not atomic, need to get version from transaction
var cOutManifest *C.char
result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion)
if err := HandleFFIResult(result); err != nil {
return "", err
}
outManifest := C.GoString(cOutManifest)
log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion)))
log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version)))
defer C.properties_free(pw.cProperties)
return MarshalManifestPath(pw.basePath, int64(readVersion)), nil
return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil
}
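
Seen from the C side, the new close-and-commit sequence is roughly the sketch below. It uses only the loon FFI names visible in this diff; the meaning of the two int16 flags and any struct fields beyond committed_version are assumptions.

// Sketch of the new commit flow; error handling elided.
ColumnGroupsHandle column_groups = 0;
writer_close(writer_handle, nullptr, nullptr, 0, &column_groups);

TransactionHandle txn = 0;
// -1 requests the latest version, per the TODO above.
transaction_begin(base_path, properties, &txn, /*version=*/-1);

TransactionCommitResult commit_result;
transaction_commit(txn, /*mode=*/0, /*resolve=*/0, column_groups,
                   &commit_result);
// The committed version now comes back from the commit itself, replacing
// the old get_latest_column_groups round-trip.
int64_t committed = commit_result.committed_version;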

View File

@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1
CPU_ARCH=$(get_cpu_arch $CPU_TARGET)
# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5
export CMAKE_POLICY_VERSION_MINIMUM=3.5
arch=$(uname -m)
CMAKE_CMD="cmake \
${CMAKE_EXTRA_ARGS} \