enhance: update packed reader api (#41055)

related: https://github.com/milvus-io/milvus/issues/39173

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
This commit is contained in:
sthuang 2025-04-09 10:18:26 +08:00 committed by GitHub
parent e2d8adb963
commit 50e02e3598
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 17 additions and 12 deletions

View File

@ -40,12 +40,8 @@ NewPackedReader(char** paths,
"Failed to get filesystem");
}
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
std::set<int> needed_columns;
for (int i = 0; i < trueSchema->num_fields(); i++) {
needed_columns.emplace(i);
}
auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(
trueFs, truePaths, trueSchema, needed_columns, buffer_size);
trueFs, truePaths, trueSchema, buffer_size);
*c_packed_reader = reader.release();
return milvus::SuccessCStatus();
} catch (std::exception& e) {

View File

@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION f21caea )
set( milvus-storage_VERSION 6cf3724 )
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@ -15,6 +15,7 @@
// limitations under the License.
#include <gtest/gtest.h>
#include "milvus-storage/common/constants.h"
#include "segcore/packed_writer_c.h"
#include "segcore/packed_reader_c.h"
#include "segcore/arrow_fs_c.h"
@ -25,8 +26,8 @@
#include <arrow/array/builder_primitive.h>
#include "arrow/table_builder.h"
#include "arrow/type_fwd.h"
#include <arrow/util/key_value_metadata.h>
#include <numeric>
#include <iostream>
TEST(CPackedTest, PackedWriterAndReader) {
std::vector<int64_t> test_data(5);
@ -39,7 +40,12 @@ TEST(CPackedTest, PackedWriterAndReader) {
ASSERT_TRUE(res.ok());
std::shared_ptr<arrow::Array> array = res.ValueOrDie();
auto schema = arrow::schema({arrow::field("int64", arrow::int64())});
auto schema = arrow::schema(
{arrow::field("int64",
arrow::int64(),
false,
arrow::key_value_metadata(
{milvus_storage::ARROW_FIELD_ID_KEY}, {"100"}))});
auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});
struct ArrowSchema c_write_schema;

View File

@ -6,6 +6,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
@ -37,7 +38,7 @@ func ConvertToArrowField(field *schemapb.FieldSchema, dataType arrow.DataType) a
return arrow.Field{
Name: field.GetName(),
Type: dataType,
Metadata: arrow.NewMetadata([]string{"FieldID"}, []string{strconv.Itoa(int(field.GetFieldID()))}),
Metadata: arrow.NewMetadata([]string{packed.ArrowFieldIdMetadataKey}, []string{strconv.Itoa(int(field.GetFieldID()))}),
Nullable: field.GetNullable(),
}
}

View File

@ -21,4 +21,6 @@ const (
DefaultWriteBufferSize = 32 * 1024 * 1024 // 32MB
// DefaultMultiPartUploadSize is the default size of each part of a multipart upload.
DefaultMultiPartUploadSize = 10 * 1024 * 1024 // 10MB
// Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field.
ArrowFieldIdMetadataKey = "PARQUET:field_id"
)

View File

@ -47,9 +47,9 @@ func (suite *PackedTestSuite) SetupSuite() {
func (suite *PackedTestSuite) SetupTest() {
initcore.InitLocalArrowFileSystem("/tmp")
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int32},
{Name: "b", Type: arrow.PrimitiveTypes.Int64},
{Name: "c", Type: arrow.BinaryTypes.String},
{Name: "a", Type: arrow.PrimitiveTypes.Int32, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"})},
{Name: "b", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"})},
{Name: "c", Type: arrow.BinaryTypes.String, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"102"})},
}, nil)
suite.schema = schema