From 3c2cf2c06623c2a8ff3eb403706a848b63cf5179 Mon Sep 17 00:00:00 2001
From: marcelo-cjl
Date: Mon, 29 Dec 2025 10:51:21 +0800
Subject: [PATCH] feat: Add nullable vector support in import utility layer
 (#46142)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

related: #45993

Add nullable vector support in import utility layer

Key changes:

ImportV2 util:
- Add nullable vector types (FloatVector, Float16Vector, BFloat16Vector,
  BinaryVector, SparseFloatVector, Int8Vector) to
  AppendNullableDefaultFieldsData()
- Add tests for nullable vector field data appending

CSV/JSON/Numpy readers:
- Add a nullPercent parameter to test data generation for better null coverage
- Mark vector fields as nullable in test schemas
- Add test cases for nullable vector field parsing
- Refactor tests to a loop-based approach covering 0%, 50%, and 100% null
  percentages

Parquet field reader:
- Add ReadNullableBinaryData() for nullable BinaryVector/Float16Vector/BFloat16Vector
- Add ReadNullableFloatVectorData() for nullable FloatVector
- Add ReadNullableSparseFloatVectorData() for nullable SparseFloatVector
- Add ReadNullableInt8VectorData() for nullable Int8Vector
- Add ReadNullableStructData() for generic nullable struct data
- Update Next() to use the nullable read methods when a field is nullable
- Add null-data validation for non-nullable fields

- Core invariant: import must preserve per-row alignment and validity for
  every field. Nullable vector fields are encoded with per-row validity
  masks, and all readers/writers emit arrays aligned to the original input
  rows, with null entries represented explicitly.
- New feature and scope: adds end-to-end nullable-vector support in the
  import utility layer. AppendNullableDefaultFieldsData in
  internal/datanode/importv2/util.go now appends nil placeholders for
  nullable vectors (FloatVector, Float16Vector, BFloat16Vector,
  BinaryVector, SparseFloatVector, Int8Vector). The parquet reader
  (internal/util/importutilv2/parquet/field_reader.go) adds
  ReadNullableBinaryData, ReadNullableFloatVectorData,
  ReadNullableSparseFloatVectorData, ReadNullableInt8VectorData, and
  ReadNullableStructData, and routes nullable branches to these helpers.
  The CSV/JSON/Numpy readers and test utilities now generate and validate
  0%/50%/100% null scenarios and mark vector fields as nullable in test
  schemas.
- Logic removed / simplified: eliminates the ad-hoc "parameter invalid"
  rejections for nullable vectors inside FieldReader.Next by centralizing
  nullable handling in the ReadNullable* helpers and shared validators
  (getArrayDataNullable, checkNullableVectorAlignWithDim,
  checkNullableVectorAligned), removing scattered special-case checks.
- No data loss / no regression (concrete code paths): nulls are preserved
  end-to-end. AppendNullableDefaultFieldsData inserts a nil entry for each
  null row on the datanode import append path; the ReadNullable*Data
  helpers return both the data and a []bool validity mask, so callers in
  field_reader.go and downstream readers receive exact per-row validity;
  and testutil.BuildSparseVectorData now accepts validData, so sparse
  vectors are materialized only for valid rows while null rows are
  represented as missing. Together, these paths ensure null rows are
  represented rather than dropped.
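For reviewers, a minimal standalone sketch (illustrative only, not part of
this patch) of the sparse-storage contract the ReadNullable* helpers follow:
the returned data slice holds only valid rows back to back, while the []bool
validity mask carries one entry per logical input row. The helper name below
is hypothetical.

    package main

    import "fmt"

    // expandNullableFloatVectors rebuilds logical rows from sparse-stored
    // vector data: a valid row gets its dim-sized slice, a null row gets nil.
    // Hypothetical helper for illustration; the patch itself keeps the data
    // and validity mask side by side rather than expanding them.
    func expandNullableFloatVectors(data []float32, validData []bool, dim int) ([][]float32, error) {
        rows := make([][]float32, 0, len(validData))
        physicalIdx := 0 // index into data, which skips null rows
        for _, valid := range validData {
            if !valid {
                rows = append(rows, nil) // null row: nothing stored in data
                continue
            }
            start, end := physicalIdx*dim, (physicalIdx+1)*dim
            if end > len(data) {
                return nil, fmt.Errorf("data/validity misaligned: need %d floats, have %d", end, len(data))
            }
            rows = append(rows, data[start:end])
            physicalIdx++
        }
        return rows, nil
    }

    func main() {
        // 3 logical rows with dim=2; row 1 is null, so only 2 rows are stored.
        data := []float32{0.1, 0.2, 0.5, 0.6}
        validData := []bool{true, false, true}
        rows, err := expandNullableFloatVectors(data, validData, 2)
        fmt.Println(rows, err) // [[0.1 0.2] [] [0.5 0.6]] <nil>
    }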
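The same row-skipping shows up in validation. A simplified, standalone sketch
of the rule the new checkNullableVectorAlignWithDim validator enforces: every
valid row's list length must equal the (type-adjusted) dimension, while null
rows are exempt. isNull below stands in for arrow's listReader.IsNull.

    package main

    import "fmt"

    // checkAlignSkippingNulls mirrors checkNullableVectorAlignWithDim:
    // offsets[i]-offsets[i-1] must equal dim for every non-null row; null
    // rows carry no payload and are skipped.
    func checkAlignSkippingNulls(offsets []int32, isNull []bool, dim int32) error {
        for i := 1; i < len(offsets); i++ {
            if isNull[i-1] {
                continue
            }
            if got := offsets[i] - offsets[i-1]; got != dim {
                return fmt.Errorf("row %d: expected %d but got %d", i-1, dim, got)
            }
        }
        return nil
    }

    func main() {
        // Row 1 is null, so its zero-length slot does not trip the check.
        offsets := []int32{0, 2, 2, 4}
        isNull := []bool{false, true, false}
        fmt.Println(checkAlignSkippingNulls(offsets, isNull, 2)) // <nil>
    }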
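On the test side, a hedged sketch of the nullPercent semantics that
testutil.CreateInsertData now implements (0 means all rows valid, 100 means
all rows null, 50 produces a mixed mask via testutils.GenerateBoolArray;
other values are rejected). The function below is illustrative, not the
actual implementation.

    package main

    import (
        "fmt"
        "math/rand"
    )

    // validityForNullPercent mimics the nullPercent handling added to
    // testutil.CreateInsertData. Illustrative sketch only.
    func validityForNullPercent(rows, nullPercent int) ([]bool, error) {
        valid := make([]bool, rows)
        switch nullPercent {
        case 0:
            for i := range valid {
                valid[i] = true
            }
        case 50:
            for i := range valid {
                valid[i] = rand.Intn(2) == 0 // roughly half the rows are null
            }
        case 100:
            // zero value is false everywhere: every row is null
        default:
            return nil, fmt.Errorf("unsupported nullPercent: %d", nullPercent)
        }
        return valid, nil
    }

    func main() {
        v, _ := validityForNullPercent(6, 50)
        fmt.Println(v)
    }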
Signed-off-by: marcelo-cjl --- internal/datanode/importv2/util.go | 13 + internal/datanode/importv2/util_test.go | 48 ++- internal/util/importutilv2/csv/reader_test.go | 102 +++-- .../util/importutilv2/csv/row_parser_test.go | 19 +- .../util/importutilv2/json/reader_test.go | 50 ++- .../util/importutilv2/json/row_parser_test.go | 13 +- .../util/importutilv2/numpy/reader_test.go | 24 +- .../util/importutilv2/parquet/field_reader.go | 324 ++++++++++++++- .../importutilv2/parquet/field_reader_test.go | 69 ++-- .../util/importutilv2/parquet/reader_test.go | 123 +++--- internal/util/testutil/test_util.go | 373 +++++++++++++----- 11 files changed, 850 insertions(+), 308 deletions(-) diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 1e103f3bb2..d27cb58373 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -374,6 +374,19 @@ func AppendNullableDefaultFieldsData(schema *schemapb.CollectionSchema, data *st appender := &nullDefaultAppender[*schemapb.ScalarField]{} err = appender.AppendNull(fieldData, rowNum) } + case schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_BinaryVector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector: + if nullable { + for i := 0; i < rowNum; i++ { + if err = fieldData.AppendRow(nil); err != nil { + return err + } + } + } default: return fmt.Errorf("Unexpected data type: %d, cannot be filled with default value", dataType) } diff --git a/internal/datanode/importv2/util_test.go b/internal/datanode/importv2/util_test.go index cd392f50c2..be4223d6bf 100644 --- a/internal/datanode/importv2/util_test.go +++ b/internal/datanode/importv2/util_test.go @@ -495,10 +495,52 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) { }, }, }, + { + name: "float vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_FloatVector, + nullable: true, + }, + { + name: "float16 vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_Float16Vector, + nullable: true, + }, + { + name: "bfloat16 vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_BFloat16Vector, + nullable: true, + }, + { + name: "binary vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_BinaryVector, + nullable: true, + }, + { + name: "sparse float vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_SparseFloatVector, + nullable: true, + }, + { + name: "int8 vector is nullable", + fieldID: 200, + dataType: schemapb.DataType_Int8Vector, + nullable: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { schema := buildSchemaFn() + isVectorType := tt.dataType == schemapb.DataType_FloatVector || + tt.dataType == schemapb.DataType_Float16Vector || + tt.dataType == schemapb.DataType_BFloat16Vector || + tt.dataType == schemapb.DataType_BinaryVector || + tt.dataType == schemapb.DataType_SparseFloatVector || + tt.dataType == schemapb.DataType_Int8Vector fieldSchema := &schemapb.FieldSchema{ FieldID: tt.fieldID, Name: fmt.Sprintf("field_%d", tt.fieldID), @@ -511,10 +553,12 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) { fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxCapacityKey, Value: "100"}) } else if tt.dataType == schemapb.DataType_VarChar { fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxLengthKey, Value: "100"}) + } else if isVectorType && tt.dataType != 
schemapb.DataType_SparseFloatVector { + fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.DimKey, Value: "8"}) } // create data without the new field - insertData, err := testutil.CreateInsertData(schema, count) + insertData, err := testutil.CreateInsertData(schema, count, 100) assert.NoError(t, err) // add new nullalbe/default field to the schema @@ -524,7 +568,7 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) { tempSchema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{fieldSchema}, } - tempData, err := testutil.CreateInsertData(tempSchema, 1) + tempData, err := testutil.CreateInsertData(tempSchema, 1, 100) assert.NoError(t, err) insertData.Data[fieldSchema.GetFieldID()] = tempData.Data[fieldSchema.GetFieldID()] diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go index 3b1b38da19..b6e24fae67 100644 --- a/internal/util/importutilv2/csv/reader_test.go +++ b/internal/util/importutilv2/csv/reader_test.go @@ -58,7 +58,7 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -83,6 +83,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Value: "8", }, }, + Nullable: nullable, }, { FieldID: 102, @@ -129,7 +130,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data nullkey := "" // generate csv data - insertData, err := testutil.CreateInsertData(schema, suite.numRows) + insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent) suite.NoError(err) csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey) suite.NoError(err) @@ -178,65 +179,60 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Float, schemapb.DataType_None, false) - suite.run(schemapb.DataType_Double, schemapb.DataType_None, false) - suite.run(schemapb.DataType_String, schemapb.DataType_None, false) - suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false) - suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false) + elementTypes := []schemapb.DataType{ + schemapb.DataType_Bool, + schemapb.DataType_Int8, + schemapb.DataType_Int16, + schemapb.DataType_Int32, + schemapb.DataType_Int64, + schemapb.DataType_Float, + schemapb.DataType_Double, + schemapb.DataType_String, + } + scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Array}...) 
- suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false) - suite.run(schemapb.DataType_Array, schemapb.DataType_String, false) - - suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Float, schemapb.DataType_None, true) - suite.run(schemapb.DataType_Double, schemapb.DataType_None, true) - suite.run(schemapb.DataType_String, schemapb.DataType_None, true) - suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true) - suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true) - - suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true) - suite.run(schemapb.DataType_Array, schemapb.DataType_String, true) + for _, dataType := range scalarTypes { + if dataType == schemapb.DataType_Array { + for _, elementType := range elementTypes { + suite.run(dataType, elementType, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(dataType, elementType, true, nullPercent) + } + } + } else { + suite.run(dataType, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(dataType, schemapb.DataType_None, true, nullPercent) + } + } + } } func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } } func (suite *ReaderSuite) TestVector() { - suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_Int8Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + dataTypes := []schemapb.DataType{ + schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + 
schemapb.DataType_BFloat16Vector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector, + } + + for _, dataType := range dataTypes { + suite.vecDataType = dataType + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } + } } func (suite *ReaderSuite) TestError() { diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index aa9f14fa64..7f7f4d53cb 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -131,35 +131,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { Name: "float_vector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 22, Name: "bin_vector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}}, + Nullable: suite.hasNullable, }, { FieldID: 23, Name: "sparse_vector", DataType: schemapb.DataType_SparseFloatVector, + Nullable: suite.hasNullable, }, { FieldID: 24, Name: "f16_vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 25, Name: "bf16_vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 26, Name: "int8_vector", DataType: schemapb.DataType_Int8Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 27, @@ -648,6 +654,12 @@ func (suite *RowParserSuite) TestValid() { suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", suite.nullKey)}) suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", suite.nullKey)}) suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", suite.nullKey)}) + suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", suite.nullKey)}) // Test struct array parsing suite.runValid(&testCase{name: "A/N/D struct array valid", content: suite.genAllTypesRowData("x", "2")}) @@ -704,13 +716,6 @@ func (suite *RowParserSuite) TestParseError() { suite.Error(err) suite.Nil(parser) - // field value missed - content = suite.genAllTypesRowData("x", "2", "float_vector") - header, _ = suite.genRowContent(schema, content) - parser, err = 
NewRowParser(schema, header, suite.nullKey) - suite.Error(err) - suite.Nil(parser) - // function output no need provide content = suite.genAllTypesRowData("x", "2") content["function_sparse_vector"] = "{\"1\":0.5,\"10\":1.5,\"100\":2.5}" diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index e71b702215..fb65f3f498 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -56,7 +56,7 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -81,6 +81,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Value: "8", }, }, + Nullable: nullable, }, { FieldID: 102, @@ -121,7 +122,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data }) } - insertData, err := testutil.CreateInsertData(schema, suite.numRows) + insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent) suite.NoError(err) rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData) @@ -261,12 +262,16 @@ func (suite *ReaderSuite) TestReadScalarFields() { for _, dataType := range scalarTypes { if dataType == schemapb.DataType_Array { for _, elementType := range elementTypes { - suite.run(dataType, elementType, false) - suite.run(dataType, elementType, true) + suite.run(dataType, elementType, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(dataType, elementType, true, nullPercent) + } } } else { - suite.run(dataType, schemapb.DataType_None, false) - suite.run(dataType, schemapb.DataType_None, true) + suite.run(dataType, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(dataType, schemapb.DataType_None, true, nullPercent) + } } } } @@ -291,22 +296,29 @@ func (suite *ReaderSuite) TestReadScalarFieldsWithDefaultValue() { func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } } func (suite *ReaderSuite) TestVector() { - suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) - suite.vecDataType = schemapb.DataType_Int8Vector - suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + dataTypes := []schemapb.DataType{ + schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + 
schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector, + } + + for _, dataType := range dataTypes { + suite.vecDataType = dataType + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } + } } func (suite *ReaderSuite) TestDecodeError() { diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go index f2ce4c6a10..00b7b93a0c 100644 --- a/internal/util/importutilv2/json/row_parser_test.go +++ b/internal/util/importutilv2/json/row_parser_test.go @@ -110,35 +110,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { Name: "float_vector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 22, Name: "bin_vector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}}, + Nullable: suite.hasNullable, }, { FieldID: 23, Name: "sparse_vector", DataType: schemapb.DataType_SparseFloatVector, + Nullable: suite.hasNullable, }, { FieldID: 24, Name: "f16_vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 25, Name: "bf16_vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 26, Name: "int8_vector", DataType: schemapb.DataType_Int8Vector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}}, + Nullable: suite.hasNullable, }, { FieldID: 27, @@ -633,6 +639,12 @@ func (suite *RowParserSuite) TestValid() { suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", nil)}) suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", nil)}) suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", nil)}) + suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", nil)}) suite.setSchema(false, true, true) suite.runValid(&testCase{name: "N/D valid parse", content: suite.genAllTypesRowData("x", 2)}) @@ -675,7 +687,6 @@ func (suite *RowParserSuite) TestParseError() { {name: "not a JSON for dynamic", content: suite.genAllTypesRowData("$meta", []int{})}, {name: "exceeds max length varchar", content: suite.genAllTypesRowData("varchar", "aaaaaaaaaa")}, {name: "exceeds max capacity", content: suite.genAllTypesRowData("array_int8", []int{1, 2, 3, 4, 5})}, - {name: "field value missed", 
content: suite.genAllTypesRowData("x", 2, "float_vector")}, {name: "type error bool", content: suite.genAllTypesRowData("bool", 0.2)}, {name: "type error int8", content: suite.genAllTypesRowData("int8", []int32{})}, {name: "type error int16", content: suite.genAllTypesRowData("int16", []int32{})}, diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index 7a2baaf6cc..a0105e908a 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -433,18 +433,18 @@ func (suite *ReaderSuite) TestStringPK() { } func (suite *ReaderSuite) TestVector() { - suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32) - suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32) - suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32) - suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32) - // suite.vecDataType = schemapb.DataType_SparseFloatVector - // suite.run(schemapb.DataType_Int32) - suite.vecDataType = schemapb.DataType_Int8Vector - suite.run(schemapb.DataType_Int32) + dataTypes := []schemapb.DataType{ + schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_Int8Vector, + } + + for _, dataType := range dataTypes { + suite.vecDataType = dataType + suite.run(schemapb.DataType_Int32) + } } func TestNumpyCreateReaders(t *testing.T) { diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index c5de8d478d..f207a102fa 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -189,13 +189,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) { case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: // vector not support default_value if c.field.GetNullable() { - return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + return ReadNullableBinaryData(c, count) } data, err := ReadBinaryData(c, count) return data, nil, err case schemapb.DataType_FloatVector: if c.field.GetNullable() { - return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + return ReadNullableFloatVectorData(c, count) } arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count) if err != nil { @@ -208,13 +208,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) { return vectors, nil, nil case schemapb.DataType_SparseFloatVector: if c.field.GetNullable() { - return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + return ReadNullableSparseFloatVectorData(c, count) } data, err := ReadSparseFloatVectorData(c, count) return data, nil, err case schemapb.DataType_Int8Vector: if c.field.GetNullable() { - return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + return ReadNullableInt8VectorData(c, count) } arrayData, err := ReadIntegerOrFloatArrayData[int8](c, count) if err != nil { @@ -493,9 +493,6 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p // Value type of "indices" is array.List, element type is array.Uint32 // Value type of "values" is array.List, element type is array.Float32 // The length of the list is equal to the length of chunked.Chunks() -// -// Note: now 
the ReadStructData() is used by SparseVector type and SparseVector is not nullable, -// create a new method ReadNullableStructData() if we have nullable struct type in future. func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -524,6 +521,40 @@ func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, er return data, nil } +func ReadNullableStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]map[string]arrow.Array, 0, count) + validData := make([]bool, 0, count) + + for _, chunk := range chunked.Chunks() { + structReader, ok := chunk.(*array.Struct) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + + structType := structReader.DataType().(*arrow.StructType) + rows := structReader.Len() + // Sparse storage: only store valid rows' data + for i := 0; i < rows; i++ { + validData = append(validData, !structReader.IsNull(i)) + if !structReader.IsNull(i) { + st := make(map[string]arrow.Array) + for k, field := range structType.Fields() { + st[field.Name] = structReader.Field(k) + } + data = append(data, st) + } + } + } + if len(data) == 0 && len(validData) == 0 { + return nil, nil, nil + } + return data, validData, nil +} + func ReadStringData(pcr *FieldReader, count int64, isVarcharField bool) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -851,6 +882,61 @@ func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadNullableBinaryData(pcr *FieldReader, count int64) (any, []bool, error) { + dataType := pcr.field.GetDataType() + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]byte, 0, count) + validData := make([]bool, 0, count) + + // Sparse storage: only store valid rows' data + for _, chunk := range chunked.Chunks() { + rows := chunk.Data().Len() + switch chunk.DataType().ID() { + case arrow.NULL: + for i := 0; i < rows; i++ { + validData = append(validData, false) + } + case arrow.BINARY: + binaryReader := chunk.(*array.Binary) + for i := 0; i < rows; i++ { + if binaryReader.IsNull(i) { + validData = append(validData, false) + } else { + data = append(data, binaryReader.Value(i)...) + validData = append(validData, true) + } + } + case arrow.LIST: + listReader := chunk.(*array.List) + if err = checkNullableVectorAligned(listReader.Offsets(), listReader, pcr.dim, dataType); err != nil { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String())) + } + uint8Reader, ok := listReader.ListValues().(*array.Uint8) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, listReader.ListValues().DataType().Name()) + } + for i := 0; i < rows; i++ { + if listReader.IsNull(i) { + validData = append(validData, false) + } else { + start, end := listReader.ValueOffsets(i) + data = append(data, uint8Reader.Uint8Values()[start:end]...) 
+ validData = append(validData, true) + } + } + default: + return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + } + if len(data) == 0 && len(validData) == 0 { + return nil, nil, nil + } + return data, validData, nil +} + func parseSparseFloatRowVector(str string) ([]byte, uint32, error) { rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str)) if err != nil { @@ -1045,6 +1131,80 @@ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) { }, nil } +func ReadNullableSparseFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) { + if pcr.sparseIsString { + data, validData, err := ReadNullableStringData(pcr, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } + + // Sparse storage: only store valid rows' data + byteArr := make([][]byte, 0, count) + maxDim := uint32(0) + + for i, str := range data.([]string) { + if validData[i] { + rowVec, rowMaxIdx, err := parseSparseFloatRowVector(str) + if err != nil { + return nil, nil, err + } + byteArr = append(byteArr, rowVec) + if rowMaxIdx > maxDim { + maxDim = rowMaxIdx + } + } + } + + return &storage.SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: int64(maxDim), + Contents: byteArr, + }, + ValidData: validData, + Nullable: true, + }, validData, nil + } + + data, validData, err := ReadNullableStructData(pcr, count) + if err != nil { + return nil, nil, err + } + if data == nil { + return nil, nil, nil + } + + // Sparse storage: only store valid rows' data + byteArr := make([][]byte, 0, count) + maxDim := uint32(0) + + for i, structData := range data { + if validData[i] { + singleByteArr, singleMaxDim, err := parseSparseFloatVectorStructs([]map[string]arrow.Array{structData}) + if err != nil { + return nil, nil, err + } + if len(singleByteArr) > 0 { + byteArr = append(byteArr, singleByteArr[0]) + if singleMaxDim > maxDim { + maxDim = singleMaxDim + } + } + } + } + + return &storage.SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: int64(maxDim), + Contents: byteArr, + }, + ValidData: validData, + Nullable: true, + }, validData, nil +} + func checkVectorAlignWithDim(offsets []int32, dim int32) error { for i := 1; i < len(offsets); i++ { if offsets[i]-offsets[i-1] != dim { @@ -1054,6 +1214,16 @@ func checkVectorAlignWithDim(offsets []int32, dim int32) error { return nil } +func checkNullableVectorAlignWithDim(offsets []int32, listReader *array.List, dim int32) error { + for i := 1; i < len(offsets); i++ { + length := offsets[i] - offsets[i-1] + if !listReader.IsNull(i-1) && length != dim { + return fmt.Errorf("expected %d but got %d", dim, length) + } + } + return nil +} + func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) error { if len(offsets) < 1 { return errors.New("empty offsets") @@ -1075,6 +1245,26 @@ func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) er } } +func checkNullableVectorAligned(offsets []int32, listReader *array.List, dim int, dataType schemapb.DataType) error { + if len(offsets) < 1 { + return errors.New("empty offsets") + } + switch dataType { + case schemapb.DataType_BinaryVector: + return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim/8)) + case schemapb.DataType_FloatVector: + return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim)) + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + return checkNullableVectorAlignWithDim(offsets, listReader, 
int32(dim*2)) + case schemapb.DataType_SparseFloatVector: + return nil + case schemapb.DataType_Int8Vector: + return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim)) + default: + return fmt.Errorf("unexpected vector data type %s", dataType.String()) + } +} + func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error { for i := 1; i < len(offsets); i++ { start, end := offsets[i-1], offsets[i] @@ -1092,6 +1282,24 @@ func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outpu return nil } +func getArrayDataNullable[T any](offsets []int32, listReader *array.List, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error { + for i := 1; i < len(offsets); i++ { + isValid := !listReader.IsNull(i - 1) + + start, end := offsets[i-1], offsets[i] + arrData := make([]T, 0, end-start) + for j := start; j < end; j++ { + elementVal, err := getElement(int(j)) + if err != nil { + return err + } + arrData = append(arrData, elementVal) + } + outputArray(arrData, isValid) + } + return nil +} + func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) { chunked, err := pcr.columnReader.NextBatch(count) if err != nil { @@ -1430,6 +1638,108 @@ func ReadNullableIntegerOrFloatArrayData[T constraints.Integer | constraints.Flo return data, validData, nil } +func ReadNullableFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]float32, 0, int(count)*pcr.dim) + validData := make([]bool, 0, count) + + for _, chunk := range chunked.Chunks() { + listReader, ok := chunk.(*array.List) + if !ok { + // the chunk type may be *array.Null if the data in chunk is all null + _, ok := chunk.(*array.Null) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + dataNums := chunk.Data().Len() + validData = append(validData, make([]bool, dataNums)...) 
+ continue + } + + dataType := pcr.field.GetDataType() + offsets := listReader.Offsets() + if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String())) + } + + valueReader := listReader.ListValues() + rows := listReader.Len() + + // Sparse storage: only store valid rows' data + float32Reader, ok := valueReader.(*array.Float32) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name()) + } + for i := 0; i < rows; i++ { + validData = append(validData, !listReader.IsNull(i)) + if !listReader.IsNull(i) { + start, end := offsets[i], offsets[i+1] + for j := start; j < end; j++ { + data = append(data, float32Reader.Value(int(j))) + } + } + } + } + if len(data) == 0 && len(validData) == 0 { + return nil, nil, nil + } + return data, validData, typeutil.VerifyFloats32(data) +} + +func ReadNullableInt8VectorData(pcr *FieldReader, count int64) (any, []bool, error) { + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, nil, err + } + data := make([]int8, 0, int(count)*pcr.dim) + validData := make([]bool, 0, count) + + for _, chunk := range chunked.Chunks() { + listReader, ok := chunk.(*array.List) + if !ok { + // the chunk type may be *array.Null if the data in chunk is all null + _, ok := chunk.(*array.Null) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + dataNums := chunk.Data().Len() + validData = append(validData, make([]bool, dataNums)...) + continue + } + + dataType := pcr.field.GetDataType() + offsets := listReader.Offsets() + if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String())) + } + + valueReader := listReader.ListValues() + rows := listReader.Len() + + // Sparse storage: only store valid rows' data + int8Reader, ok := valueReader.(*array.Int8) + if !ok { + return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name()) + } + for i := 0; i < rows; i++ { + validData = append(validData, !listReader.IsNull(i)) + if !listReader.IsNull(i) { + start, end := offsets[i], offsets[i+1] + for j := start; j < end; j++ { + data = append(data, int8Reader.Value(int(j))) + } + } + } + } + if len(data) == 0 && len(validData) == 0 { + return nil, nil, nil + } + return data, validData, nil +} + func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) { maxLength, err := parameterutil.GetMaxLength(pcr.field) if err != nil { diff --git a/internal/util/importutilv2/parquet/field_reader_test.go b/internal/util/importutilv2/parquet/field_reader_test.go index 97f35bdc9a..7cfbbb7482 100644 --- a/internal/util/importutilv2/parquet/field_reader_test.go +++ b/internal/util/importutilv2/parquet/field_reader_test.go @@ -345,7 +345,7 @@ func TestParseSparseFloatVectorStructs(t *testing.T) { } func TestReadFieldData(t *testing.T) { - checkFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) { + checkFunc := func(t *testing.T, nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) { fieldName := dataType.String() if elementType != schemapb.DataType_None { fieldName = fieldName + "_" + elementType.String() @@ -357,7 +357,7 @@ func TestReadFieldData(t *testing.T) { 
Name: fieldName, DataType: dataType, ElementType: elementType, - Nullable: dataHasNull, + Nullable: nullPercent != 0, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", @@ -397,10 +397,6 @@ func TestReadFieldData(t *testing.T) { assert.NoError(t, err) rowCount := 5 - nullPercent := 0 - if dataHasNull { - nullPercent = 50 - } insertData, err := testutil.CreateInsertData(schema, rowCount, nullPercent) assert.NoError(t, err) columns, err := testutil.BuildArrayData(schema, insertData, false) @@ -423,7 +419,7 @@ func TestReadFieldData(t *testing.T) { defer reader.Close() _, err = reader.Read() - if !readScehamIsNullable && dataHasNull { + if !readScehamIsNullable && nullPercent != 0 { assert.Error(t, err) } else { assert.NoError(t, err) @@ -432,17 +428,17 @@ func TestReadFieldData(t *testing.T) { type testCase struct { name string - dataHasNull bool + nullPercent int readScehamIsNullable bool dataType schemapb.DataType elementType schemapb.DataType } - buildCaseFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase { - name := fmt.Sprintf("dataHasNull='%v' schemaNullable='%v' dataType='%s' elementType='%s'", - dataHasNull, readScehamIsNullable, dataType, elementType) + buildCaseFunc := func(nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase { + name := fmt.Sprintf("nullPercent='%v' schemaNullable='%v' dataType='%s' elementType='%s'", + nullPercent, readScehamIsNullable, dataType, elementType) return &testCase{ name: name, - dataHasNull: dataHasNull, + nullPercent: nullPercent, readScehamIsNullable: readScehamIsNullable, dataType: dataType, elementType: elementType, @@ -459,12 +455,19 @@ func TestReadFieldData(t *testing.T) { schemapb.DataType_Float, schemapb.DataType_Double, schemapb.DataType_VarChar, + schemapb.DataType_FloatVector, + schemapb.DataType_BinaryVector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_Int8Vector, } for _, dataType := range nullableDataTypes { - cases = append(cases, buildCaseFunc(true, true, dataType, schemapb.DataType_None)) - cases = append(cases, buildCaseFunc(true, false, dataType, schemapb.DataType_None)) - cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None)) - cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None)) + for _, nullPercent := range []int{0, 50} { + for _, readScehamIsNullable := range []bool{true, false} { + cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, dataType, schemapb.DataType_None)) + } + } } elementTypes := []schemapb.DataType{ @@ -478,28 +481,23 @@ func TestReadFieldData(t *testing.T) { schemapb.DataType_VarChar, } for _, elementType := range elementTypes { - cases = append(cases, buildCaseFunc(true, true, schemapb.DataType_Array, elementType)) - cases = append(cases, buildCaseFunc(true, false, schemapb.DataType_Array, elementType)) - cases = append(cases, buildCaseFunc(false, true, schemapb.DataType_Array, elementType)) - cases = append(cases, buildCaseFunc(false, false, schemapb.DataType_Array, elementType)) + for _, nullPercent := range []int{0, 50} { + for _, readScehamIsNullable := range []bool{true, false} { + cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, schemapb.DataType_Array, elementType)) + } + } } notNullableTypes := []schemapb.DataType{ schemapb.DataType_JSON, - schemapb.DataType_FloatVector, - 
schemapb.DataType_BinaryVector, - schemapb.DataType_SparseFloatVector, - schemapb.DataType_Float16Vector, - schemapb.DataType_BFloat16Vector, - schemapb.DataType_Int8Vector, } for _, dataType := range notNullableTypes { - cases = append(cases, buildCaseFunc(false, false, dataType, schemapb.DataType_None)) + cases = append(cases, buildCaseFunc(0, false, dataType, schemapb.DataType_None)) } for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - checkFunc(tt.dataHasNull, tt.readScehamIsNullable, tt.dataType, tt.elementType) + checkFunc(t, tt.nullPercent, tt.readScehamIsNullable, tt.dataType, tt.elementType) }) } } @@ -659,8 +657,7 @@ func TestTypeMismatch(t *testing.T) { cases = append(cases, buildCaseFunc(schemapb.DataType_Bool, schemapb.DataType_None, schemapb.DataType_Array, elementType, false)) } - notNullableTypes := []schemapb.DataType{ - schemapb.DataType_JSON, + vectorTypes := []schemapb.DataType{ schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_SparseFloatVector, @@ -668,6 +665,18 @@ func TestTypeMismatch(t *testing.T) { schemapb.DataType_BFloat16Vector, schemapb.DataType_Int8Vector, } + for _, dataType := range vectorTypes { + srcDataType := schemapb.DataType_Bool + cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, true)) + cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, false)) + + cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, true)) + cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, false)) + } + + notNullableTypes := []schemapb.DataType{ + schemapb.DataType_JSON, + } for _, dataType := range notNullableTypes { srcDataType := schemapb.DataType_Bool if dataType == schemapb.DataType_Bool { diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index c6d9c11342..758340e02f 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -450,8 +450,10 @@ func (s *ReaderSuite) runWithSparseVector(indicesType arrow.DataType, valuesType builder.AppendValues(int64Data, validData) arrowColumns = append(arrowColumns, builder.NewInt64Array()) - contents := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData).GetContents() - arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type) + sparseFieldData := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData) + contents := sparseFieldData.GetContents() + sparseValidData := sparseFieldData.ValidData + arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type, sparseValidData) assert.NoError(s.T(), err) arrowColumns = append(arrowColumns, arr) @@ -545,65 +547,34 @@ func (s *ReaderSuite) TestReadScalarFieldsWithDefaultValue() { } func (s *ReaderSuite) TestReadScalarFields() { - s.run(schemapb.DataType_Bool, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Int8, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Int16, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Int64, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Float, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Double, schemapb.DataType_None, false, 0) - 
s.run(schemapb.DataType_String, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_VarChar, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_JSON, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Geometry, schemapb.DataType_None, false, 0) + elementTypes := []schemapb.DataType{ + schemapb.DataType_Bool, + schemapb.DataType_Int8, + schemapb.DataType_Int16, + schemapb.DataType_Int32, + schemapb.DataType_Int64, + schemapb.DataType_Float, + schemapb.DataType_Double, + schemapb.DataType_String, + } - s.run(schemapb.DataType_Array, schemapb.DataType_Bool, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Int8, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Int16, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Int32, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Int64, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Float, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_Double, false, 0) - s.run(schemapb.DataType_Array, schemapb.DataType_String, false, 0) + scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Geometry, schemapb.DataType_Array}...) - s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_String, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 50) - - s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 50) - s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 50) - - s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_String, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 100) - s.run(schemapb.DataType_Geometry, schemapb.DataType_None, true, 100) - - s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 100) - s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 
100) - s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 100) + for _, dataType := range scalarTypes { + if dataType == schemapb.DataType_Array { + for _, elementType := range elementTypes { + s.run(dataType, elementType, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + s.run(dataType, elementType, true, nullPercent) + } + } + } else { + s.run(dataType, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + s.run(dataType, schemapb.DataType_None, true, nullPercent) + } + } + } s.failRun(schemapb.DataType_JSON, true) } @@ -611,24 +582,28 @@ func (s *ReaderSuite) TestReadScalarFields() { func (s *ReaderSuite) TestStringPK() { s.pkDataType = schemapb.DataType_VarChar s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50) - s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100) + for _, nullPercent := range []int{0, 50, 100} { + s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } } func (s *ReaderSuite) TestVector() { - s.vecDataType = schemapb.DataType_BinaryVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.vecDataType = schemapb.DataType_FloatVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.vecDataType = schemapb.DataType_Float16Vector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.vecDataType = schemapb.DataType_BFloat16Vector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - // this test case only test parsing sparse vector from JSON-format string - s.vecDataType = schemapb.DataType_SparseFloatVector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) - s.vecDataType = schemapb.DataType_Int8Vector - s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + dataTypes := []schemapb.DataType{ + schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector, + } + + for _, dataType := range dataTypes { + s.vecDataType = dataType + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + for _, nullPercent := range []int{0, 50, 100} { + s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent) + } + } } func (s *ReaderSuite) TestSparseVector() { diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 700b55b774..314f0248ae 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -110,11 +110,76 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
if err != nil { return nil, err } + // Pre-generate validData for nullable fields to determine sparse storage size + validDataMap := make(map[int64][]bool) allFields := typeutil.GetAllFieldSchemas(schema) for _, f := range allFields { if f.GetAutoID() || f.IsFunctionOutput { continue } + if f.GetNullable() { + if len(nullPercent) > 1 { + return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong") + } + var validData []bool + if len(nullPercent) == 0 || nullPercent[0] == 50 { + validData = testutils.GenerateBoolArray(rows) + } else if len(nullPercent) == 1 && nullPercent[0] == 100 { + validData = make([]bool, rows) + } else if len(nullPercent) == 1 && nullPercent[0] == 0 { + validData = make([]bool, rows) + for i := range validData { + validData[i] = true + } + } else { + return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent)) + } + validDataMap[f.FieldID] = validData + } + } + + // Helper function to check if a type is a vector type (uses sparse storage) + isVectorType := func(dataType schemapb.DataType) bool { + switch dataType { + case schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector: + return true + default: + return false + } + } + + // Helper function to count valid rows + countValidRows := func(validData []bool) int { + if len(validData) == 0 { + return rows + } + count := 0 + for _, v := range validData { + if v { + count++ + } + } + return count + } + + for _, f := range allFields { + if f.GetAutoID() || f.IsFunctionOutput { + continue + } + validData := validDataMap[f.FieldID] + // Vector types use sparse storage (only valid rows), scalar types use dense storage (all rows) + var dataRows int + if isVectorType(f.GetDataType()) { + dataRows = countValidRows(validData) + } else { + dataRows = rows + } + switch f.GetDataType() { case schemapb.DataType_Bool: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateBoolArray(rows)) @@ -135,54 +200,47 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.BinaryVectorFieldData{ - Data: testutils.GenerateBinaryVectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Data = testutils.GenerateBinaryVectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Dim = int(dim) case schemapb.DataType_FloatVector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.GetFieldID()] = &storage.FloatVectorFieldData{ - Data: testutils.GenerateFloatVectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Data = testutils.GenerateFloatVectors(dataRows, int(dim)) + insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Dim = int(dim) case schemapb.DataType_Float16Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.Float16VectorFieldData{ - Data: testutils.GenerateFloat16Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Data = testutils.GenerateFloat16Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Dim = int(dim) case schemapb.DataType_BFloat16Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.BFloat16VectorFieldData{ - Data: testutils.GenerateBFloat16Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Data = testutils.GenerateBFloat16Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Dim = int(dim) case schemapb.DataType_SparseFloatVector: - data, dim := testutils.GenerateSparseFloatVectorsData(rows) - insertData.Data[f.FieldID] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: schemapb.SparseFloatArray{ - Contents: data, - Dim: dim, - }, - } + // For nullable vectors, use sparse storage (only generate valid rows) + data, dim := testutils.GenerateSparseFloatVectorsData(dataRows) + sparseData := insertData.Data[f.FieldID].(*storage.SparseFloatVectorFieldData) + sparseData.Contents = data + sparseData.Dim = dim case schemapb.DataType_Int8Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.Int8VectorFieldData{ - Data: testutils.GenerateInt8Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Data = testutils.GenerateInt8Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Dim = int(dim) case schemapb.DataType_String, schemapb.DataType_VarChar: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateStringArray(rows)) case schemapb.DataType_JSON: @@ -220,23 +278,10 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
default: panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String())) } + // Apply pre-generated validData for nullable fields if f.GetNullable() { - if len(nullPercent) > 1 { - return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong") - } - if len(nullPercent) == 0 || nullPercent[0] == 50 { - insertData.Data[f.FieldID].AppendValidDataRows(testutils.GenerateBoolArray(rows)) - } else if len(nullPercent) == 1 && nullPercent[0] == 100 { - insertData.Data[f.FieldID].AppendValidDataRows(make([]bool, rows)) - } else if len(nullPercent) == 1 && nullPercent[0] == 0 { - validData := make([]bool, rows) - for i := range validData { - validData[i] = true - } - insertData.Data[f.FieldID].AppendValidDataRows(validData) - } else { - return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent)) - } + validData := validDataMap[f.FieldID] + insertData.Data[f.FieldID].AppendValidDataRows(validData) } } return insertData, nil @@ -304,25 +349,34 @@ func CreateFieldWithDefaultValue(dataType schemapb.DataType, id int64, nullable return field, nil } -func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType) (arrow.Array, error) { +func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType, validData []bool) (arrow.Array, error) { if arrowType == nil || arrowType.ID() == arrow.STRING { // build sparse vector as JSON-format string builder := array.NewStringBuilder(mem) - rows := len(contents) - jsonBytesData := make([][]byte, 0) - for i := 0; i < rows; i++ { - rowVecData := contents[i] - mapData := typeutil.SparseFloatBytesToMap(rowVecData) - // convert to JSON format - jsonBytes, err := json.Marshal(mapData) - if err != nil { - return nil, err - } - jsonBytesData = append(jsonBytesData, jsonBytes) + // For sparse storage: iterate over logical rows, use physical index for contents + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(contents) + } + physicalIdx := 0 + for i := 0; i < logicalRows; i++ { + isValid := len(validData) == 0 || validData[i] + if isValid { + rowVecData := contents[physicalIdx] + mapData := typeutil.SparseFloatBytesToMap(rowVecData) + // convert to JSON format + jsonBytes, err := json.Marshal(mapData) + if err != nil { + return nil, err + } + builder.Append(string(jsonBytes)) + physicalIdx++ + } else { + builder.AppendNull() + } } - builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string { - return string(bs) - }), nil) return builder.NewStringArray(), nil } else if arrowType.ID() == arrow.STRUCT { // build sparse vector as parquet struct @@ -399,15 +453,27 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType return nil, merr.WrapErrImportFailed(msg) } - for i := 0; i < len(contents); i++ { - builder.Append(true) - indicesBuilder.Append(true) - valuesBuilder.Append(true) - rowVecData := contents[i] - elemCount := len(rowVecData) / 8 - for j := 0; j < elemCount; j++ { - appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:])) - appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:]))) + // For sparse storage: iterate over logical rows, use physical index for contents + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(contents) + } + physicalIdx := 0 + for i := 0; i < logicalRows; i++ { + isValid := len(validData) == 0 || validData[i] + 
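// Editor's note (annotation, not part of the patch): the loop entered here is
// the logical/physical walk used throughout this change — i counts logical
// rows (len(validData) when a mask is present, len(contents) otherwise),
// while physicalIdx advances only on valid rows and indexes the densely
// packed contents. A null row appends a null slot to the struct, indices and
// values builders and consumes no contents entry, so row alignment is kept.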
builder.Append(isValid) + indicesBuilder.Append(isValid) + valuesBuilder.Append(isValid) + if isValid { + rowVecData := contents[physicalIdx] + elemCount := len(rowVecData) / 8 + for j := 0; j < elemCount; j++ { + appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:])) + appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:]))) + } + physicalIdx++ } } return builder.NewStructArray(), nil @@ -490,82 +556,183 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser columns = append(columns, builder.NewStringArray()) case schemapb.DataType_BinaryVector: builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + valueBuilder := builder.ValueBuilder().(*array.Uint8Builder) dim := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim binVecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data + validData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).ValidData rowBytes := dim / 8 - rows := len(binVecData) / rowBytes - offsets := make([]int32, 0, rows) - valid := make([]bool, 0) - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) + // For sparse storage: logicalRows from validData, physicalRows from data + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(binVecData) / rowBytes } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil) + offsets := make([]int32, 0, logicalRows+1) + valid := make([]bool, 0, logicalRows) + currOffset := int32(0) + physicalIdx := 0 // Track physical index in sparse data + for i := 0; i < logicalRows; i++ { + offsets = append(offsets, currOffset) + if len(validData) > 0 && !validData[i] { + valid = append(valid, false) + } else { + // Use physical index for sparse storage + start := physicalIdx * rowBytes + end := start + rowBytes + valueBuilder.AppendValues(binVecData[start:end], nil) + currOffset += int32(rowBytes) + valid = append(valid, true) + physicalIdx++ // Increment only for valid rows + } + } + offsets = append(offsets, currOffset) builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_FloatVector: builder := array.NewListBuilder(mem, &arrow.Float32Type{}) + valueBuilder := builder.ValueBuilder().(*array.Float32Builder) dim := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim floatVecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data - rows := len(floatVecData) / dim - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(i*dim)) - valid = append(valid, true) + validData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).ValidData + // For sparse storage: logicalRows from validData, physicalRows from data + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(floatVecData) / dim } - builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil) + offsets := make([]int32, 0, logicalRows+1) + valid := make([]bool, 0, logicalRows) + currOffset := int32(0) + physicalIdx := 0 // Track physical index in sparse data + for i := 0; i < logicalRows; i++ { + offsets = append(offsets, currOffset) + if len(validData) > 0 && !validData[i] { + valid = append(valid, false) + } else { + // Use physical index for sparse storage + start := physicalIdx * dim + end := start + dim + 
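// Editor's note (annotation, not part of the patch): start/end select exactly
// one logical row's worth of values from the sparse buffer via physicalIdx.
// The same slice-by-physical-index pattern repeats below for Float16Vector
// and BFloat16Vector (stride rowBytes = dim*2) and Int8Vector (stride dim).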
valueBuilder.AppendValues(floatVecData[start:end], nil) + currOffset += int32(dim) + valid = append(valid, true) + physicalIdx++ // Increment only for valid rows + } + } + offsets = append(offsets, currOffset) builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_Float16Vector: builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + valueBuilder := builder.ValueBuilder().(*array.Uint8Builder) dim := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim float16VecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data + validData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).ValidData rowBytes := dim * 2 - rows := len(float16VecData) / rowBytes - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) + // For sparse storage: logicalRows from validData, physicalRows from data + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(float16VecData) / rowBytes } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(float16VecData, nil) + offsets := make([]int32, 0, logicalRows+1) + valid := make([]bool, 0, logicalRows) + currOffset := int32(0) + physicalIdx := 0 // Track physical index in sparse data + for i := 0; i < logicalRows; i++ { + offsets = append(offsets, currOffset) + if len(validData) > 0 && !validData[i] { + valid = append(valid, false) + } else { + // Use physical index for sparse storage + start := physicalIdx * rowBytes + end := start + rowBytes + valueBuilder.AppendValues(float16VecData[start:end], nil) + currOffset += int32(rowBytes) + valid = append(valid, true) + physicalIdx++ // Increment only for valid rows + } + } + offsets = append(offsets, currOffset) builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_BFloat16Vector: builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + valueBuilder := builder.ValueBuilder().(*array.Uint8Builder) dim := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim bfloat16VecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data + validData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).ValidData rowBytes := dim * 2 - rows := len(bfloat16VecData) / rowBytes - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) + // For sparse storage: logicalRows from validData, physicalRows from data + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(bfloat16VecData) / rowBytes } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(bfloat16VecData, nil) + offsets := make([]int32, 0, logicalRows+1) + valid := make([]bool, 0, logicalRows) + currOffset := int32(0) + physicalIdx := 0 // Track physical index in sparse data + for i := 0; i < logicalRows; i++ { + offsets = append(offsets, currOffset) + if len(validData) > 0 && !validData[i] { + valid = append(valid, false) + } else { + // Use physical index for sparse storage + start := physicalIdx * rowBytes + end := start + rowBytes + valueBuilder.AppendValues(bfloat16VecData[start:end], nil) + currOffset += int32(rowBytes) + valid = append(valid, true) + physicalIdx++ // Increment only for valid rows + } + } + offsets = append(offsets, currOffset) 
builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_SparseFloatVector: contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() - arr, err := BuildSparseVectorData(mem, contents, nil) + validData := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).ValidData + arr, err := BuildSparseVectorData(mem, contents, nil, validData) if err != nil { return nil, err } columns = append(columns, arr) case schemapb.DataType_Int8Vector: builder := array.NewListBuilder(mem, &arrow.Int8Type{}) + valueBuilder := builder.ValueBuilder().(*array.Int8Builder) dim := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Dim int8VecData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Data - rows := len(int8VecData) / dim - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(i*dim)) - valid = append(valid, true) + validData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).ValidData + // For sparse storage: logicalRows from validData, physicalRows from data + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(int8VecData) / dim } - builder.ValueBuilder().(*array.Int8Builder).AppendValues(int8VecData, nil) + offsets := make([]int32, 0, logicalRows+1) + valid := make([]bool, 0, logicalRows) + currOffset := int32(0) + physicalIdx := 0 // Track physical index in sparse data + for i := 0; i < logicalRows; i++ { + offsets = append(offsets, currOffset) + if len(validData) > 0 && !validData[i] { + valid = append(valid, false) + } else { + // Use physical index for sparse storage + start := physicalIdx * dim + end := start + dim + valueBuilder.AppendValues(int8VecData[start:end], nil) + currOffset += int32(dim) + valid = append(valid, true) + physicalIdx++ // Increment only for valid rows + } + } + offsets = append(offsets, currOffset) builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_JSON:
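Net effect of the BuildArrayData changes: every nullable vector column becomes an Arrow list array built from n+1 offsets and n validity flags, where a null row contributes a zero-length slot and advances no offset. A self-contained sketch of that layout, reusing only builder calls that appear in this patch; the arrow-go v12 import path is an assumption, substitute whichever arrow module version the repo vendors:

// Illustrative sketch (not part of the patch): building a nullable
// FloatVector column (dim = 2) the way BuildArrayData does.
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	builder := array.NewListBuilder(mem, &arrow.Float32Type{})
	defer builder.Release()
	valueBuilder := builder.ValueBuilder().(*array.Float32Builder)

	dim := 2
	sparseData := []float32{1, 2, 3, 4}      // two physical rows
	validData := []bool{true, false, true}   // three logical rows

	offsets := make([]int32, 0, len(validData)+1)
	valid := make([]bool, 0, len(validData))
	currOffset := int32(0)
	physicalIdx := 0
	for i := 0; i < len(validData); i++ {
		offsets = append(offsets, currOffset)
		if !validData[i] {
			valid = append(valid, false) // null row: no values, offset unchanged
		} else {
			start := physicalIdx * dim
			valueBuilder.AppendValues(sparseData[start:start+dim], nil)
			currOffset += int32(dim)
			valid = append(valid, true)
			physicalIdx++
		}
	}
	offsets = append(offsets, currOffset) // trailing offset: n+1 total
	builder.AppendValues(offsets, valid)

	arr := builder.NewListArray()
	defer arr.Release()
	fmt.Println(arr) // prints something like [[1 2] (null) [3 4]]
}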