From 9191c8dcfd32ee6c8ac1245b64b2dc9eccbd568c Mon Sep 17 00:00:00 2001
From: groot
Date: Thu, 13 Nov 2025 18:13:38 +0800
Subject: [PATCH] fix: [2.6] Fix bulkimport bug for Struct field (#45536)

issue: https://github.com/milvus-io/milvus/issues/45006
pr: https://github.com/milvus-io/milvus/pull/45474

Signed-off-by: yhmo
---
 internal/util/importutilv2/csv/row_parser.go  | 121 ++++++-----
 .../util/importutilv2/csv/row_parser_test.go  | 194 +++++-------------
 .../parquet/struct_field_reader.go            |  75 ++++++-
 internal/util/testutil/test_util.go           |  17 +-
 4 files changed, 190 insertions(+), 217 deletions(-)

diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go
index fcb4550561..79b58527eb 100644
--- a/internal/util/importutilv2/csv/row_parser.go
+++ b/internal/util/importutilv2/csv/row_parser.go
@@ -42,6 +42,7 @@ type rowParser struct {
 	header               []string
 	name2Dim             map[string]int
 	name2Field           map[string]*schemapb.FieldSchema
+	name2StructField     map[string]*schemapb.StructArrayFieldSchema
 	structArrays         map[string]map[string]*schemapb.FieldSchema
 	structArraySubFields map[string]interface{}
 	pkField              *schemapb.FieldSchema
@@ -77,9 +78,11 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 	)

 	structArrays := make(map[string]map[string]*schemapb.FieldSchema)
+	name2StructField := make(map[string]*schemapb.StructArrayFieldSchema)
 	structArraySubFields := make(map[string]interface{})
 	for _, sa := range schema.GetStructArrayFields() {
+		name2StructField[sa.GetName()] = sa
 		structArrays[sa.GetName()] = make(map[string]*schemapb.FieldSchema)
 		for _, subField := range sa.GetFields() {
 			structArraySubFields[subField.GetName()] = nil
@@ -138,6 +141,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 		name2Dim:             name2Dim,
 		header:               header,
 		name2Field:           name2Field,
+		name2StructField:     name2StructField,
 		structArrays:         structArrays,
 		structArraySubFields: structArraySubFields,
 		pkField:              pkField,
@@ -159,52 +163,33 @@
 //
 // we reconstruct it to be handled by handleField as:
 //
-// {"sub-field1": "[1, 2]", "sub-field2": "[[1.0, 2.0], [3.0, 4.0]]"}
-func (r *rowParser) reconstructArrayForStructArray(structName string, subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string]string, error) {
+// {"struct[sub-field1]": [1, 2], "struct[sub-field2]": [[1.0, 2.0], [3.0, 4.0]]}
+func (r *rowParser) reconstructArrayForStructArray(structName string, subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string][]any, error) {
 	// Parse the JSON array string
-	var rows []any
+	var structs []any
 	dec := json.NewDecoder(strings.NewReader(raw))
 	dec.UseNumber()
-	if err := dec.Decode(&rows); err != nil {
+	if err := dec.Decode(&structs); err != nil {
 		return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid StructArray format in CSV, failed to parse JSON: %v", err))
 	}

-	buf := make(map[string][]any)
-	for _, elem := range rows {
-		row, ok := elem.(map[string]any)
+	flatStructs := make(map[string][]any)
+	for _, elem := range structs {
+		dict, ok := elem.(map[string]any)
 		if !ok {
 			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem))
 		}

-		for key, value := range row {
+		for key, value := range dict {
 			fieldName := typeutil.ConcatStructFieldName(structName, key)
-			field, ok := subFieldsMap[fieldName]
+			_, ok := subFieldsMap[fieldName]
 			if !ok {
 				return nil,
merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", fieldName)) } - strVal, ok := value.(string) - if !ok { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid value type for field %s, expect string but got %T", fieldName, value)) - } - data, err := r.parseEntity(field, strVal, true) - if err != nil { - return nil, err - } - buf[fieldName] = append(buf[fieldName], data) + flatStructs[fieldName] = append(flatStructs[fieldName], value) } } - - // Convert aggregated arrays to JSON strings - out := make(map[string]string, len(buf)) - for k, v := range buf { - // Marshal the array as JSON string so it can be parsed by parseEntity - jsonBytes, err := json.Marshal(v) - if err != nil { - return nil, err - } - out[k] = string(jsonBytes) - } - return out, nil + return flatStructs, nil } func (r *rowParser) Parse(strArr []string) (Row, error) { @@ -216,24 +201,29 @@ func (r *rowParser) Parse(strArr []string) (Row, error) { dynamicValues := make(map[string]string) // read values from csv file for index, value := range strArr { - if subFieldsMap, ok := r.structArrays[r.header[index]]; ok { - values, err := r.reconstructArrayForStructArray(r.header[index], subFieldsMap, value) + csvFieldName := r.header[index] + if subFieldsMap, ok := r.structArrays[csvFieldName]; ok { + _, ok := r.name2StructField[csvFieldName] + if !ok { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("struct field %s is not found in schema", csvFieldName)) + } + flatStructs, err := r.reconstructArrayForStructArray(r.header[index], subFieldsMap, value) if err != nil { return nil, err } - - for subKey, subValue := range values { + for subKey, subValues := range flatStructs { field, ok := r.name2Field[subKey] if !ok { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", subKey)) + return nil, merr.WrapErrImportFailed(fmt.Sprintf("sub field %s of struct field %s is not found in schema", subKey, csvFieldName)) } - data, err := r.parseEntity(field, subValue, false) + // TODO: how to get max capacity from a StructFieldSchema? 
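+				// for now, capacity is validated against the sub-field's own max_capacity type param inside parseStructEntity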
+				data, err := r.parseStructEntity(field, subValues)
 				if err != nil {
 					return nil, err
 				}
 				row[field.GetFieldID()] = data
 			}
-		} else if field, ok := r.name2Field[r.header[index]]; ok {
+		} else if field, ok := r.name2Field[csvFieldName]; ok {
 			data, err := r.parseEntity(field, value, false)
 			if err != nil {
 				return nil, err
 			}
@@ -328,6 +318,43 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
 	return nil
 }

+func (r *rowParser) parseStructEntity(field *schemapb.FieldSchema, values []any) (any, error) {
+	dataType := field.GetDataType()
+	switch dataType {
+	case schemapb.DataType_ArrayOfVector:
+		maxCapacity, err := parameterutil.GetMaxCapacity(field)
+		if err != nil {
+			return nil, err
+		}
+		if err := common.CheckArrayCapacity(len(values), maxCapacity, field); err != nil {
+			return nil, err
+		}
+		vectorFieldData, err := r.arrayOfVectorToFieldData(values, field)
+		if err != nil {
+			return nil, err
+		}
+		return vectorFieldData, nil
+	case schemapb.DataType_Array:
+		maxCapacity, err := parameterutil.GetMaxCapacity(field)
+		if err != nil {
+			return nil, err
+		}
+		if err := common.CheckArrayCapacity(len(values), maxCapacity, field); err != nil {
+			return nil, err
+		}
+
+		// elements in an array do not support null values
+		scalarFieldData, err := r.arrayToFieldData(values, field)
+		if err != nil {
+			return nil, err
+		}
+		return scalarFieldData, nil
+	default:
+		return nil, merr.WrapErrImportFailed(
+			fmt.Sprintf("parse csv failed, unsupported data type: %s for struct field: %s", dataType.String(), field.GetName()))
+	}
+}
+
 func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElementType bool) (any, error) {
 	if field.GetDefaultValue() != nil && obj == r.nullkey {
 		return nullutil.GetDefaultValue(field)
@@ -486,26 +513,6 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElem
 			return nil, r.wrapDimError(len(vec), field)
 		}
 		return vec, nil
-	case schemapb.DataType_ArrayOfVector:
-		var vec []interface{}
-		desc := json.NewDecoder(strings.NewReader(obj))
-		desc.UseNumber()
-		err := desc.Decode(&vec)
-		if err != nil {
-			return nil, r.wrapTypeError(obj, field)
-		}
-		maxCapacity, err := parameterutil.GetMaxCapacity(field)
-		if err != nil {
-			return nil, err
-		}
-		if err = common.CheckArrayCapacity(len(vec), maxCapacity, field); err != nil {
-			return nil, err
-		}
-		vectorFieldData, err := r.arrayOfVectorToFieldData(vec, field)
-		if err != nil {
-			return nil, err
-		}
-		return vectorFieldData, nil
 	case schemapb.DataType_Array:
 		var vec []interface{}
 		desc := json.NewDecoder(strings.NewReader(obj))
diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go
index f862f7f0e6..3a3380354c 100644
--- a/internal/util/importutilv2/csv/row_parser_test.go
+++ b/internal/util/importutilv2/csv/row_parser_test.go
@@ -68,41 +68,53 @@ func (suite *RowParserSuite) setSchema(autoID bool, hasNullable bool, hasDynamic
 	suite.schema = suite.createAllTypesSchema()
 }

+func (suite *RowParserSuite) createArrayFieldSchema(id int64, name string, elementType schemapb.DataType, nullable bool) *schemapb.FieldSchema {
+	return &schemapb.FieldSchema{
+		FieldID:     id,
+		Name:        name,
+		DataType:    schemapb.DataType_Array,
+		ElementType: elementType,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   common.MaxCapacityKey,
+				Value: "4",
+			},
+			{
+				Key:   common.MaxLengthKey,
+				Value: "8",
+			},
+		},
+		Nullable: nullable,
+	}
+}
+
 func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 	structArray :=
&schemapb.StructArrayFieldSchema{ - FieldID: 110, + FieldID: 1000, Name: "struct_array", Fields: []*schemapb.FieldSchema{ + suite.createArrayFieldSchema(1001, "struct_array[sub_bool]", schemapb.DataType_Bool, false), + suite.createArrayFieldSchema(1002, "struct_array[sub_int8]", schemapb.DataType_Int8, false), + suite.createArrayFieldSchema(1003, "struct_array[sub_int16]", schemapb.DataType_Int16, false), + suite.createArrayFieldSchema(1004, "struct_array[sub_int32]", schemapb.DataType_Int32, false), + suite.createArrayFieldSchema(1005, "struct_array[sub_int64]", schemapb.DataType_Int64, false), + suite.createArrayFieldSchema(1006, "struct_array[sub_float]", schemapb.DataType_Float, false), + suite.createArrayFieldSchema(1007, "struct_array[sub_double]", schemapb.DataType_Double, false), + suite.createArrayFieldSchema(1008, "struct_array[sub_str]", schemapb.DataType_VarChar, false), { - FieldID: 111, + FieldID: 1009, Name: "struct_array[sub_float_vector]", DataType: schemapb.DataType_ArrayOfVector, ElementType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxCapacityKey, + Value: "4", + }, { Key: common.DimKey, Value: "2", }, - { - Key: "max_capacity", - Value: "4", - }, - }, - }, - { - FieldID: 112, - Name: "struct_array[sub_str]", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_VarChar, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - { - Key: "max_length", - Value: "8", - }, }, }, }, @@ -161,114 +173,14 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { IsFunctionOutput: true, }, - { - FieldID: 50, - Name: "array_bool", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Bool, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 51, - Name: "array_int8", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Int8, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 52, - Name: "array_int16", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Int16, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 53, - Name: "array_int32", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Int32, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 54, - Name: "array_int64", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Int64, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 55, - Name: "array_float", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Float, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 56, - Name: "array_double", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_Double, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", - Value: "4", - }, - }, - Nullable: suite.hasNullable, - }, - { - FieldID: 57, - Name: "array_varchar", - DataType: schemapb.DataType_Array, - ElementType: schemapb.DataType_VarChar, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "max_capacity", 
-					Value: "4",
-				},
-				{
-					Key:   "max_length",
-					Value: "8",
-				},
-			},
-			Nullable: suite.hasNullable,
-		},
+		suite.createArrayFieldSchema(50, "array_bool", schemapb.DataType_Bool, suite.hasNullable),
+		suite.createArrayFieldSchema(51, "array_int8", schemapb.DataType_Int8, suite.hasNullable),
+		suite.createArrayFieldSchema(52, "array_int16", schemapb.DataType_Int16, suite.hasNullable),
+		suite.createArrayFieldSchema(53, "array_int32", schemapb.DataType_Int32, suite.hasNullable),
+		suite.createArrayFieldSchema(54, "array_int64", schemapb.DataType_Int64, suite.hasNullable),
+		suite.createArrayFieldSchema(55, "array_float", schemapb.DataType_Float, suite.hasNullable),
+		suite.createArrayFieldSchema(56, "array_double", schemapb.DataType_Double, suite.hasNullable),
+		suite.createArrayFieldSchema(57, "array_varchar", schemapb.DataType_VarChar, suite.hasNullable),
 		{
 			FieldID: 101,
@@ -387,8 +299,11 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal string
 	rawContent["json"] = "{\"a\": 1}"
 	rawContent["x"] = "2"
 	rawContent["$meta"] = "{\"dynamic\": \"dummy\"}"
-	rawContent["struct_array"] = "[{\"sub_float_vector\": \"[0.1, 0.2]\", \"sub_str\": \"hello1\"}, " +
-		"{\"sub_float_vector\": \"[0.3, 0.4]\", \"sub_str\": \"hello2\"}]"
+
+	rawContent["struct_array"] = "[{\"sub_bool\": true, \"sub_int8\": 3, \"sub_int16\": 4, \"sub_int32\": 6," +
+		"\"sub_int64\": 7, \"sub_float\": 3.1415, \"sub_double\": 99.99, \"sub_float_vector\": [0.1, 0.2], \"sub_str\": \"hello1\"}, " +
+		"{\"sub_bool\": false, \"sub_int8\": 13, \"sub_int16\": 14, \"sub_int32\": 16," +
+		"\"sub_int64\": 17, \"sub_float\": 13.1415, \"sub_double\": 199.99, \"sub_float_vector\": [0.3, 0.4], \"sub_str\": \"hello2\"}]"
 	rawContent["geometry"] = "POINT (30.123 -10.456)"
 	rawContent[resetKey] = resetVal // reset a value
 	for _, deleteKey := range deleteKeys {
@@ -661,12 +576,9 @@ func (suite *RowParserSuite) runValid(c *testCase) {
 			suite.True(ok, "Sub-field %s should be a VectorField", subFieldName)

 			// Extract expected vectors from struct array data
-			var expectedVectors [][]float32
+			var expectedVectors [][]any
 			for _, elem := range structArrayData {
-				if vecStr, ok := elem[originalSubFieldName].(string); ok {
-					var vec []float32
-					err := json.Unmarshal([]byte(vecStr), &vec)
-					suite.NoError(err)
+				if vec, ok := elem[originalSubFieldName].([]any); ok {
 					expectedVectors = append(expectedVectors, vec)
 				}
 			}
@@ -674,7 +586,13 @@
 			// Flatten and compare
 			var expectedFlat []float32
 			for _, vec := range expectedVectors {
-				expectedFlat = append(expectedFlat, vec...)
+				var vecFlat []float32
+				for _, val := range vec {
+					jval := val.(json.Number)
+					fval, _ := jval.Float64()
+					vecFlat = append(vecFlat, float32(fval))
+				}
+				expectedFlat = append(expectedFlat, vecFlat...)
			}

 			suite.Equal(expectedFlat, vf.GetFloatVector().GetData())
diff --git a/internal/util/importutilv2/parquet/struct_field_reader.go b/internal/util/importutilv2/parquet/struct_field_reader.go
index 8803455505..90fe12c179 100644
--- a/internal/util/importutilv2/parquet/struct_field_reader.go
+++ b/internal/util/importutilv2/parquet/struct_field_reader.go
@@ -102,12 +102,48 @@ func (r *StructFieldReader) Next(count int64) (any, any, error) {
 	}
 }

-func (r *StructFieldReader) toScalarField(data []interface{}) *schemapb.ScalarField {
+func (r *StructFieldReader) toScalarField(data []interface{}) (*schemapb.ScalarField, error) {
 	if len(data) == 0 {
-		return nil
+		return nil, nil
 	}

 	switch r.field.GetElementType() {
+	case schemapb.DataType_Bool:
+		boolData := make([]bool, len(data))
+		for i, v := range data {
+			if val, ok := v.(bool); ok {
+				boolData[i] = val
+			}
+		}
+		return &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_BoolData{
+				BoolData: &schemapb.BoolArray{Data: boolData},
+			},
+		}, nil
+	case schemapb.DataType_Int8:
+		intData := make([]int32, len(data))
+		for i, v := range data {
+			if val, ok := v.(int8); ok {
+				intData[i] = int32(val)
+			}
+		}
+		return &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_IntData{
+				IntData: &schemapb.IntArray{Data: intData},
+			},
+		}, nil
+	case schemapb.DataType_Int16:
+		intData := make([]int32, len(data))
+		for i, v := range data {
+			if val, ok := v.(int16); ok {
+				intData[i] = int32(val)
+			}
+		}
+		return &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_IntData{
+				IntData: &schemapb.IntArray{Data: intData},
+			},
+		}, nil
 	case schemapb.DataType_Int32:
 		intData := make([]int32, len(data))
 		for i, v := range data {
@@ -119,7 +155,19 @@ func (r *StructFieldReader) toScalarField(data []interface{}) *schemapb.ScalarFi
 			Data: &schemapb.ScalarField_IntData{
 				IntData: &schemapb.IntArray{Data: intData},
 			},
+		}, nil
+	case schemapb.DataType_Int64:
+		intData := make([]int64, len(data))
+		for i, v := range data {
+			if val, ok := v.(int64); ok {
+				intData[i] = val
+			}
 		}
+		return &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_LongData{
+				LongData: &schemapb.LongArray{Data: intData},
+			},
+		}, nil
 	case schemapb.DataType_Float:
 		floatData := make([]float32, len(data))
 		for i, v := range data {
@@ -131,7 +179,19 @@ func (r *StructFieldReader) toScalarField(data []interface{}) *schemapb.ScalarFi
 			Data: &schemapb.ScalarField_FloatData{
 				FloatData: &schemapb.FloatArray{Data: floatData},
 			},
+		}, nil
+	case schemapb.DataType_Double:
+		floatData := make([]float64, len(data))
+		for i, v := range data {
+			if val, ok := v.(float64); ok {
+				floatData[i] = val
+			}
 		}
+		return &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_DoubleData{
+				DoubleData: &schemapb.DoubleArray{Data: floatData},
+			},
+		}, nil
 	case schemapb.DataType_String, schemapb.DataType_VarChar:
 		strData := make([]string, len(data))
 		for i, v := range data {
@@ -143,10 +203,10 @@ func (r *StructFieldReader) toScalarField(data []interface{}) *schemapb.ScalarFi
 			Data: &schemapb.ScalarField_StringData{
 				StringData: &schemapb.StringArray{Data: strData},
 			},
-		}
+		}, nil
+	default:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type for struct field: %v", r.field.GetElementType()))
 	}
-
-	return nil
 }

 func (r *StructFieldReader) readArrayField(chunked *arrow.Chunked) (any, any, error) {
@@ -208,7 +268,10 @@ func (r *StructFieldReader) readArrayField(chunked *arrow.Chunked) (any, any, er
 		}

 		// Create a single ScalarField for this row
-		scalarField := r.toScalarField(combinedData)
+		scalarField, err
:= r.toScalarField(combinedData) + if err != nil { + return nil, nil, err + } if scalarField != nil { result = append(result, scalarField) } diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 1ef646c52e..700b55b774 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -1354,23 +1354,8 @@ func reconstructStructArrayForCSV(structField *schemapb.StructArrayFieldSchema, return "", err } - // Convert to CSV format: each sub-field value needs to be JSON-encoded - csvArray := make([]map[string]string, len(structArray)) - for i, elem := range structArray { - csvElem := make(map[string]string) - for key, value := range elem { - // Convert each value to JSON string for CSV - jsonBytes, err := json.Marshal(value) - if err != nil { - return "", err - } - csvElem[key] = string(jsonBytes) - } - csvArray[i] = csvElem - } - // Convert the entire struct array to JSON string - jsonBytes, err := json.Marshal(csvArray) + jsonBytes, err := json.Marshal(structArray) if err != nil { return "", err }
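
The core of the CSV-side fix is the reshaping done by reconstructArrayForStructArray: a struct-array cell is decoded from JSON once, then regrouped into one value slice per sub-field before type and capacity checks. Below is a minimal, self-contained Go sketch of that step; flattenStructArray and the demo in main are illustrative stand-ins, not Milvus APIs (the real code keys sub-fields via typeutil.ConcatStructFieldName and hands each grouped slice to parseStructEntity).

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// flattenStructArray regroups a JSON array of structs into one value slice
// per sub-field, keyed as "structName[subFieldName]".
func flattenStructArray(structName, raw string) (map[string][]any, error) {
	var structs []any
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber() // keep numbers as json.Number, as the real parser does
	if err := dec.Decode(&structs); err != nil {
		return nil, fmt.Errorf("invalid StructArray format: %w", err)
	}
	flat := make(map[string][]any)
	for _, elem := range structs {
		dict, ok := elem.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("expected a JSON object, got %T", elem)
		}
		for key, value := range dict {
			// keep the decoded value as-is; per-field parsing happens later
			name := structName + "[" + key + "]"
			flat[name] = append(flat[name], value)
		}
	}
	return flat, nil
}

func main() {
	raw := `[{"sub_str": "hello1", "sub_float_vector": [0.1, 0.2]},
	         {"sub_str": "hello2", "sub_float_vector": [0.3, 0.4]}]`
	flat, err := flattenStructArray("struct_array", raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(flat["struct_array[sub_str]"])          // [hello1 hello2]
	fmt.Println(flat["struct_array[sub_float_vector]"]) // [[0.1 0.2] [0.3 0.4]]
}

The point of the rework is visible in the output: sub-field values stay as decoded JSON values (json.Number, bool, string, []any) instead of being re-marshaled into JSON strings and re-parsed per sub-field, which is the round-trip the patch removes.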