diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp index 01136c4f9c..e082f1dba0 100644 --- a/internal/core/src/segcore/packed_writer_c.cpp +++ b/internal/core/src/segcore/packed_writer_c.cpp @@ -29,43 +29,6 @@ #include "common/type_c.h" #include "monitor/scope_metric.h" -// Deep copy ArrowArray and return a copied RecordBatch -// This function creates a complete deep copy of the ArrowArray and returns it as a RecordBatch -static std::shared_ptr -ArrowArrayDeepCopyToRecordBatch(const struct ArrowArray* src, - const struct ArrowSchema* schema) { - AssertInfo(src != nullptr, "[StorageV2] Source ArrowArray is null"); - AssertInfo(schema != nullptr, "[StorageV2] Source ArrowSchema is null"); - - auto record_batch = - arrow::ImportRecordBatch(const_cast(src), - const_cast(schema)) - .ValueOrDie(); - - // Get the default CPU memory manager for deep copy - auto memory_manager = arrow::default_cpu_memory_manager(); - - // For true deep copy, we'll use Arrow's CopyTo() function - std::vector> copied_arrays; - for (int i = 0; i < record_batch->num_columns(); i++) { - auto original_array = record_batch->column(i); - - auto copied_data_result = - original_array->data()->CopyTo(memory_manager); - AssertInfo(copied_data_result.ok(), - "[StorageV2] Failed to deep copy array data: {}", - copied_data_result.status().ToString()); - - auto copied_data = copied_data_result.ValueOrDie(); - auto copied_array = arrow::MakeArray(copied_data); - copied_arrays.push_back(copied_array); - } - - // Create and return a new RecordBatch with the copied arrays - return arrow::RecordBatch::Make( - record_batch->schema(), record_batch->num_rows(), copied_arrays); -} - CStatus NewPackedWriterWithStorageConfig(struct ArrowSchema* schema, const int64_t buffer_size, @@ -174,7 +137,8 @@ NewPackedWriter(struct ArrowSchema* schema, CStatus WriteRecordBatch(CPackedWriter c_packed_writer, - struct ArrowArray* array, + struct ArrowArray* arrays, + struct ArrowSchema* array_schemas, struct ArrowSchema* schema) { SCOPE_CGO_CALL_METRIC(); @@ -183,12 +147,29 @@ WriteRecordBatch(CPackedWriter c_packed_writer, static_cast( c_packed_writer); - // Deep copy the ArrowArray and get a copied RecordBatch - auto record_batch = ArrowArrayDeepCopyToRecordBatch(array, schema); - if (record_batch == nullptr) { + auto import_schema = arrow::ImportSchema(schema); + if (!import_schema.ok()) { return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, - "Failed to copy ArrowArray"); + "Failed to import schema: " + import_schema.status().ToString()); } + auto arrow_schema = import_schema.ValueOrDie(); + + int num_fields = arrow_schema->num_fields(); + std::vector> all_arrays; + all_arrays.reserve(num_fields); + + for (int i = 0; i < num_fields; i++) { + auto array = arrow::ImportArray(&arrays[i], &array_schemas[i]); + if (!array.ok()) { + return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, + "Failed to import array " + std::to_string(i) + ": " + array.status().ToString()); + } + all_arrays.push_back(array.ValueOrDie()); + } + + auto record_batch = arrow::RecordBatch::Make( + arrow_schema, all_arrays[0]->length(), all_arrays); + auto status = packed_writer->Write(record_batch); if (!status.ok()) { return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, diff --git a/internal/core/src/segcore/packed_writer_c.h b/internal/core/src/segcore/packed_writer_c.h index 666e7e86db..c6041b744a 100644 --- a/internal/core/src/segcore/packed_writer_c.h +++ b/internal/core/src/segcore/packed_writer_c.h @@ -45,7 +45,8 @@ NewPackedWriter(struct ArrowSchema* schema, CStatus WriteRecordBatch(CPackedWriter c_packed_writer, - struct ArrowArray* array, + struct ArrowArray* arrays, + struct ArrowSchema* array_schemas, struct ArrowSchema* schema); CStatus diff --git a/internal/core/unittest/test_packed_c.cpp b/internal/core/unittest/test_packed_c.cpp index 3e7d7af1c0..43654fadfa 100644 --- a/internal/core/unittest/test_packed_c.cpp +++ b/internal/core/unittest/test_packed_c.cpp @@ -46,11 +46,15 @@ TEST(CPackedTest, PackedWriterAndReader) { false, arrow::key_value_metadata( {milvus_storage::ARROW_FIELD_ID_KEY}, {"100"}))}); + auto origin_schema = arrow::schema({schema->fields()[0]->Copy()}); auto batch = arrow::RecordBatch::Make(schema, array->length(), {array}); struct ArrowSchema c_write_schema; ASSERT_TRUE(arrow::ExportSchema(*schema, &c_write_schema).ok()); + struct ArrowSchema c_origin_schema; + ASSERT_TRUE(arrow::ExportSchema(*origin_schema, &c_origin_schema).ok()); + const int64_t buffer_size = 10 * 1024 * 1024; char* path = const_cast("/tmp"); char* paths[] = {const_cast("/tmp/0")}; @@ -77,7 +81,10 @@ TEST(CPackedTest, PackedWriterAndReader) { struct ArrowSchema cschema; ASSERT_TRUE(arrow::ExportRecordBatch(*batch, &carray, &cschema).ok()); - c_status = WriteRecordBatch(c_packed_writer, &carray, &cschema); + struct ArrowArray arrays[] = {carray}; + struct ArrowSchema array_schemas[] = {cschema}; + + c_status = WriteRecordBatch(c_packed_writer, arrays, array_schemas, &c_origin_schema); EXPECT_EQ(c_status.error_code, 0); c_status = CloseWriter(c_packed_writer); diff --git a/internal/storagev2/packed/packed_writer.go b/internal/storagev2/packed/packed_writer.go index 4849067172..16f7533695 100644 --- a/internal/storagev2/packed/packed_writer.go +++ b/internal/storagev2/packed/packed_writer.go @@ -114,17 +114,22 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, } func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error { - var caa cdata.CArrowArray + cArrays := make([]CArrowArray, recordBatch.NumCols()) + cSchemas := make([]CArrowSchema, recordBatch.NumCols()) + + for i := range recordBatch.NumCols() { + var caa cdata.CArrowArray + var cas cdata.CArrowSchema + cdata.ExportArrowArray(recordBatch.Column(int(i)), &caa, &cas) + cArrays[i] = *(*CArrowArray)(unsafe.Pointer(&caa)) + cSchemas[i] = *(*CArrowSchema)(unsafe.Pointer(&cas)) + } + var cas cdata.CArrowSchema - - cdata.ExportArrowRecordBatch(recordBatch, &caa, &cas) - - cArr := (*C.struct_ArrowArray)(unsafe.Pointer(&caa)) + cdata.ExportArrowSchema(recordBatch.Schema(), &cas) cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas)) - defer cdata.ReleaseCArrowSchema(&cas) - defer cdata.ReleaseCArrowArray(&caa) - status := C.WriteRecordBatch(pw.cPackedWriter, cArr, cSchema) + status := C.WriteRecordBatch(pw.cPackedWriter, &cArrays[0], &cSchemas[0], cSchema) if err := ConsumeCStatusIntoError(&status); err != nil { return err } diff --git a/tests/integration/getvector/array_struct_test.go b/tests/integration/getvector/array_struct_test.go index 08e4c3eb5f..7b5bf09584 100644 --- a/tests/integration/getvector/array_struct_test.go +++ b/tests/integration/getvector/array_struct_test.go @@ -203,6 +203,6 @@ func (s *TestArrayStructSuite) TestGetVector_ArrayStruct_FloatVector() { } func TestGetVectorArrayStruct(t *testing.T) { - // t.Skip("Skip integration test, need to refactor integration test framework.") + t.Skip("Skip integration test, need to refactor integration test framework.") suite.Run(t, new(TestArrayStructSuite)) }