Compare commits

...

2 Commits

Author SHA1 Message Date
congqixia
8471565c38
feat: [2.6] bump loon version (#46029) (#46149)
Cherry-pick from master
pr: #46029
See: #44956

This PR upgrades loon to the latest version and resolves build conflicts.

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Co-authored-by: Ted Xu <ted.xu@zilliz.com>
2025-12-05 23:41:12 +08:00
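The bulk of this cherry-pick migrates milvus-storage writer and reader construction from direct constructors to factory-style Make() functions whose result must be checked before use. A minimal sketch of the new writer pattern, assuming arrow::Result/arrow::Status-style return types (the WriteBatches wrapper, header paths, and error mapping below are illustrative; only Make, ok(), status(), ValueOrDie(), Write, and Close appear in the diffs that follow):

    #include <memory>
    #include <string>
    #include <vector>

    #include <arrow/api.h>
    #include <arrow/filesystem/filesystem.h>
    #include <arrow/status.h>
    #include <parquet/properties.h>
    // Header path is an assumption; adjust to the actual milvus-storage includes.
    #include "milvus-storage/packed/writer.h"

    arrow::Status
    WriteBatches(std::shared_ptr<arrow::fs::FileSystem> fs,
                 const std::vector<std::string>& paths,
                 std::shared_ptr<arrow::Schema> schema,
                 const milvus_storage::StorageConfig& storage_config,
                 const std::vector<std::vector<int>>& column_groups,
                 int64_t writer_memory,
                 const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
        // Old API: direct construction, failures only surfaced by later calls.
        //   milvus_storage::PackedRecordBatchWriter writer(
        //       fs, paths, schema, storage_config, column_groups, writer_memory);
        // New API: the factory returns a result that must be checked before use.
        auto result = milvus_storage::PackedRecordBatchWriter::Make(
            fs,
            paths,
            schema,
            storage_config,
            column_groups,
            writer_memory,
            ::parquet::default_writer_properties());
        if (!result.ok()) {
            return arrow::Status::IOError("failed to create packed writer: " +
                                          result.status().ToString());
        }
        auto writer = result.ValueOrDie();
        for (const auto& batch : batches) {
            // Each write and the final close report their own Status.
            ARROW_RETURN_NOT_OK(writer->Write(batch));
        }
        return writer->Close();
    }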
cai.zhang
e42bf36ad0
enhance: [2.6] Add log with segment size for tasks (#46119)
master pr: #46118

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2025-12-05 18:08:41 +08:00
25 changed files with 305 additions and 194 deletions

View File

@@ -50,6 +50,7 @@ class MilvusConan(ConanFile):
         "unordered_dense/4.4.0#6a855c992618cc4c63019109a2e47298",
         "mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
         "geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
+        "libavrocpp/1.12.1@milvus/dev",
     )
     generators = ("cmake", "cmake_find_package")
     default_options = {

View File

@@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test {
         auto storage_config = milvus_storage::StorageConfig();
         // Create writer
-        milvus_storage::PackedRecordBatchWriter writer(
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
             fs,
             paths,
             schema_->ConvertToArrowSchema(),
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();

         // Generate and write data
         int64_t row_count = 0;
@@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test {
             auto record_batch = arrow::RecordBatch::Make(
                 schema_->ConvertToArrowSchema(), test_data_count_, arrays);
             row_count += test_data_count_;
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         LoadFieldDataInfo load_info;
         load_info.field_infos.emplace(

View File

@@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
                                          const std::string& file) {
     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     std::shared_ptr<arrow::Schema> file_schema = file_reader->schema();
     LOG_DEBUG("get column schema: [{}] for segment {}",
               file_schema->ToString(true),
@@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     // get key value metadata from parquet file
     std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
         file_reader->file_metadata();
@@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, files[0]);
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
         file_reader->file_metadata();
     milvus_storage::FieldIDList field_id_list =
@@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
     }
     for (const auto& file : files) {
-        auto reader =
-            std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+        auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         auto row_group_meta_vector =
             reader->file_metadata()->GetRowGroupMetadataVector();
         num_rows += row_group_meta_vector.row_num();

View File

@@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) {
     schema_ = context.schema;
     builders_ = context.builders;
     builders_map_ = context.builders_map;
-    kv_metadata_ = std::move(context.kv_metadata);
+    kv_metadata_ = context.kv_metadata;
     column_groups_ = context.column_groups;
     file_paths_ = context.file_paths;
-    packed_writer_ = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-        fs_,
-        file_paths_,
-        schema_,
-        storage_config_,
-        column_groups_,
-        buffer_size_);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_,
+                                                                file_paths_,
+                                                                schema_,
+                                                                storage_config_,
+                                                                column_groups_,
+                                                                buffer_size_);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create packed writer: " +
+                   result.status().ToString());
+    packed_writer_ = result.ValueOrDie();
     for (const auto& [key, value] : kv_metadata_) {
         packed_writer_->AddUserMetadata(key, value);
     }

View File

@@ -232,7 +232,7 @@ class JsonStatsParquetWriter {
     size_t batch_size_;
     std::shared_ptr<arrow::fs::FileSystem> fs_;
     milvus_storage::StorageConfig storage_config_;
-    std::unique_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
+    std::shared_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
     std::vector<std::pair<std::string, std::string>> kv_metadata_;

     // cache for builders

View File

@@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
         auto storage_config = milvus_storage::StorageConfig();
         // Create writer
-        milvus_storage::PackedRecordBatchWriter writer(
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
             fs,
             paths,
             schema->ConvertToArrowSchema(),
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();

         // Generate and write data
         int64_t row_count = 0;
@@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
             auto record_batch = arrow::RecordBatch::Make(
                 schema->ConvertToArrowSchema(), test_data_count, arrays);
             row_count += test_data_count;
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         LoadFieldDataInfo load_info;
         load_info.field_infos.emplace(

View File

@@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal(
     std::vector<std::vector<int64_t>> row_group_lists;
     row_group_lists.reserve(insert_files.size());
     for (const auto& file : insert_files) {
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             storage::GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         auto row_group_num =
             reader->file_metadata()->GetRowGroupMetadataVector().size();
         std::vector<int64_t> all_row_groups(row_group_num);

View File

@@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
     auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, schema_, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        schema_,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     for (int i = 0; i < batch_size; ++i) {
-        EXPECT_TRUE(writer.Write(record_batch_).ok());
+        EXPECT_TRUE(writer->Write(record_batch_).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     auto schema = std::make_shared<milvus::Schema>();
     auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
@@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
     auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, schema_, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        schema_,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     for (int i = 0; i < batch_size; ++i) {
-        EXPECT_TRUE(writer.Write(record_batch_).ok());
+        EXPECT_TRUE(writer->Write(record_batch_).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     auto channel = std::make_shared<milvus::ArrowReaderChannel>();
     int64_t memory_limit = 1024 * 1024 * 1024;  // 1GB
     uint64_t parallel_degree = 2;
     // read all row groups
-    auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
-        fs_, paths[0], schema_);
+    auto reader_result =
+        milvus_storage::FileRowGroupReader::Make(fs_, paths[0]);
+    AssertInfo(reader_result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   reader_result.status().ToString());
+    auto fr = reader_result.ValueOrDie();
     auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
     auto status = fr->Close();
     AssertInfo(
@@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
     auto arrow_schema = schema->ConvertToArrowSchema();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        arrow_schema,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     int64_t total_rows = 0;
     for (int64_t i = 0; i < n_batch; i++) {
         auto dataset = DataGen(schema, per_batch);
@@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
             ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
         total_rows += record_batch->num_rows();
-        EXPECT_TRUE(writer.Write(record_batch).ok());
+        EXPECT_TRUE(writer->Write(record_batch).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     // Load data back from storage v2
     LoadFieldDataInfo load_info;

View File

@@ -184,55 +184,54 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
             memory_limit / blocks.size(), FILE_SLICE_SIZE.load());
         for (const auto& block : blocks) {
-            futures.emplace_back(pool.Submit([block,
-                                              fs,
-                                              file,
-                                              file_idx,
-                                              schema,
-                                              reader_memory_limit]() {
-                AssertInfo(fs != nullptr,
-                           "[StorageV2] file system is nullptr");
-                auto row_group_reader =
-                    std::make_shared<milvus_storage::FileRowGroupReader>(
-                        fs,
-                        file,
-                        schema,
-                        reader_memory_limit,
-                        milvus::storage::GetReaderProperties());
-                AssertInfo(row_group_reader != nullptr,
-                           "[StorageV2] row group reader is nullptr");
-                row_group_reader->SetRowGroupOffsetAndCount(block.offset,
-                                                            block.count);
-                LOG_INFO(
-                    "[StorageV2] read row groups from file {} with offset "
-                    "{} and count "
-                    "{}",
-                    file,
-                    block.offset,
-                    block.count);
-                auto ret = std::make_shared<ArrowDataWrapper>();
-                for (int64_t i = 0; i < block.count; ++i) {
-                    std::shared_ptr<arrow::Table> table;
-                    auto status =
-                        row_group_reader->ReadNextRowGroup(&table);
-                    AssertInfo(status.ok(),
-                               "[StorageV2] Failed to read row group " +
-                                   std::to_string(block.offset + i) +
-                                   " from file " + file + " with error " +
-                                   status.ToString());
-                    ret->arrow_tables.push_back(
-                        {file_idx,
-                         static_cast<size_t>(block.offset + i),
-                         table});
-                }
-                auto close_status = row_group_reader->Close();
-                AssertInfo(close_status.ok(),
-                           "[StorageV2] Failed to close row group reader "
-                           "for file " +
-                               file + " with error " +
-                               close_status.ToString());
-                return ret;
-            }));
+            futures.emplace_back(pool.Submit(
+                [block, fs, file, file_idx, schema, reader_memory_limit]() {
+                    AssertInfo(fs != nullptr,
+                               "[StorageV2] file system is nullptr");
+                    auto result = milvus_storage::FileRowGroupReader::Make(
+                        fs,
+                        file,
+                        schema,
+                        reader_memory_limit,
+                        milvus::storage::GetReaderProperties());
+                    AssertInfo(
+                        result.ok(),
+                        "[StorageV2] Failed to create row group reader: " +
+                            result.status().ToString());
+                    auto row_group_reader = result.ValueOrDie();
+                    auto status =
+                        row_group_reader->SetRowGroupOffsetAndCount(
+                            block.offset, block.count);
+                    AssertInfo(status.ok(),
+                               "[StorageV2] Failed to set row group offset "
+                               "and count " +
+                                   std::to_string(block.offset) + " and " +
+                                   std::to_string(block.count) +
+                                   " with error " + status.ToString());
+                    auto ret = std::make_shared<ArrowDataWrapper>();
+                    for (int64_t i = 0; i < block.count; ++i) {
+                        std::shared_ptr<arrow::Table> table;
+                        auto status =
+                            row_group_reader->ReadNextRowGroup(&table);
+                        AssertInfo(status.ok(),
+                                   "[StorageV2] Failed to read row group " +
+                                       std::to_string(block.offset + i) +
+                                       " from file " + file +
+                                       " with error " + status.ToString());
+                        ret->arrow_tables.push_back(
+                            {file_idx,
+                             static_cast<size_t>(block.offset + i),
+                             table});
+                    }
+                    auto close_status = row_group_reader->Close();
+                    AssertInfo(
+                        close_status.ok(),
+                        "[StorageV2] Failed to close row group reader "
+                        "for file " +
+                            file + " with error " +
+                            close_status.ToString());
+                    return ret;
+                }));
         }

         for (auto& future : futures) {

View File

@@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
         }
         auto writer_properties = builder.build();

-        auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-            trueFs,
-            truePaths,
-            trueSchema,
-            storage_config,
-            columnGroups,
-            buffer_size,
-            writer_properties);
-        AssertInfo(writer, "[StorageV2] writer pointer is null");
-        *c_packed_writer = writer.release();
+        auto result =
+            milvus_storage::PackedRecordBatchWriter::Make(trueFs,
+                                                          truePaths,
+                                                          trueSchema,
+                                                          storage_config,
+                                                          columnGroups,
+                                                          buffer_size,
+                                                          writer_properties);
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create packed writer: " +
+                       result.status().ToString());
+        auto writer = result.ValueOrDie();
+        *c_packed_writer =
+            new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
+                std::move(writer));
         return milvus::SuccessCStatus();
     } catch (std::exception& e) {
         return milvus::FailureCStatus(&e);
@@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema,
         }
         auto writer_properties = builder.build();

-        auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-            trueFs,
-            truePaths,
-            trueSchema,
-            conf,
-            columnGroups,
-            buffer_size,
-            writer_properties);
-        AssertInfo(writer, "[StorageV2] writer pointer is null");
-        *c_packed_writer = writer.release();
+        auto result =
+            milvus_storage::PackedRecordBatchWriter::Make(trueFs,
+                                                          truePaths,
+                                                          trueSchema,
+                                                          conf,
+                                                          columnGroups,
+                                                          buffer_size,
+                                                          writer_properties);
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create packed writer: " +
+                       result.status().ToString());
+        auto writer = result.ValueOrDie();
+        *c_packed_writer =
+            new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
+                std::move(writer));
         return milvus::SuccessCStatus();
     } catch (std::exception& e) {
         return milvus::FailureCStatus(&e);
@@ -201,9 +211,9 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
     SCOPE_CGO_CALL_METRIC();

     try {
-        auto packed_writer =
-            static_cast<milvus_storage::PackedRecordBatchWriter*>(
-                c_packed_writer);
+        auto packed_writer = *static_cast<
+            std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
+            c_packed_writer);
         auto import_schema = arrow::ImportSchema(schema);
         if (!import_schema.ok()) {
@@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) {
     SCOPE_CGO_CALL_METRIC();

     try {
-        auto packed_writer =
-            static_cast<milvus_storage::PackedRecordBatchWriter*>(
-                c_packed_writer);
-        auto status = packed_writer->Close();
+        auto packed_writer = static_cast<
+            std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
+            c_packed_writer);
+        auto status = (*packed_writer)->Close();
         delete packed_writer;
         if (!status.ok()) {
             return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
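The CPackedWriter change above swaps a released raw pointer for a heap-allocated std::shared_ptr behind the opaque handle, so the C layer keeps a reference alive while other owners may share the writer. A self-contained sketch of that opaque-handle pattern (the Widget names are illustrative, not from the PR):

    #include <memory>

    struct Widget { int value = 0; };
    using WidgetHandle = void*;

    WidgetHandle MakeWidgetHandle() {
        auto widget = std::make_shared<Widget>();
        // Heap-allocate the shared_ptr itself; the handle holds one reference.
        return new std::shared_ptr<Widget>(std::move(widget));
    }

    int UseWidget(WidgetHandle h) {
        // Cast back to the shared_ptr and dereference through it.
        auto& widget = *static_cast<std::shared_ptr<Widget>*>(h);
        return widget->value;
    }

    void FreeWidgetHandle(WidgetHandle h) {
        // Deleting the shared_ptr drops the handle's reference; the Widget is
        // destroyed once the last reference goes away.
        delete static_cast<std::shared_ptr<Widget>*>(h);
    }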

View File

@@ -108,11 +108,16 @@ GroupChunkTranslator::GroupChunkTranslator(
     // Get row group metadata from files
     row_group_meta_list_.reserve(insert_files_.size());
     for (const auto& file : insert_files_) {
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             storage::GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         row_group_meta_list_.push_back(
             reader->file_metadata()->GetRowGroupMetadataVector());
         auto status = reader->Close();

View File

@@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
             {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}};
         auto writer_memory = 16 * 1024 * 1024;
         auto storage_config = milvus_storage::StorageConfig();
-        milvus_storage::PackedRecordBatchWriter writer(fs_,
-                                                       paths_,
-                                                       arrow_schema_,
-                                                       storage_config,
-                                                       column_groups,
-                                                       writer_memory);
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
+            fs_,
+            paths_,
+            arrow_schema_,
+            storage_config,
+            column_groups,
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();
         int64_t total_rows = 0;
         for (int64_t i = 0; i < n_batch; i++) {
             auto dataset = DataGen(schema_, per_batch);
@@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
                 ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
             total_rows += record_batch->num_rows();
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());
     }

 protected:
@@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
         milvus::proto::common::LoadPriority::LOW);

     // num cells - get the expected number from the file directly
-    auto fr =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
+    auto reader_result =
+        milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]);
+    AssertInfo(reader_result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   reader_result.status().ToString());
+    auto fr = reader_result.ValueOrDie();
     auto expected_num_cells =
         fr->file_metadata()->GetRowGroupMetadataVector().size();
     auto row_group_metadata_vector =
@@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
         auto writer_memory = 16 * 1024 * 1024;
         auto storage_config = milvus_storage::StorageConfig();
         std::vector<std::string> single_file_paths{file_path};
-        milvus_storage::PackedRecordBatchWriter writer(fs_,
-                                                       single_file_paths,
-                                                       arrow_schema_,
-                                                       storage_config,
-                                                       column_groups,
-                                                       writer_memory);
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
+            fs_,
+            single_file_paths,
+            arrow_schema_,
+            storage_config,
+            column_groups,
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();
         for (int64_t i = 0; i < n_batch; i++) {
             auto dataset = DataGen(schema_, per_batch);
             auto record_batch =
                 ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
             total_rows += record_batch->num_rows();
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         // Get the number of row groups in this file
-        auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
-            fs_, file_path);
+        auto reader_result =
+            milvus_storage::FileRowGroupReader::Make(fs_, file_path);
+        AssertInfo(reader_result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       reader_result.status().ToString());
+        auto fr = reader_result.ValueOrDie();
         expected_row_groups_per_file.push_back(
             fr->file_metadata()->GetRowGroupMetadataVector().size());
         auto status = fr->Close();
@@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
         auto usage = translator->estimated_byte_size_of_cell(i).first;

         // Get the expected memory size from the corresponding file
-        auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
-            fs_, multi_file_paths[file_idx]);
+        auto reader_result = milvus_storage::FileRowGroupReader::Make(
+            fs_, multi_file_paths[file_idx]);
+        AssertInfo(reader_result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       reader_result.status().ToString());
+        auto fr = reader_result.ValueOrDie();
         auto row_group_metadata_vector =
             fr->file_metadata()->GetRowGroupMetadataVector();
         auto expected_size = static_cast<int64_t>(

View File

@@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
     for (auto& column_group_file : remote_chunk_files) {
         // get all row groups for each file
         std::vector<std::vector<int64_t>> row_group_lists;
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             column_group_file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         auto row_group_num =
             reader->file_metadata()->GetRowGroupMetadataVector().size();
@@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id,
         field_id_list.Add(column_group_id.get());
         return field_id_list;
     }
-    auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+    auto result = milvus_storage::FileRowGroupReader::Make(
         fs,
         filepath,
         arrow_schema,
         milvus_storage::DEFAULT_READ_BUFFER_SIZE,
         GetReaderProperties());
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     field_id_list =
         file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
             column_group_id.get());

View File

@@ -22,14 +22,14 @@
 #include "monitor/scope_metric.h"

 ReaderHandle
-createFFIReader(char* manifest,
+createFFIReader(ColumnGroupsHandle column_groups_handle,
                 struct ArrowSchema* schema,
                 char** needed_columns,
                 int64_t needed_columns_size,
                 const std::shared_ptr<Properties>& properties) {
     ReaderHandle reader_handler = 0;
-    FFIResult result = reader_new(manifest,
+    FFIResult result = reader_new(column_groups_handle,
                                   schema,
                                   needed_columns,
                                   needed_columns_size,
@@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path,
 }

 CStatus
-NewPackedFFIReaderWithManifest(const char* manifest_content,
+NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
                                struct ArrowSchema* schema,
                                char** needed_columns,
                                int64_t needed_columns_size,
@@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content,
     try {
         auto properties =
             MakeInternalPropertiesFromStorageConfig(c_storage_config);
-        // Parse the column groups, the column groups is a JSON string
-        auto cpp_column_groups =
-            std::make_shared<milvus_storage::api::ColumnGroups>();
-        auto des_result =
-            cpp_column_groups->deserialize(std::string_view(manifest_content));
-        AssertInfo(des_result.ok(), "failed to deserialize column groups");
+        auto* cg_ptr = reinterpret_cast<
+            std::shared_ptr<milvus_storage::api::ColumnGroups>*>(
+            column_groups_handle);
+        auto cpp_column_groups = *cg_ptr;
         auto reader = GetLoonReader(cpp_column_groups,
                                     schema,

View File

@@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path,
  * be freed after this call returns.
  */
 CStatus
-NewPackedFFIReaderWithManifest(const char* manifest_content,
+NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
                                struct ArrowSchema* schema,
                                char** needed_columns,
                                int64_t needed_columns_size,

View File

@@ -252,12 +252,10 @@ GetManifest(const std::string& path,
     // Parse the JSON string
     json j = json::parse(path);

-    // Extract base_path and ver fields
+    // Extract base_path
     std::string base_path = j.at("base_path").get<std::string>();
-    int64_t ver = j.at("ver").get<int64_t>();

-    // return std::make_pair(base_path, ver);
-    char* out_column_groups = nullptr;
+    ColumnGroupsHandle out_column_groups = 0;
     int64_t out_read_version = 0;
     FFIResult result = get_latest_column_groups(base_path.c_str(),
                                                 properties.get(),
@@ -293,9 +291,8 @@ GetColumnGroups(
     // Parse the JSON string
     json j = json::parse(path);

-    // Extract base_path and ver fields
+    // Extract base_path
     std::string base_path = j.at("base_path").get<std::string>();
-    int64_t ver = j.at("ver").get<int64_t>();

     // TODO fetch manifest based on version after api supported
     auto transaction =

View File

@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION ba7df7b)
+set( milvus-storage_VERSION 5fff4f5)
 set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
     auto arrow_schema = schema->ConvertToArrowSchema();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        arrow_schema,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     int64_t total_rows = 0;
     for (int64_t i = 0; i < n_batch; i++) {
         auto dataset = DataGen(schema, per_batch);
@@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
             ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
         total_rows += record_batch->num_rows();
-        EXPECT_TRUE(writer.Write(record_batch).ok());
+        EXPECT_TRUE(writer->Write(record_batch).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     {
         // test memory file manager

View File

@@ -56,7 +56,10 @@ func (t *mixCompactionTask) GetTaskSlot() int64 {
	if t.GetTaskProto().GetType() == datapb.CompactionType_SortCompaction {
		segment := t.meta.GetHealthySegment(context.Background(), t.GetTaskProto().GetInputSegments()[0])
		if segment != nil {
-			slotUsage = calculateStatsTaskSlot(segment.getSegmentSize())
+			segSize := segment.getSegmentSize()
+			slotUsage = calculateStatsTaskSlot(segSize)
+			log.Info("mixCompactionTask get task slot",
+				zap.Int64("segment size", segSize), zap.Int64("task slot", slotUsage))
		}
	}
	t.slotUsage.Store(slotUsage)

View File

@@ -179,7 +179,8 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
	indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
	indexType := GetIndexType(indexParams)
	isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
-	taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
+	segSize := segment.getSegmentSize()
+	taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex)

	// rewrite the index type if needed, and this final index type will be persisted in the meta
	if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
@@ -215,6 +216,11 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
		i.handler,
		i.storageCli,
		i.indexEngineVersionManager))
+	log.Info("indexInspector create index for segment success",
+		zap.Int64("segmentID", segment.ID),
+		zap.Int64("indexID", indexID),
+		zap.Int64("segment size", segSize),
+		zap.Int64("task slot", taskSlot))
	return nil
 }

View File

@@ -1749,7 +1749,9 @@ func (m *meta) completeMixCompactionMutation(
		zap.String("type", t.GetType().String()),
		zap.Int64("collectionID", t.CollectionID),
		zap.Int64("partitionID", t.PartitionID),
-		zap.String("channel", t.GetChannel()))
+		zap.String("channel", t.GetChannel()),
+		zap.Int64("planID", t.GetPlanID()),
+	)

	metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
	var compactFromSegIDs []int64
@@ -1779,6 +1781,12 @@ func (m *meta) completeMixCompactionMutation(
		// metrics mutation for compaction from segments
		updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
+		log.Info("compact from segment",
+			zap.Int64("segmentID", cloned.GetID()),
+			zap.Int64("segment size", cloned.getSegmentSize()),
+			zap.Int64("num rows", cloned.GetNumOfRows()),
+		)
	}

	log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
@@ -1828,6 +1836,7 @@ func (m *meta) completeMixCompactionMutation(
			zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())),
			zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
			zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
+			zap.Int64("segment size", compactToSegmentInfo.getSegmentSize()),
		)
		compactToSegments = append(compactToSegments, compactToSegmentInfo)
	}
@@ -2322,7 +2331,9 @@ func (m *meta) completeSortCompactionMutation(
	log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID()))
-	log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", segment.GetNumOfRows()))
+	log.Info("meta update: prepare for complete stats mutation - complete",
+		zap.Int64("num rows", segment.GetNumOfRows()),
+		zap.Int64("segment size", segment.getSegmentSize()))

	if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
		log.Warn("fail to alter segments and new segment", zap.Error(err))
		return nil, nil, err

View File

@@ -240,13 +240,7 @@ func NewManifestReader(manifest string,
 }

 func (mr *ManifestReader) init() error {
-	// TODO add needed column option
-	manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
-	if err != nil {
-		return err
-	}
-	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
+	reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
	if err != nil {
		return err
	}

View File

@@ -31,6 +31,7 @@ import (
	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/cdata"
+	"github.com/cockroachdb/errors"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/v2/log"
@@ -38,9 +39,11 @@ import (
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 )

-func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
-	cManifest := C.CString(manifest)
-	defer C.free(unsafe.Pointer(cManifest))
+func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
+	cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to get manifest")
+	}

	var cas cdata.CArrowSchema
	cdata.ExportArrowSchema(schema, &cas)
@@ -102,7 +105,7 @@ func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []s
		cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
		cNumColumns := C.int64_t(len(neededColumns))

-		status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
+		status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
	} else {
		return nil, fmt.Errorf("storageConfig is required")
	}
@@ -183,30 +186,29 @@ func (r *FFIPackedReader) Release() {
	r.Close()
 }

-func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
+func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) {
+	var cColumnGroups C.ColumnGroupsHandle
	basePath, version, err := UnmarshalManfestPath(manifestPath)
	if err != nil {
-		return "", err
+		return cColumnGroups, err
	}
	log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))
	cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil)
	if err != nil {
-		return "", err
+		return cColumnGroups, err
	}

	cBasePath := C.CString(basePath)
	defer C.free(unsafe.Pointer(cBasePath))
-	var cManifest *C.char
	var cVersion C.int64_t
-	result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion)
+	result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion)
	err = HandleFFIResult(result)
	if err != nil {
-		return "", err
+		return cColumnGroups, err
	}
-	manifest = C.GoString(cManifest)
-	return manifest, nil
+	return cColumnGroups, nil
 }

 // Ensure FFIPackedReader implements array.RecordReader interface

View File

@@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 }

 func (pw *FFIPackedWriter) Close() (string, error) {
-	var manifest *C.char
-	result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest)
+	var cColumnGroups C.ColumnGroupsHandle
+	result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups)
	if err := HandleFFIResult(result); err != nil {
		return "", err
	}
@@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) {
	cBasePath := C.CString(pw.basePath)
	defer C.free(unsafe.Pointer(cBasePath))
	var transationHandle C.TransactionHandle
-	result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle)
+
+	// TODO pass version
+	// use -1 as latest
+	result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1))
	if err := HandleFFIResult(result); err != nil {
		return "", err
	}
@@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) {
	// #define LOON_TRANSACTION_RESOLVE_MERGE 1
	// #define LOON_TRANSACTION_RESOLVE_MAX 2
-	var commitResult C.bool
-	result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult)
+	var commitResult C.TransactionCommitResult
+	result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult)
	if err := HandleFFIResult(result); err != nil {
		return "", err
	}
-	var readVersion C.int64_t
-	// TODO: not atomic, need to get version from transaction
-	var cOutManifest *C.char
-	result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion)
-	if err := HandleFFIResult(result); err != nil {
-		return "", err
-	}
-	outManifest := C.GoString(cOutManifest)
-	log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion)))
+	log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version)))

	defer C.properties_free(pw.cProperties)
-	return MarshalManifestPath(pw.basePath, int64(readVersion)), nil
+	return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil
 }

View File

@@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1
 CPU_ARCH=$(get_cpu_arch $CPU_TARGET)

+# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5
+export CMAKE_POLICY_VERSION_MINIMUM=3.5
+
 arch=$(uname -m)
 CMAKE_CMD="cmake \
 ${CMAKE_EXTRA_ARGS} \