diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go
index 1e103f3bb2..d27cb58373 100644
--- a/internal/datanode/importv2/util.go
+++ b/internal/datanode/importv2/util.go
@@ -374,6 +374,19 @@ func AppendNullableDefaultFieldsData(schema *schemapb.CollectionSchema, data *st
 			appender := &nullDefaultAppender[*schemapb.ScalarField]{}
 			err = appender.AppendNull(fieldData, rowNum)
 		}
+	case schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector:
+		if nullable {
+			for i := 0; i < rowNum; i++ {
+				if err = fieldData.AppendRow(nil); err != nil {
+					return err
+				}
+			}
+		}
 	default:
 		return fmt.Errorf("Unexpected data type: %d, cannot be filled with default value", dataType)
 	}
diff --git a/internal/datanode/importv2/util_test.go b/internal/datanode/importv2/util_test.go
index cd392f50c2..be4223d6bf 100644
--- a/internal/datanode/importv2/util_test.go
+++ b/internal/datanode/importv2/util_test.go
@@ -495,10 +495,52 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:     "float vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_FloatVector,
+			nullable: true,
+		},
+		{
+			name:     "float16 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_Float16Vector,
+			nullable: true,
+		},
+		{
+			name:     "bfloat16 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_BFloat16Vector,
+			nullable: true,
+		},
+		{
+			name:     "binary vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_BinaryVector,
+			nullable: true,
+		},
+		{
+			name:     "sparse float vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_SparseFloatVector,
+			nullable: true,
+		},
+		{
+			name:     "int8 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_Int8Vector,
+			nullable: true,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			schema := buildSchemaFn()
+			isVectorType := tt.dataType == schemapb.DataType_FloatVector ||
+				tt.dataType == schemapb.DataType_Float16Vector ||
+				tt.dataType == schemapb.DataType_BFloat16Vector ||
+				tt.dataType == schemapb.DataType_BinaryVector ||
+				tt.dataType == schemapb.DataType_SparseFloatVector ||
+				tt.dataType == schemapb.DataType_Int8Vector
 			fieldSchema := &schemapb.FieldSchema{
 				FieldID: tt.fieldID,
 				Name:    fmt.Sprintf("field_%d", tt.fieldID),
@@ -511,10 +553,12 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxCapacityKey, Value: "100"})
 			} else if tt.dataType == schemapb.DataType_VarChar {
 				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxLengthKey, Value: "100"})
+			} else if isVectorType && tt.dataType != schemapb.DataType_SparseFloatVector {
+				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.DimKey, Value: "8"})
 			}

 			// create data without the new field
-			insertData, err := testutil.CreateInsertData(schema, count)
+			insertData, err := testutil.CreateInsertData(schema, count, 100)
 			assert.NoError(t, err)

 			// add new nullalbe/default field to the schema
@@ -524,7 +568,7 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 			tempSchema := &schemapb.CollectionSchema{
 				Fields: []*schemapb.FieldSchema{fieldSchema},
 			}
-			tempData, err := testutil.CreateInsertData(tempSchema, 1)
+			tempData, err := testutil.CreateInsertData(tempSchema, 1, 100)
 			assert.NoError(t, err)
 			insertData.Data[fieldSchema.GetFieldID()] = tempData.Data[fieldSchema.GetFieldID()]
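The util.go hunk above fills a nullable vector column that is missing from the import file by appending nil rows one at a time through `fieldData.AppendRow(nil)`. A minimal self-contained sketch of that pattern, assuming only that the field data type accepts nil rows for nullable columns; `RowAppender`, `nullCountingAppender`, and `appendNullRows` are hypothetical stand-ins, not names from the patch:

```go
package main

// RowAppender mirrors the single storage.FieldData method this hunk
// relies on; it is defined here only to keep the sketch self-contained.
type RowAppender interface {
	AppendRow(v any) error
}

// nullCountingAppender is a stand-in implementation for the example.
type nullCountingAppender struct{ rows []any }

func (a *nullCountingAppender) AppendRow(v any) error {
	a.rows = append(a.rows, v) // nil marks a null row
	return nil
}

// appendNullRows shows the fill pattern the util.go hunk adds for
// nullable vector fields: one AppendRow(nil) per missing row.
func appendNullRows(fd RowAppender, rowNum int) error {
	for i := 0; i < rowNum; i++ {
		if err := fd.AppendRow(nil); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	a := &nullCountingAppender{}
	if err := appendNullRows(a, 4); err != nil {
		panic(err)
	}
	// a.rows now holds four nil entries, matching four null vector rows.
}
```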
diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go
index 3b1b38da19..b6e24fae67 100644
--- a/internal/util/importutilv2/csv/reader_test.go
+++ b/internal/util/importutilv2/csv/reader_test.go
@@ -58,7 +58,7 @@ func (suite *ReaderSuite) SetupTest() {
 	suite.vecDataType = schemapb.DataType_FloatVector
 }

-func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
+func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) {
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
 			{
@@ -83,6 +83,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 					Value: "8",
 				},
 			},
+			Nullable: nullable,
 		},
 		{
 			FieldID: 102,
@@ -129,7 +130,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	nullkey := ""

 	// generate csv data
-	insertData, err := testutil.CreateInsertData(schema, suite.numRows)
+	insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent)
 	suite.NoError(err)
 	csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
 	suite.NoError(err)
@@ -178,65 +179,60 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 }

 func (suite *ReaderSuite) TestReadScalarFields() {
-	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_String, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)
+	elementTypes := []schemapb.DataType{
+		schemapb.DataType_Bool,
+		schemapb.DataType_Int8,
+		schemapb.DataType_Int16,
+		schemapb.DataType_Int32,
+		schemapb.DataType_Int64,
+		schemapb.DataType_Float,
+		schemapb.DataType_Double,
+		schemapb.DataType_String,
+	}
+	scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Array}...)

-	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)
-
-	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_String, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)
-
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
+	for _, dataType := range scalarTypes {
+		if dataType == schemapb.DataType_Array {
+			for _, elementType := range elementTypes {
+				suite.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					suite.run(dataType, elementType, true, nullPercent)
+				}
+			}
+		} else {
+			suite.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				suite.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
+		}
+	}
 }

 func (suite *ReaderSuite) TestStringPK() {
 	suite.pkDataType = schemapb.DataType_VarChar
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	for _, nullPercent := range []int{0, 50, 100} {
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_SparseFloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (suite *ReaderSuite) TestError() {
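The CSV reader tests now sweep `nullPercent` over 0, 50, and 100. A hedged sketch of the three validity masks that sweep corresponds to, mirroring what `testutil.CreateInsertData` accepts (the real 50% case uses `testutils.GenerateBoolArray`, which randomizes; the alternation here is only illustrative):

```go
package main

import "fmt"

// validityMask sketches the three validity patterns the tests sweep over.
func validityMask(rows, nullPercent int) ([]bool, error) {
	valid := make([]bool, rows)
	switch nullPercent {
	case 0:
		for i := range valid {
			valid[i] = true // no nulls
		}
	case 50:
		for i := range valid {
			valid[i] = i%2 == 0 // roughly half the rows are null
		}
	case 100:
		// all null: the zero value false already marks every row invalid
	default:
		return nil, fmt.Errorf("unsupported nullPercent %d", nullPercent)
	}
	return valid, nil
}

func main() {
	for _, p := range []int{0, 50, 100} {
		mask, err := validityMask(6, p)
		if err != nil {
			panic(err)
		}
		fmt.Println(p, mask)
	}
}
```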
diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go
index aa9f14fa64..7f7f4d53cb 100644
--- a/internal/util/importutilv2/csv/row_parser_test.go
+++ b/internal/util/importutilv2/csv/row_parser_test.go
@@ -131,35 +131,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Name:       "float_vector",
 			DataType:   schemapb.DataType_FloatVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    22,
 			Name:       "bin_vector",
 			DataType:   schemapb.DataType_BinaryVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:  23,
 			Name:     "sparse_vector",
 			DataType: schemapb.DataType_SparseFloatVector,
+			Nullable: suite.hasNullable,
 		},
 		{
 			FieldID:    24,
 			Name:       "f16_vector",
 			DataType:   schemapb.DataType_Float16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    25,
 			Name:       "bf16_vector",
 			DataType:   schemapb.DataType_BFloat16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    26,
 			Name:       "int8_vector",
 			DataType:   schemapb.DataType_Int8Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID: 27,
@@ -648,6 +654,12 @@ func (suite *RowParserSuite) TestValid() {
 	suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", suite.nullKey)})

 	// Test struct array parsing
 	suite.runValid(&testCase{name: "A/N/D struct array valid", content: suite.genAllTypesRowData("x", "2")})
@@ -704,13 +716,6 @@ func (suite *RowParserSuite) TestParseError() {
 	suite.Error(err)
 	suite.Nil(parser)

-	// field value missed
-	content = suite.genAllTypesRowData("x", "2", "float_vector")
-	header, _ = suite.genRowContent(schema, content)
-	parser, err = NewRowParser(schema, header, suite.nullKey)
-	suite.Error(err)
-	suite.Nil(parser)
-
 	// function output no need provide
 	content = suite.genAllTypesRowData("x", "2")
 	content["function_sparse_vector"] = "{\"1\":0.5,\"10\":1.5,\"100\":2.5}"
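The new CSV test cases pin down one contract: a cell equal to the configured null key on a nullable vector column now parses to nil instead of being rejected. The parser internals are not shown in this diff; `parseNullableCell` below is a hypothetical sketch of that check only:

```go
package main

import "fmt"

// parseNullableCell sketches the null-key rule the test cases above
// exercise: for a nullable field, a cell equal to nullKey yields nil.
func parseNullableCell(cell, nullKey string, nullable bool) (any, bool) {
	if nullable && cell == nullKey {
		return nil, true // treated as a null row
	}
	return cell, false
}

func main() {
	v, isNull := parseNullableCell("\\N", "\\N", true)
	fmt.Println(v, isNull) // <nil> true
}
```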
diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go
index e71b702215..fb65f3f498 100644
--- a/internal/util/importutilv2/json/reader_test.go
+++ b/internal/util/importutilv2/json/reader_test.go
@@ -56,7 +56,7 @@ func (suite *ReaderSuite) SetupTest() {
 	suite.vecDataType = schemapb.DataType_FloatVector
 }

-func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
+func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) {
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
 			{
@@ -81,6 +81,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 					Value: "8",
 				},
 			},
+			Nullable: nullable,
 		},
 		{
 			FieldID: 102,
@@ -121,7 +122,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 		})
 	}

-	insertData, err := testutil.CreateInsertData(schema, suite.numRows)
+	insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent)
 	suite.NoError(err)

 	rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
@@ -261,12 +262,16 @@ func (suite *ReaderSuite) TestReadScalarFields() {
 	for _, dataType := range scalarTypes {
 		if dataType == schemapb.DataType_Array {
 			for _, elementType := range elementTypes {
-				suite.run(dataType, elementType, false)
-				suite.run(dataType, elementType, true)
+				suite.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					suite.run(dataType, elementType, true, nullPercent)
+				}
 			}
 		} else {
-			suite.run(dataType, schemapb.DataType_None, false)
-			suite.run(dataType, schemapb.DataType_None, true)
+			suite.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				suite.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
 		}
 	}
 }
@@ -291,22 +296,29 @@ func (suite *ReaderSuite) TestReadScalarFieldsWithDefaultValue() {

 func (suite *ReaderSuite) TestStringPK() {
 	suite.pkDataType = schemapb.DataType_VarChar
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	for _, nullPercent := range []int{0, 50, 100} {
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_SparseFloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (suite *ReaderSuite) TestDecodeError() {
diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go
index f2ce4c6a10..00b7b93a0c 100644
--- a/internal/util/importutilv2/json/row_parser_test.go
+++ b/internal/util/importutilv2/json/row_parser_test.go
@@ -110,35 +110,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Name:       "float_vector",
 			DataType:   schemapb.DataType_FloatVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    22,
 			Name:       "bin_vector",
 			DataType:   schemapb.DataType_BinaryVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:  23,
 			Name:     "sparse_vector",
 			DataType: schemapb.DataType_SparseFloatVector,
+			Nullable: suite.hasNullable,
 		},
 		{
 			FieldID:    24,
 			Name:       "f16_vector",
 			DataType:   schemapb.DataType_Float16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    25,
 			Name:       "bf16_vector",
 			DataType:   schemapb.DataType_BFloat16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    26,
 			Name:       "int8_vector",
 			DataType:   schemapb.DataType_Int8Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID: 27,
@@ -633,6 +639,12 @@ func (suite *RowParserSuite) TestValid() {
 	suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", nil)})
 	suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", nil)})
 	suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", nil)})

 	suite.setSchema(false, true, true)
 	suite.runValid(&testCase{name: "N/D valid parse", content: suite.genAllTypesRowData("x", 2)})
@@ -675,7 +687,6 @@ func (suite *RowParserSuite) TestParseError() {
 		{name: "not a JSON for dynamic", content: suite.genAllTypesRowData("$meta", []int{})},
 		{name: "exceeds max length varchar", content: suite.genAllTypesRowData("varchar", "aaaaaaaaaa")},
 		{name: "exceeds max capacity", content: suite.genAllTypesRowData("array_int8", []int{1, 2, 3, 4, 5})},
-		{name: "field value missed", content: suite.genAllTypesRowData("x", 2, "float_vector")},
 		{name: "type error bool", content: suite.genAllTypesRowData("bool", 0.2)},
 		{name: "type error int8", content: suite.genAllTypesRowData("int8", []int32{})},
 		{name: "type error int16", content: suite.genAllTypesRowData("int16", []int32{})},
diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go
index 7a2baaf6cc..a0105e908a 100644
--- a/internal/util/importutilv2/numpy/reader_test.go
+++ b/internal/util/importutilv2/numpy/reader_test.go
@@ -433,18 +433,18 @@ func (suite *ReaderSuite) TestStringPK() {
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32)
-	// suite.vecDataType = schemapb.DataType_SparseFloatVector
-	// suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32)
+	}
 }

 func TestNumpyCreateReaders(t *testing.T) {
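In the JSON path the new nullable cases pass Go `nil` (a JSON `null`) rather than a null-key string. A small self-contained sketch of the contract those cases rely on: a JSON `null` on a nullable vector column decodes to a nil value, which the parser can record as a null row. The row layout and field names here are illustrative only:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeRow unmarshals one JSON import row into a generic map; a JSON
// null arrives as a nil value under its key.
func decodeRow(line []byte) (map[string]any, error) {
	var row map[string]any
	if err := json.Unmarshal(line, &row); err != nil {
		return nil, err
	}
	return row, nil
}

func main() {
	row, err := decodeRow([]byte(`{"pk": 1, "float_vector": null}`))
	if err != nil {
		panic(err)
	}
	// row["float_vector"] is nil: a nullable vector field can accept this
	// as a null row instead of rejecting the input.
	fmt.Println(row["float_vector"] == nil) // true
}
```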
diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go
index c5de8d478d..f207a102fa 100644
--- a/internal/util/importutilv2/parquet/field_reader.go
+++ b/internal/util/importutilv2/parquet/field_reader.go
@@ -189,13 +189,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
 	case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
 		// vector not support default_value
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableBinaryData(c, count)
 		}
 		data, err := ReadBinaryData(c, count)
 		return data, nil, err
 	case schemapb.DataType_FloatVector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableFloatVectorData(c, count)
 		}
 		arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count)
 		if err != nil {
@@ -208,13 +208,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
 		return vectors, nil, nil
 	case schemapb.DataType_SparseFloatVector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableSparseFloatVectorData(c, count)
 		}
 		data, err := ReadSparseFloatVectorData(c, count)
 		return data, nil, err
 	case schemapb.DataType_Int8Vector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableInt8VectorData(c, count)
 		}
 		arrayData, err := ReadIntegerOrFloatArrayData[int8](c, count)
 		if err != nil {
@@ -493,9 +493,6 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p
 // Value type of "indices" is array.List, element type is array.Uint32
 // Value type of "values" is array.List, element type is array.Float32
 // The length of the list is equal to the length of chunked.Chunks()
-//
-// Note: now the ReadStructData() is used by SparseVector type and SparseVector is not nullable,
-// create a new method ReadNullableStructData() if we have nullable struct type in future.
 func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
 		return nil, err
 	}
@@ -524,6 +521,40 @@ func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, er
 	return data, nil
 }

+func ReadNullableStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]map[string]arrow.Array, 0, count)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		structReader, ok := chunk.(*array.Struct)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+		}
+
+		structType := structReader.DataType().(*arrow.StructType)
+		rows := structReader.Len()
+		// Sparse storage: only store valid rows' data
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !structReader.IsNull(i))
+			if !structReader.IsNull(i) {
+				st := make(map[string]arrow.Array)
+				for k, field := range structType.Fields() {
+					st[field.Name] = structReader.Field(k)
+				}
+				data = append(data, st)
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func ReadStringData(pcr *FieldReader, count int64, isVarcharField bool) (any, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
@@ -851,6 +882,61 @@ func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
 	return data, nil
 }

+func ReadNullableBinaryData(pcr *FieldReader, count int64) (any, []bool, error) {
+	dataType := pcr.field.GetDataType()
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]byte, 0, count)
+	validData := make([]bool, 0, count)
+
+	// Sparse storage: only store valid rows' data
+	for _, chunk := range chunked.Chunks() {
+		rows := chunk.Data().Len()
+		switch chunk.DataType().ID() {
+		case arrow.NULL:
+			for i := 0; i < rows; i++ {
+				validData = append(validData, false)
+			}
+		case arrow.BINARY:
+			binaryReader := chunk.(*array.Binary)
+			for i := 0; i < rows; i++ {
+				if binaryReader.IsNull(i) {
+					validData = append(validData, false)
+				} else {
+					data = append(data, binaryReader.Value(i)...)
+					validData = append(validData, true)
+				}
+			}
+		case arrow.LIST:
+			listReader := chunk.(*array.List)
+			if err = checkNullableVectorAligned(listReader.Offsets(), listReader, pcr.dim, dataType); err != nil {
+				return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+			}
+			uint8Reader, ok := listReader.ListValues().(*array.Uint8)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, listReader.ListValues().DataType().Name())
+			}
+			for i := 0; i < rows; i++ {
+				if listReader.IsNull(i) {
+					validData = append(validData, false)
+				} else {
+					start, end := listReader.ValueOffsets(i)
+					data = append(data, uint8Reader.Uint8Values()[start:end]...)
+					validData = append(validData, true)
+				}
+			}
+		default:
+			return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func parseSparseFloatRowVector(str string) ([]byte, uint32, error) {
 	rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str))
 	if err != nil {
@@ -1045,6 +1131,80 @@ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) {
 	}, nil
 }

+func ReadNullableSparseFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	if pcr.sparseIsString {
+		data, validData, err := ReadNullableStringData(pcr, count)
+		if err != nil {
+			return nil, nil, err
+		}
+		if data == nil {
+			return nil, nil, nil
+		}
+
+		// Sparse storage: only store valid rows' data
+		byteArr := make([][]byte, 0, count)
+		maxDim := uint32(0)
+
+		for i, str := range data.([]string) {
+			if validData[i] {
+				rowVec, rowMaxIdx, err := parseSparseFloatRowVector(str)
+				if err != nil {
+					return nil, nil, err
+				}
+				byteArr = append(byteArr, rowVec)
+				if rowMaxIdx > maxDim {
+					maxDim = rowMaxIdx
+				}
+			}
+		}
+
+		return &storage.SparseFloatVectorFieldData{
+			SparseFloatArray: schemapb.SparseFloatArray{
+				Dim:      int64(maxDim),
+				Contents: byteArr,
+			},
+			ValidData: validData,
+			Nullable:  true,
+		}, validData, nil
+	}
+
+	data, validData, err := ReadNullableStructData(pcr, count)
+	if err != nil {
+		return nil, nil, err
+	}
+	if data == nil {
+		return nil, nil, nil
+	}
+
+	// Sparse storage: only store valid rows' data
+	byteArr := make([][]byte, 0, count)
+	maxDim := uint32(0)
+
+	for i, structData := range data {
+		if validData[i] {
+			singleByteArr, singleMaxDim, err := parseSparseFloatVectorStructs([]map[string]arrow.Array{structData})
+			if err != nil {
+				return nil, nil, err
+			}
+			if len(singleByteArr) > 0 {
+				byteArr = append(byteArr, singleByteArr[0])
+				if singleMaxDim > maxDim {
+					maxDim = singleMaxDim
+				}
+			}
+		}
+	}
+
+	return &storage.SparseFloatVectorFieldData{
+		SparseFloatArray: schemapb.SparseFloatArray{
+			Dim:      int64(maxDim),
+			Contents: byteArr,
+		},
+		ValidData: validData,
+		Nullable:  true,
+	}, validData, nil
+}
+
 func checkVectorAlignWithDim(offsets []int32, dim int32) error {
 	for i := 1; i < len(offsets); i++ {
 		if offsets[i]-offsets[i-1] != dim {
@@ -1054,6 +1214,16 @@ func checkVectorAlignWithDim(offsets []int32, dim int32) error {
 	return nil
 }

+func checkNullableVectorAlignWithDim(offsets []int32, listReader *array.List, dim int32) error {
+	for i := 1; i < len(offsets); i++ {
+		length := offsets[i] - offsets[i-1]
+		if !listReader.IsNull(i-1) && length != dim {
+			return fmt.Errorf("expected %d but got %d", dim, length)
+		}
+	}
+	return nil
+}
+
 func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) error {
 	if len(offsets) < 1 {
 		return errors.New("empty offsets")
@@ -1075,6 +1245,26 @@ func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) er
 	}
 }

+func checkNullableVectorAligned(offsets []int32, listReader *array.List, dim int, dataType schemapb.DataType) error {
+	if len(offsets) < 1 {
+		return errors.New("empty offsets")
+	}
+	switch dataType {
+	case schemapb.DataType_BinaryVector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim/8))
+	case schemapb.DataType_FloatVector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim))
+	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim*2))
+	case schemapb.DataType_SparseFloatVector:
+		return nil
+	case schemapb.DataType_Int8Vector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim))
+	default:
+		return fmt.Errorf("unexpected vector data type %s", dataType.String())
+	}
+}
+
 func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error {
 	for i := 1; i < len(offsets); i++ {
 		start, end := offsets[i-1], offsets[i]
@@ -1092,6 +1282,24 @@ func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outpu
 	return nil
 }

+func getArrayDataNullable[T any](offsets []int32, listReader *array.List, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error {
+	for i := 1; i < len(offsets); i++ {
+		isValid := !listReader.IsNull(i - 1)
+
+		start, end := offsets[i-1], offsets[i]
+		arrData := make([]T, 0, end-start)
+		for j := start; j < end; j++ {
+			elementVal, err := getElement(int(j))
+			if err != nil {
+				return err
+			}
+			arrData = append(arrData, elementVal)
+		}
+		outputArray(arrData, isValid)
+	}
+	return nil
+}
+
 func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
@@ -1430,6 +1638,108 @@ func ReadNullableIntegerOrFloatArrayData[T constraints.Integer | constraints.Flo
 	return data, validData, nil
 }

+func ReadNullableFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]float32, 0, int(count)*pcr.dim)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		listReader, ok := chunk.(*array.List)
+		if !ok {
+			// the chunk type may be *array.Null if the data in chunk is all null
+			_, ok := chunk.(*array.Null)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			dataNums := chunk.Data().Len()
+			validData = append(validData, make([]bool, dataNums)...)
+			continue
+		}
+
+		dataType := pcr.field.GetDataType()
+		offsets := listReader.Offsets()
+		if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil {
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+		}
+
+		valueReader := listReader.ListValues()
+		rows := listReader.Len()
+
+		// Sparse storage: only store valid rows' data
+		float32Reader, ok := valueReader.(*array.Float32)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name())
+		}
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !listReader.IsNull(i))
+			if !listReader.IsNull(i) {
+				start, end := offsets[i], offsets[i+1]
+				for j := start; j < end; j++ {
+					data = append(data, float32Reader.Value(int(j)))
+				}
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, typeutil.VerifyFloats32(data)
+}
+
+func ReadNullableInt8VectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]int8, 0, int(count)*pcr.dim)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		listReader, ok := chunk.(*array.List)
+		if !ok {
+			// the chunk type may be *array.Null if the data in chunk is all null
+			_, ok := chunk.(*array.Null)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			dataNums := chunk.Data().Len()
+			validData = append(validData, make([]bool, dataNums)...)
+			continue
+		}
+
+		dataType := pcr.field.GetDataType()
+		offsets := listReader.Offsets()
+		if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil {
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+		}
+
+		valueReader := listReader.ListValues()
+		rows := listReader.Len()
+
+		// Sparse storage: only store valid rows' data
+		int8Reader, ok := valueReader.(*array.Int8)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name())
+		}
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !listReader.IsNull(i))
+			if !listReader.IsNull(i) {
+				start, end := offsets[i], offsets[i+1]
+				for j := start; j < end; j++ {
+					data = append(data, int8Reader.Value(int(j)))
+				}
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) {
 	maxLength, err := parameterutil.GetMaxLength(pcr.field)
 	if err != nil {
diff --git a/internal/util/importutilv2/parquet/field_reader_test.go b/internal/util/importutilv2/parquet/field_reader_test.go
index 97f35bdc9a..7cfbbb7482 100644
--- a/internal/util/importutilv2/parquet/field_reader_test.go
+++ b/internal/util/importutilv2/parquet/field_reader_test.go
@@ -345,7 +345,7 @@ func TestParseSparseFloatVectorStructs(t *testing.T) {
 }

 func TestReadFieldData(t *testing.T) {
-	checkFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) {
+	checkFunc := func(t *testing.T, nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) {
 		fieldName := dataType.String()
 		if elementType != schemapb.DataType_None {
 			fieldName = fieldName + "_" + elementType.String()
@@ -357,7 +357,7 @@ func TestReadFieldData(t *testing.T) {
 			Name:        fieldName,
 			DataType:    dataType,
 			ElementType: elementType,
-			Nullable:    dataHasNull,
+			Nullable:    nullPercent != 0,
 			TypeParams: []*commonpb.KeyValuePair{
 				{
 					Key: "dim",
@@ -397,10 +397,6 @@ func TestReadFieldData(t *testing.T) {
 		assert.NoError(t, err)

 		rowCount := 5
-		nullPercent := 0
-		if dataHasNull {
-			nullPercent = 50
-		}
 		insertData, err := testutil.CreateInsertData(schema, rowCount, nullPercent)
 		assert.NoError(t, err)
 		columns, err := testutil.BuildArrayData(schema, insertData, false)
@@ -423,7 +419,7 @@ func TestReadFieldData(t *testing.T) {
 		defer reader.Close()

 		_, err = reader.Read()
-		if !readScehamIsNullable && dataHasNull {
+		if !readScehamIsNullable && nullPercent != 0 {
 			assert.Error(t, err)
 		} else {
 			assert.NoError(t, err)
@@ -432,17 +428,17 @@ func TestReadFieldData(t *testing.T) {

 	type testCase struct {
 		name                 string
-		dataHasNull          bool
+		nullPercent          int
 		readScehamIsNullable bool
 		dataType             schemapb.DataType
 		elementType          schemapb.DataType
 	}
-	buildCaseFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase {
-		name := fmt.Sprintf("dataHasNull='%v' schemaNullable='%v' dataType='%s' elementType='%s'",
-			dataHasNull, readScehamIsNullable, dataType, elementType)
+	buildCaseFunc := func(nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase {
+		name := fmt.Sprintf("nullPercent='%v' schemaNullable='%v' dataType='%s' elementType='%s'",
+			nullPercent, readScehamIsNullable, dataType, elementType)
 		return &testCase{
 			name:                 name,
-			dataHasNull:          dataHasNull,
+			nullPercent:          nullPercent,
 			readScehamIsNullable: readScehamIsNullable,
 			dataType:             dataType,
 			elementType:          elementType,
@@ -459,12 +455,19 @@ func TestReadFieldData(t *testing.T) {
 		schemapb.DataType_Float,
 		schemapb.DataType_Double,
 		schemapb.DataType_VarChar,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_Int8Vector,
 	}
 	for _, dataType := range nullableDataTypes {
-		cases = append(cases, buildCaseFunc(true, true, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(true, false, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None))
+		for _, nullPercent := range []int{0, 50} {
+			for _, readScehamIsNullable := range []bool{true, false} {
+				cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, dataType, schemapb.DataType_None))
+			}
+		}
 	}

 	elementTypes := []schemapb.DataType{
@@ -478,28 +481,23 @@ func TestReadFieldData(t *testing.T) {
 		schemapb.DataType_VarChar,
 	}
 	for _, elementType := range elementTypes {
-		cases = append(cases, buildCaseFunc(true, true, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(true, false, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(false, true, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(false, false, schemapb.DataType_Array, elementType))
+		for _, nullPercent := range []int{0, 50} {
+			for _, readScehamIsNullable := range []bool{true, false} {
+				cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, schemapb.DataType_Array, elementType))
+			}
+		}
 	}

 	notNullableTypes := []schemapb.DataType{
 		schemapb.DataType_JSON,
-		schemapb.DataType_FloatVector,
-		schemapb.DataType_BinaryVector,
-		schemapb.DataType_SparseFloatVector,
-		schemapb.DataType_Float16Vector,
-		schemapb.DataType_BFloat16Vector,
-		schemapb.DataType_Int8Vector,
 	}
 	for _, dataType := range notNullableTypes {
-		cases = append(cases, buildCaseFunc(false, false, dataType, schemapb.DataType_None))
+		cases = append(cases, buildCaseFunc(0, false, dataType, schemapb.DataType_None))
 	}

 	for _, tt := range cases {
 		t.Run(tt.name, func(t *testing.T) {
-			checkFunc(tt.dataHasNull, tt.readScehamIsNullable, tt.dataType, tt.elementType)
+			checkFunc(t, tt.nullPercent, tt.readScehamIsNullable, tt.dataType, tt.elementType)
 		})
 	}
 }
@@ -659,8 +657,7 @@ func TestTypeMismatch(t *testing.T) {
 		cases = append(cases, buildCaseFunc(schemapb.DataType_Bool, schemapb.DataType_None, schemapb.DataType_Array, elementType, false))
 	}

-	notNullableTypes := []schemapb.DataType{
-		schemapb.DataType_JSON,
+	vectorTypes := []schemapb.DataType{
 		schemapb.DataType_FloatVector,
 		schemapb.DataType_BinaryVector,
 		schemapb.DataType_SparseFloatVector,
 		schemapb.DataType_Float16Vector,
 		schemapb.DataType_BFloat16Vector,
 		schemapb.DataType_Int8Vector,
 	}
+	for _, dataType := range vectorTypes {
+		srcDataType := schemapb.DataType_Bool
+		cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, true))
+		cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, false))
+
+		cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, true))
+		cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, false))
+	}
+
+	notNullableTypes := []schemapb.DataType{
+		schemapb.DataType_JSON,
+	}
 	for _, dataType := range notNullableTypes {
 		srcDataType := schemapb.DataType_Bool
 		if dataType == schemapb.DataType_Bool {
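The relaxed alignment rule behind `checkNullableVectorAlignWithDim` is worth seeing in isolation: in a nullable Arrow list column a null row occupies a zero-length slot, so only rows marked valid must match the expected dimension. A self-contained sketch, with `isNull` standing in for `listReader.IsNull`:

```go
package main

import "fmt"

// checkOffsets validates list offsets against a fixed dimension while
// skipping null rows, mirroring checkNullableVectorAlignWithDim above.
func checkOffsets(offsets []int32, isNull func(int) bool, dim int32) error {
	for i := 1; i < len(offsets); i++ {
		length := offsets[i] - offsets[i-1]
		if !isNull(i-1) && length != dim {
			return fmt.Errorf("expected %d but got %d", dim, length)
		}
	}
	return nil
}

func main() {
	// Row 1 is null (zero length); rows 0 and 2 carry dim=2 payloads.
	offsets := []int32{0, 2, 2, 4}
	isNull := func(i int) bool { return i == 1 }
	if err := checkOffsets(offsets, isNull, 2); err != nil {
		panic(err)
	}
	fmt.Println("aligned")
}
```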
diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go
index c6d9c11342..758340e02f 100644
--- a/internal/util/importutilv2/parquet/reader_test.go
+++ b/internal/util/importutilv2/parquet/reader_test.go
@@ -450,8 +450,10 @@ func (s *ReaderSuite) runWithSparseVector(indicesType arrow.DataType, valuesType
 	builder.AppendValues(int64Data, validData)
 	arrowColumns = append(arrowColumns, builder.NewInt64Array())

-	contents := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData).GetContents()
-	arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type)
+	sparseFieldData := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData)
+	contents := sparseFieldData.GetContents()
+	sparseValidData := sparseFieldData.ValidData
+	arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type, sparseValidData)
 	assert.NoError(s.T(), err)
 	arrowColumns = append(arrowColumns, arr)

@@ -545,65 +547,34 @@ func (s *ReaderSuite) TestReadScalarFieldsWithDefaultValue() {
 }

 func (s *ReaderSuite) TestReadScalarFields() {
-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Double, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Geometry, schemapb.DataType_None, false, 0)
+	elementTypes := []schemapb.DataType{
+		schemapb.DataType_Bool,
+		schemapb.DataType_Int8,
+		schemapb.DataType_Int16,
+		schemapb.DataType_Int32,
+		schemapb.DataType_Int64,
+		schemapb.DataType_Float,
+		schemapb.DataType_Double,
+		schemapb.DataType_String,
+	}

-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, false, 0)
+	scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Geometry, schemapb.DataType_Array}...)

-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 50)
-
-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 50)
-
-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Geometry, schemapb.DataType_None, true, 100)
-
-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 100)
+	for _, dataType := range scalarTypes {
+		if dataType == schemapb.DataType_Array {
+			for _, elementType := range elementTypes {
+				s.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					s.run(dataType, elementType, true, nullPercent)
+				}
+			}
+		} else {
+			s.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				s.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
+		}
+	}

 	s.failRun(schemapb.DataType_JSON, true)
 }

@@ -611,24 +582,28 @@ func (s *ReaderSuite) TestReadScalarFields() {
 func (s *ReaderSuite) TestStringPK() {
 	s.pkDataType = schemapb.DataType_VarChar
 	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100)
+	for _, nullPercent := range []int{0, 50, 100} {
+		s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (s *ReaderSuite) TestVector() {
-	s.vecDataType = schemapb.DataType_BinaryVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_FloatVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_Float16Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_BFloat16Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	// this test case only test parsing sparse vector from JSON-format string
-	s.vecDataType = schemapb.DataType_SparseFloatVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_Int8Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		s.vecDataType = dataType
+		s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (s *ReaderSuite) TestSparseVector() {
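`testutil.BuildSparseVectorData` now takes a `validData` mask and walks logical rows while indexing `contents` physically. The pattern in isolation, as a self-contained sketch (`forEachLogicalRow` is a hypothetical name):

```go
package main

import "fmt"

// forEachLogicalRow iterates logical rows while advancing a physical
// index into contents only for valid rows, mirroring the updated
// BuildSparseVectorData loops.
func forEachLogicalRow(contents [][]byte, validData []bool, onValid func(row []byte), onNull func()) {
	logicalRows := len(contents)
	if len(validData) > 0 {
		logicalRows = len(validData) // sparse: contents holds only valid rows
	}
	physicalIdx := 0
	for i := 0; i < logicalRows; i++ {
		if len(validData) == 0 || validData[i] {
			onValid(contents[physicalIdx])
			physicalIdx++ // advance only past stored payloads
		} else {
			onNull()
		}
	}
}

func main() {
	contents := [][]byte{{1}, {2}} // only the valid rows are stored
	validData := []bool{true, false, true}
	forEachLogicalRow(contents, validData,
		func(row []byte) { fmt.Println("row", row) },
		func() { fmt.Println("null row") })
}
```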
if err != nil { return nil, err } + // Pre-generate validData for nullable fields to determine sparse storage size + validDataMap := make(map[int64][]bool) allFields := typeutil.GetAllFieldSchemas(schema) for _, f := range allFields { if f.GetAutoID() || f.IsFunctionOutput { continue } + if f.GetNullable() { + if len(nullPercent) > 1 { + return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong") + } + var validData []bool + if len(nullPercent) == 0 || nullPercent[0] == 50 { + validData = testutils.GenerateBoolArray(rows) + } else if len(nullPercent) == 1 && nullPercent[0] == 100 { + validData = make([]bool, rows) + } else if len(nullPercent) == 1 && nullPercent[0] == 0 { + validData = make([]bool, rows) + for i := range validData { + validData[i] = true + } + } else { + return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent)) + } + validDataMap[f.FieldID] = validData + } + } + + // Helper function to check if a type is a vector type (uses sparse storage) + isVectorType := func(dataType schemapb.DataType) bool { + switch dataType { + case schemapb.DataType_BinaryVector, + schemapb.DataType_FloatVector, + schemapb.DataType_Float16Vector, + schemapb.DataType_BFloat16Vector, + schemapb.DataType_SparseFloatVector, + schemapb.DataType_Int8Vector: + return true + default: + return false + } + } + + // Helper function to count valid rows + countValidRows := func(validData []bool) int { + if len(validData) == 0 { + return rows + } + count := 0 + for _, v := range validData { + if v { + count++ + } + } + return count + } + + for _, f := range allFields { + if f.GetAutoID() || f.IsFunctionOutput { + continue + } + validData := validDataMap[f.FieldID] + // Vector types use sparse storage (only valid rows), scalar types use dense storage (all rows) + var dataRows int + if isVectorType(f.GetDataType()) { + dataRows = countValidRows(validData) + } else { + dataRows = rows + } + switch f.GetDataType() { case schemapb.DataType_Bool: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateBoolArray(rows)) @@ -135,54 +200,47 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.BinaryVectorFieldData{ - Data: testutils.GenerateBinaryVectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Data = testutils.GenerateBinaryVectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Dim = int(dim) case schemapb.DataType_FloatVector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.GetFieldID()] = &storage.FloatVectorFieldData{ - Data: testutils.GenerateFloatVectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Data = testutils.GenerateFloatVectors(dataRows, int(dim)) + insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Dim = int(dim) case schemapb.DataType_Float16Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.Float16VectorFieldData{ - Data: testutils.GenerateFloat16Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Data = testutils.GenerateFloat16Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Dim = int(dim) case schemapb.DataType_BFloat16Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.BFloat16VectorFieldData{ - Data: testutils.GenerateBFloat16Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Data = testutils.GenerateBFloat16Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Dim = int(dim) case schemapb.DataType_SparseFloatVector: - data, dim := testutils.GenerateSparseFloatVectorsData(rows) - insertData.Data[f.FieldID] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: schemapb.SparseFloatArray{ - Contents: data, - Dim: dim, - }, - } + // For nullable vectors, use sparse storage (only generate valid rows) + data, dim := testutils.GenerateSparseFloatVectorsData(dataRows) + sparseData := insertData.Data[f.FieldID].(*storage.SparseFloatVectorFieldData) + sparseData.Contents = data + sparseData.Dim = dim case schemapb.DataType_Int8Vector: dim, err := typeutil.GetDim(f) if err != nil { return nil, err } - insertData.Data[f.FieldID] = &storage.Int8VectorFieldData{ - Data: testutils.GenerateInt8Vectors(rows, int(dim)), - Dim: int(dim), - } + // For nullable vectors, use sparse storage (only generate valid rows) + insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Data = testutils.GenerateInt8Vectors(dataRows, int(dim)) + insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Dim = int(dim) case schemapb.DataType_String, schemapb.DataType_VarChar: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateStringArray(rows)) case schemapb.DataType_JSON: @@ -220,23 +278,10 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
default: panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String())) } + // Apply pre-generated validData for nullable fields if f.GetNullable() { - if len(nullPercent) > 1 { - return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong") - } - if len(nullPercent) == 0 || nullPercent[0] == 50 { - insertData.Data[f.FieldID].AppendValidDataRows(testutils.GenerateBoolArray(rows)) - } else if len(nullPercent) == 1 && nullPercent[0] == 100 { - insertData.Data[f.FieldID].AppendValidDataRows(make([]bool, rows)) - } else if len(nullPercent) == 1 && nullPercent[0] == 0 { - validData := make([]bool, rows) - for i := range validData { - validData[i] = true - } - insertData.Data[f.FieldID].AppendValidDataRows(validData) - } else { - return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent)) - } + validData := validDataMap[f.FieldID] + insertData.Data[f.FieldID].AppendValidDataRows(validData) } } return insertData, nil @@ -304,25 +349,34 @@ func CreateFieldWithDefaultValue(dataType schemapb.DataType, id int64, nullable return field, nil } -func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType) (arrow.Array, error) { +func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType, validData []bool) (arrow.Array, error) { if arrowType == nil || arrowType.ID() == arrow.STRING { // build sparse vector as JSON-format string builder := array.NewStringBuilder(mem) - rows := len(contents) - jsonBytesData := make([][]byte, 0) - for i := 0; i < rows; i++ { - rowVecData := contents[i] - mapData := typeutil.SparseFloatBytesToMap(rowVecData) - // convert to JSON format - jsonBytes, err := json.Marshal(mapData) - if err != nil { - return nil, err - } - jsonBytesData = append(jsonBytesData, jsonBytes) + // For sparse storage: iterate over logical rows, use physical index for contents + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(contents) + } + physicalIdx := 0 + for i := 0; i < logicalRows; i++ { + isValid := len(validData) == 0 || validData[i] + if isValid { + rowVecData := contents[physicalIdx] + mapData := typeutil.SparseFloatBytesToMap(rowVecData) + // convert to JSON format + jsonBytes, err := json.Marshal(mapData) + if err != nil { + return nil, err + } + builder.Append(string(jsonBytes)) + physicalIdx++ + } else { + builder.AppendNull() + } } - builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string { - return string(bs) - }), nil) return builder.NewStringArray(), nil } else if arrowType.ID() == arrow.STRUCT { // build sparse vector as parquet struct @@ -399,15 +453,27 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType return nil, merr.WrapErrImportFailed(msg) } - for i := 0; i < len(contents); i++ { - builder.Append(true) - indicesBuilder.Append(true) - valuesBuilder.Append(true) - rowVecData := contents[i] - elemCount := len(rowVecData) / 8 - for j := 0; j < elemCount; j++ { - appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:])) - appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:]))) + // For sparse storage: iterate over logical rows, use physical index for contents + var logicalRows int + if len(validData) > 0 { + logicalRows = len(validData) + } else { + logicalRows = len(contents) + } + physicalIdx := 0 + for i := 0; i < logicalRows; i++ { + isValid := len(validData) == 0 || validData[i] + 
@@ -304,25 +349,34 @@ func CreateFieldWithDefaultValue(dataType schemapb.DataType, id int64, nullable
 	return field, nil
 }
 
-func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType) (arrow.Array, error) {
+func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType, validData []bool) (arrow.Array, error) {
 	if arrowType == nil || arrowType.ID() == arrow.STRING {
 		// build sparse vector as JSON-format string
 		builder := array.NewStringBuilder(mem)
-		rows := len(contents)
-		jsonBytesData := make([][]byte, 0)
-		for i := 0; i < rows; i++ {
-			rowVecData := contents[i]
-			mapData := typeutil.SparseFloatBytesToMap(rowVecData)
-			// convert to JSON format
-			jsonBytes, err := json.Marshal(mapData)
-			if err != nil {
-				return nil, err
-			}
-			jsonBytesData = append(jsonBytesData, jsonBytes)
+		// For sparse storage: iterate over logical rows, use physical index for contents
+		var logicalRows int
+		if len(validData) > 0 {
+			logicalRows = len(validData)
+		} else {
+			logicalRows = len(contents)
+		}
+		physicalIdx := 0
+		for i := 0; i < logicalRows; i++ {
+			isValid := len(validData) == 0 || validData[i]
+			if isValid {
+				rowVecData := contents[physicalIdx]
+				mapData := typeutil.SparseFloatBytesToMap(rowVecData)
+				// convert to JSON format
+				jsonBytes, err := json.Marshal(mapData)
+				if err != nil {
+					return nil, err
+				}
+				builder.Append(string(jsonBytes))
+				physicalIdx++
+			} else {
+				builder.AppendNull()
+			}
 		}
-		builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string {
-			return string(bs)
-		}), nil)
 		return builder.NewStringArray(), nil
 	} else if arrowType.ID() == arrow.STRUCT {
 		// build sparse vector as parquet struct
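
Both branches of BuildSparseVectorData consume rows in the same packed encoding: each row is a byte slice of 8-byte pairs, a little-endian uint32 index followed by a float32 value. typeutil.SparseFloatBytesToMap unpacks it above, and the struct branch in the next hunk decodes it by hand with common.Endian (which Milvus defines as binary.LittleEndian). A self-contained sketch of that decoding; decodeSparseRow is illustrative, not a Milvus helper:

// Sketch only: assumes the little-endian (uint32 index, float32 value) pair
// layout the patch decodes via common.Endian and math.Float32frombits.
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

func decodeSparseRow(row []byte) map[uint32]float32 {
	out := make(map[uint32]float32, len(row)/8)
	for j := 0; j+8 <= len(row); j += 8 {
		idx := binary.LittleEndian.Uint32(row[j:])
		val := math.Float32frombits(binary.LittleEndian.Uint32(row[j+4:]))
		out[idx] = val
	}
	return out
}

func main() {
	// Encode two elements: index 3 -> 0.5, index 7 -> 1.25.
	row := make([]byte, 16)
	binary.LittleEndian.PutUint32(row[0:], 3)
	binary.LittleEndian.PutUint32(row[4:], math.Float32bits(0.5))
	binary.LittleEndian.PutUint32(row[8:], 7)
	binary.LittleEndian.PutUint32(row[12:], math.Float32bits(1.25))
	fmt.Println(decodeSparseRow(row)) // map[3:0.5 7:1.25]
}
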
@@ -399,15 +453,27 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType
 			return nil, merr.WrapErrImportFailed(msg)
 		}
-		for i := 0; i < len(contents); i++ {
-			builder.Append(true)
-			indicesBuilder.Append(true)
-			valuesBuilder.Append(true)
-			rowVecData := contents[i]
-			elemCount := len(rowVecData) / 8
-			for j := 0; j < elemCount; j++ {
-				appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:]))
-				appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:])))
+		// For sparse storage: iterate over logical rows, use physical index for contents
+		var logicalRows int
+		if len(validData) > 0 {
+			logicalRows = len(validData)
+		} else {
+			logicalRows = len(contents)
+		}
+		physicalIdx := 0
+		for i := 0; i < logicalRows; i++ {
+			isValid := len(validData) == 0 || validData[i]
+			builder.Append(isValid)
+			indicesBuilder.Append(isValid)
+			valuesBuilder.Append(isValid)
+			if isValid {
+				rowVecData := contents[physicalIdx]
+				elemCount := len(rowVecData) / 8
+				for j := 0; j < elemCount; j++ {
+					appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:]))
+					appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:])))
+				}
+				physicalIdx++
 			}
 		}
 		return builder.NewStructArray(), nil
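
The BuildArrayData hunk below repeats the same logical/physical walk for every fixed-width vector type, maintaining Arrow list offsets and a valid mask by hand so that a null row advances the offsets array without contributing values. For contrast, a minimal standalone sketch that builds the same null-aware list column through the builder's per-row Append/AppendNull API, which tracks offsets itself; the arrow module version in the import path is a guess and may differ from the one Milvus pins:

// Sketch only: same null-aware list building as the hunk below, but letting
// the ListBuilder maintain offsets via per-row Append/AppendNull.
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	builder := array.NewListBuilder(mem, &arrow.Float32Type{})
	defer builder.Release()
	valueBuilder := builder.ValueBuilder().(*array.Float32Builder)

	dim := 2
	packed := []float32{0.1, 0.2, 0.5, 0.6} // physical rows only
	valid := []bool{true, false, true}      // three logical rows, row 1 is null

	physicalIdx := 0
	for _, ok := range valid {
		if !ok {
			builder.AppendNull() // null row: no values appended
			continue
		}
		builder.Append(true) // open a new list slot
		valueBuilder.AppendValues(packed[physicalIdx*dim:(physicalIdx+1)*dim], nil)
		physicalIdx++
	}

	arr := builder.NewListArray()
	defer arr.Release()
	fmt.Println(arr) // [[0.1 0.2] (null) [0.5 0.6]]
}
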
@@ -490,82 +556,183 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
 			columns = append(columns, builder.NewStringArray())
 		case schemapb.DataType_BinaryVector:
 			builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
+			valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
 			dim := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim
 			binVecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data
+			validData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).ValidData
 			rowBytes := dim / 8
-			rows := len(binVecData) / rowBytes
-			offsets := make([]int32, 0, rows)
-			valid := make([]bool, 0)
-			for i := 0; i < rows; i++ {
-				offsets = append(offsets, int32(i*rowBytes))
-				valid = append(valid, true)
+			// For sparse storage: logicalRows from validData, physicalRows from data
+			var logicalRows int
+			if len(validData) > 0 {
+				logicalRows = len(validData)
+			} else {
+				logicalRows = len(binVecData) / rowBytes
 			}
-			builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil)
+			offsets := make([]int32, 0, logicalRows+1)
+			valid := make([]bool, 0, logicalRows)
+			currOffset := int32(0)
+			physicalIdx := 0 // Track physical index in sparse data
+			for i := 0; i < logicalRows; i++ {
+				offsets = append(offsets, currOffset)
+				if len(validData) > 0 && !validData[i] {
+					valid = append(valid, false)
+				} else {
+					// Use physical index for sparse storage
+					start := physicalIdx * rowBytes
+					end := start + rowBytes
+					valueBuilder.AppendValues(binVecData[start:end], nil)
+					currOffset += int32(rowBytes)
+					valid = append(valid, true)
+					physicalIdx++ // Increment only for valid rows
+				}
+			}
+			offsets = append(offsets, currOffset)
 			builder.AppendValues(offsets, valid)
 			columns = append(columns, builder.NewListArray())
 		case schemapb.DataType_FloatVector:
 			builder := array.NewListBuilder(mem, &arrow.Float32Type{})
+			valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
 			dim := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim
 			floatVecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data
-			rows := len(floatVecData) / dim
-			offsets := make([]int32, 0, rows)
-			valid := make([]bool, 0, rows)
-			for i := 0; i < rows; i++ {
-				offsets = append(offsets, int32(i*dim))
-				valid = append(valid, true)
+			validData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).ValidData
+			// For sparse storage: logicalRows from validData, physicalRows from data
+			var logicalRows int
+			if len(validData) > 0 {
+				logicalRows = len(validData)
+			} else {
+				logicalRows = len(floatVecData) / dim
 			}
-			builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil)
+			offsets := make([]int32, 0, logicalRows+1)
+			valid := make([]bool, 0, logicalRows)
+			currOffset := int32(0)
+			physicalIdx := 0 // Track physical index in sparse data
+			for i := 0; i < logicalRows; i++ {
+				offsets = append(offsets, currOffset)
+				if len(validData) > 0 && !validData[i] {
+					valid = append(valid, false)
+				} else {
+					// Use physical index for sparse storage
+					start := physicalIdx * dim
+					end := start + dim
+					valueBuilder.AppendValues(floatVecData[start:end], nil)
+					currOffset += int32(dim)
+					valid = append(valid, true)
+					physicalIdx++ // Increment only for valid rows
+				}
+			}
+			offsets = append(offsets, currOffset)
 			builder.AppendValues(offsets, valid)
 			columns = append(columns, builder.NewListArray())
 		case schemapb.DataType_Float16Vector:
 			builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
+			valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
 			dim := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim
 			float16VecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data
+			validData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).ValidData
 			rowBytes := dim * 2
-			rows := len(float16VecData) / rowBytes
-			offsets := make([]int32, 0, rows)
-			valid := make([]bool, 0, rows)
-			for i := 0; i < rows; i++ {
-				offsets = append(offsets, int32(i*rowBytes))
-				valid = append(valid, true)
+			// For sparse storage: logicalRows from validData, physicalRows from data
+			var logicalRows int
+			if len(validData) > 0 {
+				logicalRows = len(validData)
+			} else {
+				logicalRows = len(float16VecData) / rowBytes
 			}
-			builder.ValueBuilder().(*array.Uint8Builder).AppendValues(float16VecData, nil)
+			offsets := make([]int32, 0, logicalRows+1)
+			valid := make([]bool, 0, logicalRows)
+			currOffset := int32(0)
+			physicalIdx := 0 // Track physical index in sparse data
+			for i := 0; i < logicalRows; i++ {
+				offsets = append(offsets, currOffset)
+				if len(validData) > 0 && !validData[i] {
+					valid = append(valid, false)
+				} else {
+					// Use physical index for sparse storage
+					start := physicalIdx * rowBytes
+					end := start + rowBytes
+					valueBuilder.AppendValues(float16VecData[start:end], nil)
+					currOffset += int32(rowBytes)
+					valid = append(valid, true)
+					physicalIdx++ // Increment only for valid rows
+				}
+			}
+			offsets = append(offsets, currOffset)
 			builder.AppendValues(offsets, valid)
 			columns = append(columns, builder.NewListArray())
 		case schemapb.DataType_BFloat16Vector:
 			builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
+			valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
 			dim := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim
 			bfloat16VecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data
+			validData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).ValidData
 			rowBytes := dim * 2
-			rows := len(bfloat16VecData) / rowBytes
-			offsets := make([]int32, 0, rows)
-			valid := make([]bool, 0, rows)
-			for i := 0; i < rows; i++ {
-				offsets = append(offsets, int32(i*rowBytes))
-				valid = append(valid, true)
+			// For sparse storage: logicalRows from validData, physicalRows from data
+			var logicalRows int
+			if len(validData) > 0 {
+				logicalRows = len(validData)
+			} else {
+				logicalRows = len(bfloat16VecData) / rowBytes
 			}
-			builder.ValueBuilder().(*array.Uint8Builder).AppendValues(bfloat16VecData, nil)
+			offsets := make([]int32, 0, logicalRows+1)
+			valid := make([]bool, 0, logicalRows)
+			currOffset := int32(0)
+			physicalIdx := 0 // Track physical index in sparse data
+			for i := 0; i < logicalRows; i++ {
+				offsets = append(offsets, currOffset)
+				if len(validData) > 0 && !validData[i] {
+					valid = append(valid, false)
+				} else {
+					// Use physical index for sparse storage
+					start := physicalIdx * rowBytes
+					end := start + rowBytes
+					valueBuilder.AppendValues(bfloat16VecData[start:end], nil)
+					currOffset += int32(rowBytes)
+					valid = append(valid, true)
+					physicalIdx++ // Increment only for valid rows
+				}
+			}
+			offsets = append(offsets, currOffset)
 			builder.AppendValues(offsets, valid)
 			columns = append(columns, builder.NewListArray())
 		case schemapb.DataType_SparseFloatVector:
 			contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents()
-			arr, err := BuildSparseVectorData(mem, contents, nil)
+			validData := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).ValidData
+			arr, err := BuildSparseVectorData(mem, contents, nil, validData)
 			if err != nil {
 				return nil, err
 			}
 			columns = append(columns, arr)
 		case schemapb.DataType_Int8Vector:
 			builder := array.NewListBuilder(mem, &arrow.Int8Type{})
+			valueBuilder := builder.ValueBuilder().(*array.Int8Builder)
 			dim := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Dim
 			int8VecData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Data
-			rows := len(int8VecData) / dim
-			offsets := make([]int32, 0, rows)
-			valid := make([]bool, 0, rows)
-			for i := 0; i < rows; i++ {
-				offsets = append(offsets, int32(i*dim))
-				valid = append(valid, true)
+			validData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).ValidData
+			// For sparse storage: logicalRows from validData, physicalRows from data
+			var logicalRows int
+			if len(validData) > 0 {
+				logicalRows = len(validData)
+			} else {
+				logicalRows = len(int8VecData) / dim
 			}
-			builder.ValueBuilder().(*array.Int8Builder).AppendValues(int8VecData, nil)
+			offsets := make([]int32, 0, logicalRows+1)
+			valid := make([]bool, 0, logicalRows)
+			currOffset := int32(0)
+			physicalIdx := 0 // Track physical index in sparse data
+			for i := 0; i < logicalRows; i++ {
+				offsets = append(offsets, currOffset)
+				if len(validData) > 0 && !validData[i] {
+					valid = append(valid, false)
+				} else {
+					// Use physical index for sparse storage
+					start := physicalIdx * dim
+					end := start + dim
+					valueBuilder.AppendValues(int8VecData[start:end], nil)
+					currOffset += int32(dim)
+					valid = append(valid, true)
+					physicalIdx++ // Increment only for valid rows
+				}
+			}
+			offsets = append(offsets, currOffset)
 			builder.AppendValues(offsets, valid)
 			columns = append(columns, builder.NewListArray())
 		case schemapb.DataType_JSON: