Mirror of https://gitee.com/milvus-io/milvus.git (synced 2025-12-06 09:08:43 +08:00)

Compare commits: 6ce05b7d45 ... 8471565c38 (2 commits: 8471565c38, e42bf36ad0)
@@ -50,6 +50,7 @@ class MilvusConan(ConanFile):
         "unordered_dense/4.4.0#6a855c992618cc4c63019109a2e47298",
         "mongo-cxx-driver/3.11.0#ae206de0e90fb8cb2fb95465fb8b2f01",
         "geos/3.12.0#0b177c90c25a8ca210578fb9e2899c37",
+        "libavrocpp/1.12.1@milvus/dev",
     )
     generators = ("cmake", "cmake_find_package")
     default_options = {
@@ -106,13 +106,16 @@ class TestVectorArrayStorageV2 : public testing::Test {
         auto storage_config = milvus_storage::StorageConfig();

         // Create writer
-        milvus_storage::PackedRecordBatchWriter writer(
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
             fs,
             paths,
             schema_->ConvertToArrowSchema(),
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();

         // Generate and write data
         int64_t row_count = 0;
@@ -201,9 +204,9 @@ class TestVectorArrayStorageV2 : public testing::Test {
             auto record_batch = arrow::RecordBatch::Make(
                 schema_->ConvertToArrowSchema(), test_data_count_, arrays);
             row_count += test_data_count_;
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         LoadFieldDataInfo load_info;
         load_info.field_infos.emplace(
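A side note on the recurring pattern in the hunks above and below: direct construction of PackedRecordBatchWriter (and, later, FileRowGroupReader) is replaced by a static Make() factory whose result is checked with ok()/status() and unwrapped with ValueOrDie(), so call sites switch from value/dot syntax to pointer/arrow syntax. The stand-alone sketch below only illustrates that shape; Status, Result, and Writer here are stand-in types, not the real milvus_storage API.

#include <iostream>
#include <memory>
#include <string>

// Stand-in status type, assumed for illustration only.
struct Status {
    bool ok_ = true;
    std::string msg_;
    bool ok() const { return ok_; }
    std::string ToString() const { return msg_; }
};

// Stand-in result wrapper mirroring the ok()/status()/ValueOrDie() calls in the diff.
template <typename T>
struct Result {
    Status status_;
    std::shared_ptr<T> value_;
    bool ok() const { return status_.ok(); }
    const Status& status() const { return status_; }
    std::shared_ptr<T> ValueOrDie() const { return value_; }
};

struct Writer {
    // Factory replaces a constructor: validation happens here and failures
    // surface as a Status instead of a half-constructed object.
    static Result<Writer> Make(size_t buffer_size) {
        if (buffer_size == 0) {
            return {Status{false, "buffer size must be positive"}, nullptr};
        }
        return {Status{}, std::make_shared<Writer>()};
    }
    Status Write() { return {}; }
    Status Close() { return {}; }
};

int main() {
    auto result = Writer::Make(16 * 1024 * 1024);
    if (!result.ok()) {  // caller must check before unwrapping
        std::cerr << result.status().ToString() << "\n";
        return 1;
    }
    auto writer = result.ValueOrDie();  // now a shared_ptr, hence writer->Write()
    writer->Write();
    writer->Close();
    return 0;
}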
@@ -716,8 +716,11 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
                                          const std::string& file) {
     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     std::shared_ptr<arrow::Schema> file_schema = file_reader->schema();
     LOG_DEBUG("get column schema: [{}] for segment {}",
               file_schema->ToString(true),
@@ -778,9 +781,11 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {

     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
-
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     // get key value metadata from parquet file
     std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
         file_reader->file_metadata();
@@ -874,8 +879,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,

     auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
                   .GetArrowFileSystem();
-    auto file_reader =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs, files[0]);
+    auto result = milvus_storage::FileRowGroupReader::Make(fs, files[0]);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
         file_reader->file_metadata();
     milvus_storage::FieldIDList field_id_list =
@@ -886,8 +894,11 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
     }

     for (const auto& file : files) {
-        auto reader =
-            std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
+        auto result = milvus_storage::FileRowGroupReader::Make(fs, file);
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         auto row_group_meta_vector =
             reader->file_metadata()->GetRowGroupMetadataVector();
         num_rows += row_group_meta_vector.row_num();
@@ -108,16 +108,19 @@ JsonStatsParquetWriter::Init(const ParquetWriteContext& context) {
     schema_ = context.schema;
     builders_ = context.builders;
     builders_map_ = context.builders_map;
-    kv_metadata_ = std::move(context.kv_metadata);
+    kv_metadata_ = context.kv_metadata;
     column_groups_ = context.column_groups;
     file_paths_ = context.file_paths;
-    packed_writer_ = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-        fs_,
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(fs_,
         file_paths_,
         schema_,
         storage_config_,
         column_groups_,
         buffer_size_);
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create packed writer: " +
+                   result.status().ToString());
+    packed_writer_ = result.ValueOrDie();
     for (const auto& [key, value] : kv_metadata_) {
         packed_writer_->AddUserMetadata(key, value);
     }
@@ -232,7 +232,7 @@ class JsonStatsParquetWriter {
     size_t batch_size_;
     std::shared_ptr<arrow::fs::FileSystem> fs_;
     milvus_storage::StorageConfig storage_config_;
-    std::unique_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
+    std::shared_ptr<milvus_storage::PackedRecordBatchWriter> packed_writer_;
     std::vector<std::pair<std::string, std::string>> kv_metadata_;

     // cache for builders
@@ -97,13 +97,16 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
         auto storage_config = milvus_storage::StorageConfig();

         // Create writer
-        milvus_storage::PackedRecordBatchWriter writer(
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
             fs,
             paths,
             schema->ConvertToArrowSchema(),
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();

         // Generate and write data
         int64_t row_count = 0;
@@ -159,9 +162,9 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
             auto record_batch = arrow::RecordBatch::Make(
                 schema->ConvertToArrowSchema(), test_data_count, arrays);
             row_count += test_data_count;
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         LoadFieldDataInfo load_info;
         load_info.field_infos.emplace(
@@ -440,11 +440,15 @@ SegmentGrowingImpl::load_column_group_data_internal(
     std::vector<std::vector<int64_t>> row_group_lists;
     row_group_lists.reserve(insert_files.size());
     for (const auto& file : insert_files) {
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             storage::GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
         auto row_group_num =
             reader->file_metadata()->GetRowGroupMetadataVector().size();
         std::vector<int64_t> all_row_groups(row_group_num);
@@ -143,12 +143,20 @@ TEST_F(TestGrowingStorageV2, LoadFieldData) {
     auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, schema_, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        schema_,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     for (int i = 0; i < batch_size; ++i) {
-        EXPECT_TRUE(writer.Write(record_batch_).ok());
+        EXPECT_TRUE(writer->Write(record_batch_).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     auto schema = std::make_shared<milvus::Schema>();
     auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
@@ -187,20 +195,32 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
     auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, schema_, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        schema_,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     for (int i = 0; i < batch_size; ++i) {
-        EXPECT_TRUE(writer.Write(record_batch_).ok());
+        EXPECT_TRUE(writer->Write(record_batch_).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     auto channel = std::make_shared<milvus::ArrowReaderChannel>();
     int64_t memory_limit = 1024 * 1024 * 1024;  // 1GB
     uint64_t parallel_degree = 2;

     // read all row groups
-    auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
-        fs_, paths[0], schema_);
+    auto reader_result =
+        milvus_storage::FileRowGroupReader::Make(fs_, paths[0]);
+    AssertInfo(reader_result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   reader_result.status().ToString());
+    auto fr = reader_result.ValueOrDie();
     auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
     auto status = fr->Close();
     AssertInfo(
@@ -349,8 +369,16 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
     auto arrow_schema = schema->ConvertToArrowSchema();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        arrow_schema,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     int64_t total_rows = 0;
     for (int64_t i = 0; i < n_batch; i++) {
         auto dataset = DataGen(schema, per_batch);
@@ -358,9 +386,9 @@ TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
             ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
         total_rows += record_batch->num_rows();

-        EXPECT_TRUE(writer.Write(record_batch).ok());
+        EXPECT_TRUE(writer->Write(record_batch).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     // Load data back from storage v2
     LoadFieldDataInfo load_info;
@@ -184,32 +184,30 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
             memory_limit / blocks.size(), FILE_SLICE_SIZE.load());

         for (const auto& block : blocks) {
-            futures.emplace_back(pool.Submit([block,
-                                              fs,
-                                              file,
-                                              file_idx,
-                                              schema,
-                                              reader_memory_limit]() {
+            futures.emplace_back(pool.Submit(
+                [block, fs, file, file_idx, schema, reader_memory_limit]() {
                     AssertInfo(fs != nullptr,
                                "[StorageV2] file system is nullptr");
-                    auto row_group_reader =
-                        std::make_shared<milvus_storage::FileRowGroupReader>(
+                    auto result = milvus_storage::FileRowGroupReader::Make(
                         fs,
                         file,
                         schema,
                         reader_memory_limit,
                         milvus::storage::GetReaderProperties());
-                    AssertInfo(row_group_reader != nullptr,
-                               "[StorageV2] row group reader is nullptr");
-                    row_group_reader->SetRowGroupOffsetAndCount(block.offset,
-                                                                block.count);
-                    LOG_INFO(
-                        "[StorageV2] read row groups from file {} with offset "
-                        "{} and count "
-                        "{}",
-                        file,
-                        block.offset,
-                        block.count);
+                    AssertInfo(
+                        result.ok(),
+                        "[StorageV2] Failed to create row group reader: " +
+                            result.status().ToString());
+                    auto row_group_reader = result.ValueOrDie();
+                    auto status =
+                        row_group_reader->SetRowGroupOffsetAndCount(
+                            block.offset, block.count);
+                    AssertInfo(status.ok(),
+                               "[StorageV2] Failed to set row group offset "
+                               "and count " +
+                                   std::to_string(block.offset) + " and " +
+                                   std::to_string(block.count) +
+                                   " with error " + status.ToString());
                     auto ret = std::make_shared<ArrowDataWrapper>();
                     for (int64_t i = 0; i < block.count; ++i) {
                         std::shared_ptr<arrow::Table> table;
@@ -218,15 +216,16 @@ LoadWithStrategy(const std::vector<std::string>& remote_files,
                         AssertInfo(status.ok(),
                                    "[StorageV2] Failed to read row group " +
                                        std::to_string(block.offset + i) +
-                                       " from file " + file + " with error " +
-                                       status.ToString());
+                                       " from file " + file +
+                                       " with error " + status.ToString());
                         ret->arrow_tables.push_back(
                             {file_idx,
                              static_cast<size_t>(block.offset + i),
                              table});
                     }
                     auto close_status = row_group_reader->Close();
-                    AssertInfo(close_status.ok(),
+                    AssertInfo(
+                        close_status.ok(),
                         "[StorageV2] Failed to close row group reader "
                         "for file " +
                             file + " with error " +
@@ -108,16 +108,21 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
         }

         auto writer_properties = builder.build();
-        auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-            trueFs,
+        auto result =
+            milvus_storage::PackedRecordBatchWriter::Make(trueFs,
                                                           truePaths,
                                                           trueSchema,
                                                           storage_config,
                                                           columnGroups,
                                                           buffer_size,
                                                           writer_properties);
-        AssertInfo(writer, "[StorageV2] writer pointer is null");
-        *c_packed_writer = writer.release();
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create packed writer: " +
+                       result.status().ToString());
+        auto writer = result.ValueOrDie();
+        *c_packed_writer =
+            new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
+                std::move(writer));
         return milvus::SuccessCStatus();
     } catch (std::exception& e) {
         return milvus::FailureCStatus(&e);
@@ -177,16 +182,21 @@ NewPackedWriter(struct ArrowSchema* schema,
         }

         auto writer_properties = builder.build();
-        auto writer = std::make_unique<milvus_storage::PackedRecordBatchWriter>(
-            trueFs,
+        auto result =
+            milvus_storage::PackedRecordBatchWriter::Make(trueFs,
                                                           truePaths,
                                                           trueSchema,
                                                           conf,
                                                           columnGroups,
                                                           buffer_size,
                                                           writer_properties);
-        AssertInfo(writer, "[StorageV2] writer pointer is null");
-        *c_packed_writer = writer.release();
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create packed writer: " +
+                       result.status().ToString());
+        auto writer = result.ValueOrDie();
+        *c_packed_writer =
+            new std::shared_ptr<milvus_storage::PackedRecordBatchWriter>(
+                std::move(writer));
         return milvus::SuccessCStatus();
     } catch (std::exception& e) {
         return milvus::FailureCStatus(&e);
@@ -201,8 +211,8 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
     SCOPE_CGO_CALL_METRIC();

     try {
-        auto packed_writer =
-            static_cast<milvus_storage::PackedRecordBatchWriter*>(
+        auto packed_writer = *static_cast<
+            std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
             c_packed_writer);

         auto import_schema = arrow::ImportSchema(schema);
@@ -248,10 +258,10 @@ CloseWriter(CPackedWriter c_packed_writer) {
     SCOPE_CGO_CALL_METRIC();

     try {
-        auto packed_writer =
-            static_cast<milvus_storage::PackedRecordBatchWriter*>(
+        auto packed_writer = static_cast<
+            std::shared_ptr<milvus_storage::PackedRecordBatchWriter>*>(
             c_packed_writer);
-        auto status = packed_writer->Close();
+        auto status = (*packed_writer)->Close();
         delete packed_writer;
         if (!status.ok()) {
             return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
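A side note on the CGO wrapper hunks above: the opaque CPackedWriter handle now points at a heap-allocated std::shared_ptr to the writer (taken from Make()'s result) instead of a released raw pointer, so WriteRecordBatch dereferences the shared_ptr and CloseWriter deletes it. A minimal sketch of that handle lifecycle, with stand-in types rather than the real milvus_storage/CGO declarations:

#include <memory>

// Stand-in for the packed writer owned by the storage layer.
struct PackedWriter {
    void Write() {}
    void Close() {}
};

// Opaque handle type as exposed to the C side.
using CPackedWriter = void*;

CPackedWriter NewWriter() {
    auto writer = std::make_shared<PackedWriter>();
    // Heap-allocate the shared_ptr itself so the opaque handle keeps the
    // writer alive across the FFI boundary.
    return new std::shared_ptr<PackedWriter>(std::move(writer));
}

void WriteBatch(CPackedWriter handle) {
    // Cast the handle back and copy the shared_ptr before using it.
    auto writer = *static_cast<std::shared_ptr<PackedWriter>*>(handle);
    writer->Write();
}

void CloseWriter(CPackedWriter handle) {
    auto* owned = static_cast<std::shared_ptr<PackedWriter>*>(handle);
    (*owned)->Close();
    delete owned;  // releases the reference held by the handle
}

int main() {
    CPackedWriter handle = NewWriter();
    WriteBatch(handle);
    CloseWriter(handle);
    return 0;
}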
@@ -108,11 +108,16 @@ GroupChunkTranslator::GroupChunkTranslator(
     // Get row group metadata from files
     row_group_meta_list_.reserve(insert_files_.size());
     for (const auto& file : insert_files_) {
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             storage::GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();
+
         row_group_meta_list_.push_back(
             reader->file_metadata()->GetRowGroupMetadataVector());
         auto status = reader->Close();
@@ -59,12 +59,16 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
             {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}};
         auto writer_memory = 16 * 1024 * 1024;
         auto storage_config = milvus_storage::StorageConfig();
-        milvus_storage::PackedRecordBatchWriter writer(fs_,
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
+            fs_,
             paths_,
             arrow_schema_,
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();
         int64_t total_rows = 0;
         for (int64_t i = 0; i < n_batch; i++) {
             auto dataset = DataGen(schema_, per_batch);
@@ -72,9 +76,9 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
                 ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
             total_rows += record_batch->num_rows();

-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());
     }

 protected:
@@ -116,8 +120,12 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
         milvus::proto::common::LoadPriority::LOW);

     // num cells - get the expected number from the file directly
-    auto fr =
-        std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
+    auto reader_result =
+        milvus_storage::FileRowGroupReader::Make(fs_, paths_[0]);
+    AssertInfo(reader_result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   reader_result.status().ToString());
+    auto fr = reader_result.ValueOrDie();
     auto expected_num_cells =
         fr->file_metadata()->GetRowGroupMetadataVector().size();
     auto row_group_metadata_vector =
@@ -214,25 +222,33 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
         auto writer_memory = 16 * 1024 * 1024;
         auto storage_config = milvus_storage::StorageConfig();
         std::vector<std::string> single_file_paths{file_path};
-        milvus_storage::PackedRecordBatchWriter writer(fs_,
+        auto result = milvus_storage::PackedRecordBatchWriter::Make(
+            fs_,
             single_file_paths,
             arrow_schema_,
             storage_config,
             column_groups,
-            writer_memory);
+            writer_memory,
+            ::parquet::default_writer_properties());
+        EXPECT_TRUE(result.ok());
+        auto writer = result.ValueOrDie();

         for (int64_t i = 0; i < n_batch; i++) {
             auto dataset = DataGen(schema_, per_batch);
             auto record_batch =
                 ConvertToArrowRecordBatch(dataset, dim, arrow_schema_);
             total_rows += record_batch->num_rows();
-            EXPECT_TRUE(writer.Write(record_batch).ok());
+            EXPECT_TRUE(writer->Write(record_batch).ok());
         }
-        EXPECT_TRUE(writer.Close().ok());
+        EXPECT_TRUE(writer->Close().ok());

         // Get the number of row groups in this file
-        auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
-            fs_, file_path);
+        auto reader_result =
+            milvus_storage::FileRowGroupReader::Make(fs_, file_path);
+        AssertInfo(reader_result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       reader_result.status().ToString());
+        auto fr = reader_result.ValueOrDie();
         expected_row_groups_per_file.push_back(
             fr->file_metadata()->GetRowGroupMetadataVector().size());
         auto status = fr->Close();
@@ -303,8 +319,12 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
         auto usage = translator->estimated_byte_size_of_cell(i).first;

         // Get the expected memory size from the corresponding file
-        auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto reader_result = milvus_storage::FileRowGroupReader::Make(
             fs_, multi_file_paths[file_idx]);
+        AssertInfo(reader_result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       reader_result.status().ToString());
+        auto fr = reader_result.ValueOrDie();
         auto row_group_metadata_vector =
             fr->file_metadata()->GetRowGroupMetadataVector();
         auto expected_size = static_cast<int64_t>(
@@ -1307,11 +1307,15 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
     for (auto& column_group_file : remote_chunk_files) {
         // get all row groups for each file
         std::vector<std::vector<int64_t>> row_group_lists;
-        auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+        auto result = milvus_storage::FileRowGroupReader::Make(
             fs,
             column_group_file,
             milvus_storage::DEFAULT_READ_BUFFER_SIZE,
             GetReaderProperties());
+        AssertInfo(result.ok(),
+                   "[StorageV2] Failed to create file row group reader: " +
+                       result.status().ToString());
+        auto reader = result.ValueOrDie();

         auto row_group_num =
             reader->file_metadata()->GetRowGroupMetadataVector().size();
@@ -1515,12 +1519,16 @@ GetFieldIDList(FieldId column_group_id,
         field_id_list.Add(column_group_id.get());
         return field_id_list;
     }
-    auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
+    auto result = milvus_storage::FileRowGroupReader::Make(
         fs,
         filepath,
         arrow_schema,
         milvus_storage::DEFAULT_READ_BUFFER_SIZE,
         GetReaderProperties());
+    AssertInfo(result.ok(),
+               "[StorageV2] Failed to create file row group reader: " +
+                   result.status().ToString());
+    auto file_reader = result.ValueOrDie();
     field_id_list =
         file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
             column_group_id.get());
@@ -22,14 +22,14 @@
 #include "monitor/scope_metric.h"

 ReaderHandle
-createFFIReader(char* manifest,
+createFFIReader(ColumnGroupsHandle column_groups_handle,
                 struct ArrowSchema* schema,
                 char** needed_columns,
                 int64_t needed_columns_size,
                 const std::shared_ptr<Properties>& properties) {
     ReaderHandle reader_handler = 0;

-    FFIResult result = reader_new(manifest,
+    FFIResult result = reader_new(column_groups_handle,
                                   schema,
                                   needed_columns,
                                   needed_columns_size,
@@ -97,7 +97,7 @@ NewPackedFFIReader(const char* manifest_path,
 }

 CStatus
-NewPackedFFIReaderWithManifest(const char* manifest_content,
+NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
                                struct ArrowSchema* schema,
                                char** needed_columns,
                                int64_t needed_columns_size,
@@ -109,12 +109,10 @@ NewPackedFFIReaderWithManifest(const char* manifest_content,
     try {
         auto properties =
             MakeInternalPropertiesFromStorageConfig(c_storage_config);
-        // Parse the column groups, the column groups is a JSON string
-        auto cpp_column_groups =
-            std::make_shared<milvus_storage::api::ColumnGroups>();
-        auto des_result =
-            cpp_column_groups->deserialize(std::string_view(manifest_content));
-        AssertInfo(des_result.ok(), "failed to deserialize column groups");
+        auto* cg_ptr = reinterpret_cast<
+            std::shared_ptr<milvus_storage::api::ColumnGroups>*>(
+            column_groups_handle);
+        auto cpp_column_groups = *cg_ptr;

         auto reader = GetLoonReader(cpp_column_groups,
                                     schema,
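A note on the FFI reader hunks above: instead of handing the reader a manifest JSON string and deserializing it inside NewPackedFFIReaderWithManifest, the caller now passes an opaque ColumnGroupsHandle that already wraps a shared_ptr to the parsed column groups, and the wrapper only casts it back. The sketch below shows that hand-off with hypothetical stand-in types; it is not the real milvus_storage::api surface.

#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

// Stand-in for the parsed column-group description.
struct ColumnGroups {
    std::vector<int64_t> group_ids;
};

// Opaque handle type as seen by the C API surface.
using ColumnGroupsHandle = void*;

// Producer side: wrap an already-parsed object in a handle.
ColumnGroupsHandle MakeHandle(std::shared_ptr<ColumnGroups> groups) {
    return new std::shared_ptr<ColumnGroups>(std::move(groups));
}

// Consumer side: cast the handle back and share ownership, no re-parsing.
void UseHandle(ColumnGroupsHandle handle) {
    auto* ptr = reinterpret_cast<std::shared_ptr<ColumnGroups>*>(handle);
    auto groups = *ptr;
    std::cout << "column groups: " << groups->group_ids.size() << "\n";
}

int main() {
    auto groups = std::make_shared<ColumnGroups>();
    groups->group_ids = {0, 1, 2};
    ColumnGroupsHandle handle = MakeHandle(groups);
    UseHandle(handle);
    delete reinterpret_cast<std::shared_ptr<ColumnGroups>*>(handle);
    return 0;
}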
@@ -105,7 +105,7 @@ NewPackedFFIReader(const char* manifest_path,
  * be freed after this call returns.
  */
 CStatus
-NewPackedFFIReaderWithManifest(const char* manifest_content,
+NewPackedFFIReaderWithManifest(const ColumnGroupsHandle column_groups_handle,
                                struct ArrowSchema* schema,
                                char** needed_columns,
                                int64_t needed_columns_size,
@@ -252,12 +252,10 @@ GetManifest(const std::string& path,
     // Parse the JSON string
     json j = json::parse(path);

-    // Extract base_path and ver fields
+    // Extract base_path
     std::string base_path = j.at("base_path").get<std::string>();
-    int64_t ver = j.at("ver").get<int64_t>();

-    // return std::make_pair(base_path, ver);
-    char* out_column_groups = nullptr;
+    ColumnGroupsHandle out_column_groups = 0;
     int64_t out_read_version = 0;
     FFIResult result = get_latest_column_groups(base_path.c_str(),
                                                 properties.get(),
@@ -293,9 +291,8 @@ GetColumnGroups(
     // Parse the JSON string
     json j = json::parse(path);

-    // Extract base_path and ver fields
+    // Extract base_path
     std::string base_path = j.at("base_path").get<std::string>();
-    int64_t ver = j.at("ver").get<int64_t>();

     // TODO fetch manifest based on version after api supported
     auto transaction =
@@ -14,7 +14,7 @@
 # Update milvus-storage_VERSION for the first occurrence
 milvus_add_pkg_config("milvus-storage")
 set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
-set( milvus-storage_VERSION ba7df7b)
+set( milvus-storage_VERSION 5fff4f5)
 set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
 message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
 message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")
@@ -75,8 +75,16 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
     auto writer_memory = 16 * 1024 * 1024;
     auto storage_config = milvus_storage::StorageConfig();
     auto arrow_schema = schema->ConvertToArrowSchema();
-    milvus_storage::PackedRecordBatchWriter writer(
-        fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
+    auto result = milvus_storage::PackedRecordBatchWriter::Make(
+        fs_,
+        paths,
+        arrow_schema,
+        storage_config,
+        column_groups,
+        writer_memory,
+        ::parquet::default_writer_properties());
+    EXPECT_TRUE(result.ok());
+    auto writer = result.ValueOrDie();
     int64_t total_rows = 0;
     for (int64_t i = 0; i < n_batch; i++) {
         auto dataset = DataGen(schema, per_batch);
@@ -84,9 +92,9 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) {
             ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
         total_rows += record_batch->num_rows();

-        EXPECT_TRUE(writer.Write(record_batch).ok());
+        EXPECT_TRUE(writer->Write(record_batch).ok());
     }
-    EXPECT_TRUE(writer.Close().ok());
+    EXPECT_TRUE(writer->Close().ok());

     {
         // test memory file manager
@@ -56,7 +56,10 @@ func (t *mixCompactionTask) GetTaskSlot() int64 {
 	if t.GetTaskProto().GetType() == datapb.CompactionType_SortCompaction {
 		segment := t.meta.GetHealthySegment(context.Background(), t.GetTaskProto().GetInputSegments()[0])
 		if segment != nil {
-			slotUsage = calculateStatsTaskSlot(segment.getSegmentSize())
+			segSize := segment.getSegmentSize()
+			slotUsage = calculateStatsTaskSlot(segSize)
+			log.Info("mixCompactionTask get task slot",
+				zap.Int64("segment size", segSize), zap.Int64("task slot", slotUsage))
 		}
 	}
 	t.slotUsage.Store(slotUsage)
@@ -179,7 +179,8 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
 	indexParams := i.meta.indexMeta.GetIndexParams(segment.CollectionID, indexID)
 	indexType := GetIndexType(indexParams)
 	isVectorIndex := vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType)
-	taskSlot := calculateIndexTaskSlot(segment.getSegmentSize(), isVectorIndex)
+	segSize := segment.getSegmentSize()
+	taskSlot := calculateIndexTaskSlot(segSize, isVectorIndex)

 	// rewrite the index type if needed, and this final index type will be persisted in the meta
 	if isVectorIndex && Params.KnowhereConfig.Enable.GetAsBool() {
@@ -215,6 +216,11 @@ func (i *indexInspector) createIndexForSegment(ctx context.Context, segment *Seg
 		i.handler,
 		i.storageCli,
 		i.indexEngineVersionManager))
+	log.Info("indexInspector create index for segment success",
+		zap.Int64("segmentID", segment.ID),
+		zap.Int64("indexID", indexID),
+		zap.Int64("segment size", segSize),
+		zap.Int64("task slot", taskSlot))
 	return nil
 }
@@ -1749,7 +1749,9 @@ func (m *meta) completeMixCompactionMutation(
 		zap.String("type", t.GetType().String()),
 		zap.Int64("collectionID", t.CollectionID),
 		zap.Int64("partitionID", t.PartitionID),
-		zap.String("channel", t.GetChannel()))
+		zap.String("channel", t.GetChannel()),
+		zap.Int64("planID", t.GetPlanID()),
+	)

 	metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
 	var compactFromSegIDs []int64
@@ -1779,6 +1781,12 @@ func (m *meta) completeMixCompactionMutation(

 		// metrics mutation for compaction from segments
 		updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
+
+		log.Info("compact from segment",
+			zap.Int64("segmentID", cloned.GetID()),
+			zap.Int64("segment size", cloned.getSegmentSize()),
+			zap.Int64("num rows", cloned.GetNumOfRows()),
+		)
 	}

 	log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
@@ -1828,6 +1836,7 @@ func (m *meta) completeMixCompactionMutation(
 			zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())),
 			zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
 			zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
+			zap.Int64("segment size", compactToSegmentInfo.getSegmentSize()),
 		)
 		compactToSegments = append(compactToSegments, compactToSegmentInfo)
 	}
@@ -2322,7 +2331,9 @@ func (m *meta) completeSortCompactionMutation(

 	log = log.With(zap.Int64s("compactFrom", []int64{oldSegment.GetID()}), zap.Int64("compactTo", segment.GetID()))

-	log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", segment.GetNumOfRows()))
+	log.Info("meta update: prepare for complete stats mutation - complete",
+		zap.Int64("num rows", segment.GetNumOfRows()),
+		zap.Int64("segment size", segment.getSegmentSize()))
 	if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo}, metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
 		log.Warn("fail to alter segments and new segment", zap.Error(err))
 		return nil, nil, err
@@ -240,13 +240,7 @@ func NewManifestReader(manifest string,
 }

 func (mr *ManifestReader) init() error {
-	// TODO add needed column option
-	manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
-	if err != nil {
-		return err
-	}
-
-	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
+	reader, err := packed.NewFFIPackedReader(mr.manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
 	if err != nil {
 		return err
 	}
@@ -31,6 +31,7 @@ import (

 	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/cdata"
+	"github.com/cockroachdb/errors"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus/pkg/v2/log"
@@ -38,9 +39,11 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 )

-func NewFFIPackedReader(manifest string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
-	cManifest := C.CString(manifest)
-	defer C.free(unsafe.Pointer(cManifest))
+func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []string, bufferSize int64, storageConfig *indexpb.StorageConfig, storagePluginContext *indexcgopb.StoragePluginContext) (*FFIPackedReader, error) {
+	cColumnGroups, err := GetColumnGroups(manifestPath, storageConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to get manifest")
+	}

 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
@@ -102,7 +105,7 @@ func NewFFIPackedReader(manifestPath string, schema *arrow.Schema, neededColumns []s
 		cNeededColumnArray := (**C.char)(unsafe.Pointer(&cNeededColumn[0]))
 		cNumColumns := C.int64_t(len(neededColumns))

-		status = C.NewPackedFFIReaderWithManifest(cManifest, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
+		status = C.NewPackedFFIReaderWithManifest(cColumnGroups, cSchema, cNeededColumnArray, cNumColumns, &cPackedReader, cStorageConfig, pluginContextPtr)
 	} else {
 		return nil, fmt.Errorf("storageConfig is required")
 	}
@@ -183,30 +186,29 @@ func (r *FFIPackedReader) Release() {
 	r.Close()
 }

-func GetManifest(manifestPath string, storageConfig *indexpb.StorageConfig) (manifest string, err error) {
+func GetColumnGroups(manifestPath string, storageConfig *indexpb.StorageConfig) (columnGroups C.ColumnGroupsHandle, err error) {
+	var cColumnGroups C.ColumnGroupsHandle
 	basePath, version, err := UnmarshalManfestPath(manifestPath)
 	if err != nil {
-		return "", err
+		return cColumnGroups, err
 	}
 	log.Info("GetManifest", zap.String("manifestPath", manifestPath), zap.String("basePath", basePath), zap.Int64("version", version))

 	cProperties, err := MakePropertiesFromStorageConfig(storageConfig, nil)
 	if err != nil {
-		return "", err
+		return cColumnGroups, err
 	}
 	cBasePath := C.CString(basePath)
 	defer C.free(unsafe.Pointer(cBasePath))

-	var cManifest *C.char
 	var cVersion C.int64_t
-	result := C.get_latest_column_groups(cBasePath, cProperties, &cManifest, &cVersion)
+	result := C.get_latest_column_groups(cBasePath, cProperties, &cColumnGroups, &cVersion)
 	err = HandleFFIResult(result)
 	if err != nil {
-		return "", err
+		return cColumnGroups, err
 	}

-	manifest = C.GoString(cManifest)
-	return manifest, nil
+	return cColumnGroups, nil
 }

 // Ensure FFIPackedReader implements array.RecordReader interface
@@ -133,9 +133,9 @@ func (pw *FFIPackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 }

 func (pw *FFIPackedWriter) Close() (string, error) {
-	var manifest *C.char
+	var cColumnGroups C.ColumnGroupsHandle

-	result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &manifest)
+	result := C.writer_close(pw.cWriterHandle, nil, nil, 0, &cColumnGroups)
 	if err := HandleFFIResult(result); err != nil {
 		return "", err
 	}
@@ -143,7 +143,10 @@ func (pw *FFIPackedWriter) Close() (string, error) {
 	cBasePath := C.CString(pw.basePath)
 	defer C.free(unsafe.Pointer(cBasePath))
 	var transationHandle C.TransactionHandle
-	result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle)
+
+	// TODO pass version
+	// use -1 as latest
+	result = C.transaction_begin(cBasePath, pw.cProperties, &transationHandle, C.int64_t(-1))
 	if err := HandleFFIResult(result); err != nil {
 		return "", err
 	}
@@ -157,23 +160,14 @@ func (pw *FFIPackedWriter) Close() (string, error) {
 	// #define LOON_TRANSACTION_RESOLVE_MERGE 1
 	// #define LOON_TRANSACTION_RESOLVE_MAX 2

-	var commitResult C.bool
-	result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), manifest, &commitResult)
+	var commitResult C.TransactionCommitResult
+	result = C.transaction_commit(transationHandle, C.int16_t(0), C.int16_t(0), cColumnGroups, &commitResult)
 	if err := HandleFFIResult(result); err != nil {
 		return "", err
 	}

-	var readVersion C.int64_t
-
-	// TODO: not atomic, need to get version from transaction
-	var cOutManifest *C.char
-	result = C.get_latest_column_groups(cBasePath, pw.cProperties, &cOutManifest, &readVersion)
-	if err := HandleFFIResult(result); err != nil {
-		return "", err
-	}
-	outManifest := C.GoString(cOutManifest)
-	log.Info("FFI writer closed with output manifest", zap.String("manifest", outManifest), zap.Int64("version", int64(readVersion)))
+	log.Info("FFI writer closed", zap.Int64("version", int64(commitResult.committed_version)))

 	defer C.properties_free(pw.cProperties)
-	return MarshalManifestPath(pw.basePath, int64(readVersion)), nil
+	return MarshalManifestPath(pw.basePath, int64(commitResult.committed_version)), nil
 }
@@ -212,6 +212,9 @@ make rebuild_cache >/dev/null 2>&1

 CPU_ARCH=$(get_cpu_arch $CPU_TARGET)

+# In case any 3rdparty (e.g. libavrocpp) requires a minimum version of CMake lower than 3.5
+export CMAKE_POLICY_VERSION_MINIMUM=3.5
+
 arch=$(uname -m)
 CMAKE_CMD="cmake \
     ${CMAKE_EXTRA_ARGS} \