enhance: Deep copy arrow array (#43724)

Deep copy the Arrow array and make a new RecordBatch with the copied arrays.

issue: https://github.com/milvus-io/milvus/issues/43310

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-08-05 00:31:38 +08:00 committed by GitHub
parent f14c7d598c
commit cb7be8885d

@@ -15,16 +15,57 @@
#include "segcore/column_groups_c.h"
#include "segcore/packed_writer_c.h"
#include "milvus-storage/packed/writer.h"
#include "milvus-storage/common/log.h"
#include "milvus-storage/common/config.h"
#include "milvus-storage/filesystem/fs.h"
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/array.h>
#include <arrow/record_batch.h>
#include <arrow/memory_pool.h>
#include <arrow/device.h>
#include <cstring>
#include "common/EasyAssert.h"
#include "common/type_c.h"
#include "monitor/scope_metric.h"
// Deep copy ArrowArray and return a copied RecordBatch
// This function creates a complete deep copy of the ArrowArray and returns it as a RecordBatch
static std::shared_ptr<arrow::RecordBatch>
ArrowArrayDeepCopyToRecordBatch(const struct ArrowArray* src,
const struct ArrowSchema* schema) {
AssertInfo(src != nullptr, "[StorageV2] Source ArrowArray is null");
AssertInfo(schema != nullptr, "[StorageV2] Source ArrowSchema is null");
auto record_batch =
arrow::ImportRecordBatch(const_cast<struct ArrowArray*>(src),
const_cast<struct ArrowSchema*>(schema))
.ValueOrDie();
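// Note: ImportRecordBatch is a zero-copy import, so the imported batch
// still references the buffers owned by the caller behind `src`; the
// per-column copies below detach the result from those buffers.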
// Get the default CPU memory manager for deep copy
auto memory_manager = arrow::default_cpu_memory_manager();
// For true deep copy, we'll use Arrow's CopyTo() function
std::vector<std::shared_ptr<arrow::Array>> copied_arrays;
for (int i = 0; i < record_batch->num_columns(); i++) {
auto original_array = record_batch->column(i);
auto copied_data_result =
original_array->data()->CopyTo(memory_manager);
AssertInfo(copied_data_result.ok(),
"[StorageV2] Failed to deep copy array data: {}",
copied_data_result.status().ToString());
auto copied_data = copied_data_result.ValueOrDie();
auto copied_array = arrow::MakeArray(copied_data);
copied_arrays.push_back(copied_array);
}
// Create and return a new RecordBatch with the copied arrays
return arrow::RecordBatch::Make(
record_batch->schema(), record_batch->num_rows(), copied_arrays);
}
CStatus
NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
const int64_t buffer_size,
@@ -141,8 +182,13 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
c_packed_writer);
auto record_batch =
arrow::ImportRecordBatch(array, schema).ValueOrDie();
// Deep copy the ArrowArray and get a copied RecordBatch
auto record_batch = ArrowArrayDeepCopyToRecordBatch(array, schema);
if (record_batch == nullptr) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
"Failed to copy ArrowArray");
}
auto status = packed_writer->Write(record_batch);
if (!status.ok()) {
return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed,
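
For context, below is a minimal, self-contained sketch (not part of this patch) of the same pattern in plain Arrow C++: export a batch through the C data interface, import it zero-copy, deep copy each column with ArrayData::CopyTo(), and check that the copy no longer shares buffers with the source. The sample schema, the MakeSampleBatch helper, and the int64 column are illustrative assumptions, not taken from the Milvus code.

#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include <arrow/device.h>
#include <cassert>
#include <memory>
#include <vector>

// Illustrative helper (hypothetical): build a one-column int64 batch.
static std::shared_ptr<arrow::RecordBatch>
MakeSampleBatch() {
    arrow::Int64Builder builder;
    if (!builder.AppendValues(std::vector<int64_t>{1, 2, 3}).ok()) {
        return nullptr;
    }
    auto values = builder.Finish().ValueOrDie();
    auto schema = arrow::schema({arrow::field("v", arrow::int64())});
    return arrow::RecordBatch::Make(schema, 3, {values});
}

int
main() {
    // Export a batch through the Arrow C data interface, as a caller of
    // WriteRecordBatch would.
    auto original = MakeSampleBatch();
    struct ArrowArray c_array;
    struct ArrowSchema c_schema;
    if (!arrow::ExportRecordBatch(*original, &c_array, &c_schema).ok()) {
        return 1;
    }

    // Zero-copy import, then deep copy every column, mirroring
    // ArrowArrayDeepCopyToRecordBatch above.
    auto imported = arrow::ImportRecordBatch(&c_array, &c_schema).ValueOrDie();
    auto mm = arrow::default_cpu_memory_manager();
    std::vector<std::shared_ptr<arrow::Array>> copied;
    for (int i = 0; i < imported->num_columns(); i++) {
        auto data = imported->column(i)->data()->CopyTo(mm).ValueOrDie();
        copied.push_back(arrow::MakeArray(data));
    }
    auto deep_copy = arrow::RecordBatch::Make(
        imported->schema(), imported->num_rows(), copied);

    // Same contents, but independent buffers: the copy remains valid even
    // after the imported batch (which wraps the exported buffers) goes away.
    assert(deep_copy->Equals(*original));
    assert(deep_copy->column(0)->data()->buffers[1]->address() !=
           imported->column(0)->data()->buffers[1]->address());
    imported.reset();
    assert(deep_copy->column(0)->length() == 3);
    return 0;
}

ArrayData::CopyTo() copies the array's buffers (and child/dictionary data) to the target memory manager, which is what makes the resulting RecordBatch safe to hold after the caller releases the original ArrowArray.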