diff --git a/internal/core/src/segcore/packed_reader_c.cpp b/internal/core/src/segcore/packed_reader_c.cpp index eda3ab0cfd..7bf144f500 100644 --- a/internal/core/src/segcore/packed_reader_c.cpp +++ b/internal/core/src/segcore/packed_reader_c.cpp @@ -40,12 +40,8 @@ NewPackedReader(char** paths, "Failed to get filesystem"); } auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); - std::set needed_columns; - for (int i = 0; i < trueSchema->num_fields(); i++) { - needed_columns.emplace(i); - } auto reader = std::make_unique( - trueFs, truePaths, trueSchema, needed_columns, buffer_size); + trueFs, truePaths, trueSchema, buffer_size); *c_packed_reader = reader.release(); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index e06285ca06..7ebc1ff6ac 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -14,7 +14,7 @@ # Update milvus-storage_VERSION for the first occurrence milvus_add_pkg_config("milvus-storage") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( milvus-storage_VERSION f21caea ) +set( milvus-storage_VERSION 6cf3724 ) set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git") message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}") message(STATUS "milvus-storage version: ${milvus-storage_VERSION}") diff --git a/internal/core/unittest/test_packed_c.cpp b/internal/core/unittest/test_packed_c.cpp index 9ff6d692bf..3e7d7af1c0 100644 --- a/internal/core/unittest/test_packed_c.cpp +++ b/internal/core/unittest/test_packed_c.cpp @@ -15,6 +15,7 @@ // limitations under the License. #include +#include "milvus-storage/common/constants.h" #include "segcore/packed_writer_c.h" #include "segcore/packed_reader_c.h" #include "segcore/arrow_fs_c.h" @@ -25,8 +26,8 @@ #include #include "arrow/table_builder.h" #include "arrow/type_fwd.h" +#include #include -#include TEST(CPackedTest, PackedWriterAndReader) { std::vector test_data(5); @@ -39,7 +40,12 @@ TEST(CPackedTest, PackedWriterAndReader) { ASSERT_TRUE(res.ok()); std::shared_ptr array = res.ValueOrDie(); - auto schema = arrow::schema({arrow::field("int64", arrow::int64())}); + auto schema = arrow::schema( + {arrow::field("int64", + arrow::int64(), + false, + arrow::key_value_metadata( + {milvus_storage::ARROW_FIELD_ID_KEY}, {"100"}))}); auto batch = arrow::RecordBatch::Make(schema, array->length(), {array}); struct ArrowSchema c_write_schema; diff --git a/internal/storage/schema.go b/internal/storage/schema.go index 026ed58766..73a6939ac5 100644 --- a/internal/storage/schema.go +++ b/internal/storage/schema.go @@ -6,6 +6,7 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storagev2/packed" "github.com/milvus-io/milvus/pkg/v2/util/merr" ) @@ -37,7 +38,7 @@ func ConvertToArrowField(field *schemapb.FieldSchema, dataType arrow.DataType) a return arrow.Field{ Name: field.GetName(), Type: dataType, - Metadata: arrow.NewMetadata([]string{"FieldID"}, []string{strconv.Itoa(int(field.GetFieldID()))}), + Metadata: arrow.NewMetadata([]string{packed.ArrowFieldIdMetadataKey}, []string{strconv.Itoa(int(field.GetFieldID()))}), Nullable: field.GetNullable(), } } diff --git a/internal/storagev2/packed/constant.go b/internal/storagev2/packed/constant.go index 0292bfaeea..4cf71d8b6d 100644 --- a/internal/storagev2/packed/constant.go +++ b/internal/storagev2/packed/constant.go @@ -21,4 +21,6 @@ const ( DefaultWriteBufferSize = 32 * 1024 * 1024 // 32MB // DefaultMultiPartUploadSize is the default size of each part of a multipart upload. DefaultMultiPartUploadSize = 10 * 1024 * 1024 // 10MB + // Arrow will convert these field IDs to a metadata key named PARQUET:field_id on the appropriate field. + ArrowFieldIdMetadataKey = "PARQUET:field_id" ) diff --git a/internal/storagev2/packed/packed_test.go b/internal/storagev2/packed/packed_test.go index f713e471c0..a896111782 100644 --- a/internal/storagev2/packed/packed_test.go +++ b/internal/storagev2/packed/packed_test.go @@ -47,9 +47,9 @@ func (suite *PackedTestSuite) SetupSuite() { func (suite *PackedTestSuite) SetupTest() { initcore.InitLocalArrowFileSystem("/tmp") schema := arrow.NewSchema([]arrow.Field{ - {Name: "a", Type: arrow.PrimitiveTypes.Int32}, - {Name: "b", Type: arrow.PrimitiveTypes.Int64}, - {Name: "c", Type: arrow.BinaryTypes.String}, + {Name: "a", Type: arrow.PrimitiveTypes.Int32, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"100"})}, + {Name: "b", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"101"})}, + {Name: "c", Type: arrow.BinaryTypes.String, Nullable: false, Metadata: arrow.NewMetadata([]string{ArrowFieldIdMetadataKey}, []string{"102"})}, }, nil) suite.schema = schema