From eb793531b92b3ed8f4aea7c53fe06424b36d95ce Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Mon, 15 Sep 2025 20:41:59 +0800 Subject: [PATCH] feat: impl StructArray -- support import for CSV/JSON/PARQUET/BINLOG (#44201) Ref https://github.com/milvus-io/milvus/issues/42148 --------- Signed-off-by: SpadeA --- internal/datanode/importv2/util.go | 6 +- internal/storage/payload_writer.go | 28 +- internal/storage/serde_events.go | 3 +- internal/storage/utils.go | 20 + internal/util/importutilv2/binlog/reader.go | 3 +- .../util/importutilv2/binlog/reader_test.go | 60 ++- internal/util/importutilv2/binlog/util.go | 21 +- internal/util/importutilv2/csv/row_parser.go | 210 +++++++++- .../util/importutilv2/csv/row_parser_test.go | 123 ++++++ internal/util/importutilv2/json/row_parser.go | 147 ++++++- .../util/importutilv2/json/row_parser_test.go | 52 +++ .../util/importutilv2/parquet/field_reader.go | 75 ++++ internal/util/importutilv2/parquet/reader.go | 4 +- .../util/importutilv2/parquet/reader_test.go | 114 ++++++ internal/util/importutilv2/parquet/util.go | 32 +- internal/util/testutil/test_util.go | 373 +++++++++++++++++- pkg/util/parameterutil/get_max_len.go | 4 +- pkg/util/typeutil/schema.go | 28 +- tests/integration/import/binlog_test.go | 15 +- tests/integration/import/vector_array_test.go | 306 ++++++++++++++ 20 files changed, 1538 insertions(+), 86 deletions(-) create mode 100644 tests/integration/import/vector_array_test.go diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index f4b3c44709..b2f35d4a0c 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -143,7 +143,8 @@ func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) if len(data.Data) == 0 { return nil } - idToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { + allFields := typeutil.GetAllFieldSchemas(schema) + idToField := lo.KeyBy(allFields, func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) @@ -238,7 +239,8 @@ func IsFillableField(field *schemapb.FieldSchema) bool { } func AppendNullableDefaultFieldsData(schema *schemapb.CollectionSchema, data *storage.InsertData, rowNum int) error { - for _, field := range schema.GetFields() { + allFields := typeutil.GetAllFieldSchemas(schema) + for _, field := range allFields { if !IsFillableField(field) { continue } diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index e5a8b49193..7082f6f93e 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -114,12 +114,12 @@ func NewPayloadWriter(colType schemapb.DataType, options ...PayloadWriterOptions if w.elementType == nil { return nil, merr.WrapErrParameterInvalidMsg("ArrayOfVector requires elementType, use WithElementType option") } - arrowType, err := VectorArrayToArrowType(*w.elementType) + elemType, err := VectorArrayToArrowType(*w.elementType) if err != nil { return nil, err } - w.arrowType = arrowType - w.builder = array.NewListBuilder(memory.DefaultAllocator, arrowType.(*arrow.ListType).Elem()) + w.arrowType = arrow.ListOf(elemType) + w.builder = array.NewListBuilder(memory.DefaultAllocator, elemType) } else { w.arrowType = MilvusDataTypeToArrowType(colType, *w.dim.Value) w.builder = array.NewBuilder(memory.DefaultAllocator, w.arrowType) @@ -899,28 +899,6 @@ func MilvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy } } -// VectorArrayToArrowType 
converts VectorArray type with elementType to Arrow ListArray type -func VectorArrayToArrowType(elementType schemapb.DataType) (arrow.DataType, error) { - var childType arrow.DataType - - switch elementType { - case schemapb.DataType_FloatVector: - childType = arrow.PrimitiveTypes.Float32 - case schemapb.DataType_BinaryVector: - return nil, merr.WrapErrParameterInvalidMsg("BinaryVector in VectorArray not implemented yet") - case schemapb.DataType_Float16Vector: - return nil, merr.WrapErrParameterInvalidMsg("Float16Vector in VectorArray not implemented yet") - case schemapb.DataType_BFloat16Vector: - return nil, merr.WrapErrParameterInvalidMsg("BFloat16Vector in VectorArray not implemented yet") - case schemapb.DataType_Int8Vector: - return nil, merr.WrapErrParameterInvalidMsg("Int8Vector in VectorArray not implemented yet") - default: - return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported element type in VectorArray: %s", elementType.String())) - } - - return arrow.ListOf(childType), nil -} - // AddVectorArrayFieldDataToPayload adds VectorArrayFieldData to payload using Arrow ListArray func (w *NativePayloadWriter) AddVectorArrayFieldDataToPayload(data *VectorArrayFieldData) error { if w.finished { diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index 78b58a0071..511fa639cf 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -125,8 +125,8 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) { } recs := make([]arrow.Array, fieldNum) - idx := 0 appendFieldRecord := func(f *schemapb.FieldSchema) error { + idx := crr.index[f.FieldID] if crr.rrs[idx] != nil { if ok := crr.rrs[idx].Next(); !ok { return io.EOF @@ -143,7 +143,6 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) { } recs[idx] = arr } - idx++ return nil } diff --git a/internal/storage/utils.go b/internal/storage/utils.go index 3c20e743be..7a9b474019 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -1641,3 +1641,23 @@ func SortFieldBinlogs(fieldBinlogs map[int64]*datapb.FieldBinlog) []*datapb.Fiel } return binlogs } + +// VectorArrayToArrowType converts VectorArray element type to the corresponding Arrow type +// Note: This returns the element type (e.g., float32), not a list type +// The caller is responsible for wrapping it in a list if needed +func VectorArrayToArrowType(elementType schemapb.DataType) (arrow.DataType, error) { + switch elementType { + case schemapb.DataType_FloatVector: + return arrow.PrimitiveTypes.Float32, nil + case schemapb.DataType_BinaryVector: + return nil, merr.WrapErrParameterInvalidMsg("BinaryVector in VectorArray not implemented yet") + case schemapb.DataType_Float16Vector: + return nil, merr.WrapErrParameterInvalidMsg("Float16Vector in VectorArray not implemented yet") + case schemapb.DataType_BFloat16Vector: + return nil, merr.WrapErrParameterInvalidMsg("BFloat16Vector in VectorArray not implemented yet") + case schemapb.DataType_Int8Vector: + return nil, merr.WrapErrParameterInvalidMsg("Int8Vector in VectorArray not implemented yet") + default: + return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported element type in VectorArray: %s", elementType.String())) + } +} diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 6450f8d6ff..35990344c6 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -211,8 +211,9 @@ func (r *reader) Read() 
(*storage.InsertData, error) { if err != nil { return nil, err } + allFields := typeutil.GetAllFieldSchemas(r.schema) // convert record to fieldData - for _, field := range r.schema.Fields { + for _, field := range allFields { fieldData := insertData.Data[field.GetFieldID()] if fieldData == nil { fieldData, err = storage.NewFieldData(field.GetDataType(), field, 1024) diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go index 937d054658..6c0701c277 100644 --- a/internal/util/importutilv2/binlog/reader_test.go +++ b/internal/util/importutilv2/binlog/reader_test.go @@ -100,7 +100,7 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie dim = 1 } - evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable())) + evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable()), storage.WithElementType(field.GetElementType())) assert.NoError(t, err) evt.SetEventTimestamp(1, math.MaxInt64) @@ -190,6 +190,17 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie vectors := data.(*storage.Int8VectorFieldData).Data err = evt.AddInt8VectorToPayload(vectors, int(dim)) assert.NoError(t, err) + case schemapb.DataType_ArrayOfVector: + elementType := field.GetElementType() + switch elementType { + case schemapb.DataType_FloatVector: + vectors := data.(*storage.VectorArrayFieldData) + err = evt.AddVectorArrayFieldDataToPayload(vectors) + assert.NoError(t, err) + default: + assert.True(t, false) + return nil + } default: assert.True(t, false) return nil @@ -254,7 +265,8 @@ func (suite *ReaderSuite) createMockChunk(schema *schemapb.CollectionSchema, ins paths = make([]string, 0) bytes = make([][]byte, 0) ) - for _, field := range schema.Fields { + allFields := typeutil.GetAllFieldSchemas(schema) + for _, field := range allFields { fieldID := field.GetFieldID() logs, ok := insertBinlogs[fieldID] if ok && len(logs) > 0 { @@ -327,8 +339,48 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Nullable: nullable, }, }, + StructArrayFields: []*schemapb.StructArrayFieldSchema{ + { + FieldID: 103, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 104, + Name: "struct_str", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "256", + }, + { + Key: common.MaxCapacityKey, + Value: "20", + }, + }, + }, + { + FieldID: 105, + Name: "struct_float_vector", + DataType: schemapb.DataType_ArrayOfVector, + ElementType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxCapacityKey, + Value: "20", + }, + { + Key: common.DimKey, + Value: "8", + }, + }, + }, + }, + }, + }, } - insertBinlogs := genBinlogPaths(lo.Map(schema.Fields, func(fieldSchema *schemapb.FieldSchema, _ int) int64 { + allFields := typeutil.GetAllFieldSchemas(schema) + insertBinlogs := genBinlogPaths(lo.Map(allFields, func(fieldSchema *schemapb.FieldSchema, _ int) int64 { return fieldSchema.GetFieldID() })) cm, originalInsertData := suite.createMockChunk(schema, insertBinlogs, true) @@ -383,6 +435,8 @@ OUTER: } else { suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData())) } + } else if fieldDataType == schemapb.DataType_ArrayOfVector { + 
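+					// ArrayOfVector rows travel as schemapb.VectorField values whose vectors
+					// are flattened back-to-back into one float32 slice, so comparing the
+					// flattened data checks every vector of the row at once.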
suite.True(slices.Equal(expect.(*schemapb.VectorField).GetFloatVector().GetData(), actual.(*schemapb.VectorField).GetFloatVector().GetData())) } else { suite.Equal(expect, actual) } diff --git a/internal/util/importutilv2/binlog/util.go b/internal/util/importutilv2/binlog/util.go index 15bdac2153..5aba926b58 100644 --- a/internal/util/importutilv2/binlog/util.go +++ b/internal/util/importutilv2/binlog/util.go @@ -140,8 +140,9 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs } } + allFields := typeutil.GetAllFieldSchemas(schema) if storageVersion == storage.StorageV2 { - for _, field := range schema.GetFields() { + for _, field := range allFields { if typeutil.IsVectorType(field.GetDataType()) { if _, ok := insertLogs[field.GetFieldID()]; !ok { // vector field must be provided @@ -174,8 +175,9 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs // Here we copy the schema for reading part of collection's data. The storage.NewBinlogRecordReader() requires // a schema and the schema must be consistent with the binglog files([]*datapb.FieldBinlog) cloneSchema := typeutil.Clone(schema) - cloneSchema.Fields = []*schemapb.FieldSchema{} // the Fields will be reset according to the validInsertLogs - cloneSchema.EnableDynamicField = false // this flag will be reset + cloneSchema.Fields = []*schemapb.FieldSchema{} // the Fields will be reset according to the validInsertLogs + cloneSchema.StructArrayFields = []*schemapb.StructArrayFieldSchema{} // the StructArrayFields will be reset according to the validInsertLogs + cloneSchema.EnableDynamicField = false // this flag will be reset // this loop will reset the cloneSchema.Fields and return validInsertLogs validInsertLogs := make(map[int64][]string) @@ -205,5 +207,18 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs } } + for _, structArrayField := range schema.GetStructArrayFields() { + for _, field := range structArrayField.GetFields() { + id := field.GetFieldID() + logs, ok := insertLogs[id] + if !ok { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("no binlog for struct field:%s", field.GetName())) + } + + validInsertLogs[id] = logs + } + cloneSchema.StructArrayFields = append(cloneSchema.StructArrayFields, structArrayField) + } + return validInsertLogs, cloneSchema, nil } diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 364c9b3341..b5d871fc1d 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -36,12 +36,14 @@ type RowParser interface { Parse(raw []string) (Row, error) } type rowParser struct { - nullkey string - header []string - name2Dim map[string]int - name2Field map[string]*schemapb.FieldSchema - pkField *schemapb.FieldSchema - dynamicField *schemapb.FieldSchema + nullkey string + header []string + name2Dim map[string]int + name2Field map[string]*schemapb.FieldSchema + structArrays map[string]map[string]*schemapb.FieldSchema + structArraySubFields map[string]interface{} + pkField *schemapb.FieldSchema + dynamicField *schemapb.FieldSchema } func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) { @@ -58,8 +60,10 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st } } + allFields := typeutil.GetAllFieldSchemas(schema) + name2Field := lo.SliceToMap( - lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool { + 
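+		// allFields comes from typeutil.GetAllFieldSchemas, so struct sub-fields are
+		// indexed here by name alongside the top-level fields.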
lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool { return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName() }), func(field *schemapb.FieldSchema) (string, *schemapb.FieldSchema) { @@ -67,6 +71,17 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st }, ) + structArrays := make(map[string]map[string]*schemapb.FieldSchema) + + structArraySubFields := make(map[string]interface{}) + for _, sa := range schema.GetStructArrayFields() { + structArrays[sa.GetName()] = make(map[string]*schemapb.FieldSchema) + for _, subField := range sa.GetFields() { + structArraySubFields[subField.GetName()] = nil + structArrays[sa.GetName()][subField.GetName()] = subField + } + } + name2Dim := make(map[string]int) for name, field := range name2Field { if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) { @@ -103,7 +118,8 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st for fieldName, field := range name2Field { _, existInHeader := headerMap[fieldName] - if field.GetNullable() || field.GetDefaultValue() != nil { + _, subField := structArraySubFields[fieldName] + if field.GetNullable() || field.GetDefaultValue() != nil || subField { // nullable/defaultValue fields, provide or not provide both ok } else if !existInHeader { // not nullable/defaultValue/autoPK/functionOutput fields, must provide @@ -113,15 +129,76 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st } return &rowParser{ - nullkey: nullkey, - name2Dim: name2Dim, - header: header, - name2Field: name2Field, - pkField: pkField, - dynamicField: dynamicField, + nullkey: nullkey, + name2Dim: name2Dim, + header: header, + name2Field: name2Field, + structArrays: structArrays, + structArraySubFields: structArraySubFields, + pkField: pkField, + dynamicField: dynamicField, }, nil } +// reconstructArrayForStructArray reconstructs the StructArray data format from CSV. +// StructArray are passed in with format for one row: StructArray: [element 1, element 2, ...] +// where each element contains one value of all sub-fields in StructArrayField. +// So we need to reconstruct it to be handled by handleField. 
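+// In the CSV path every sub-field value inside an element arrives as a JSON-encoded
+// string; reconstructArrayForStructArray re-parses it via parseEntity (with the
+// element type) and then re-marshals the per-sub-field arrays to JSON strings.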
+// +// For example, let StructArrayFieldSchema { sub-field1: array of int32, sub-field2: array of float vector } +// When we have one row in CSV (as JSON string): +// +// "[{\"sub-field1\": 1, \"sub-field2\": [1.0, 2.0]}, {\"sub-field1\": 2, \"sub-field2\": [3.0, 4.0]}]" +// +// we reconstruct it to be handled by handleField as: +// +// {"sub-field1": "[1, 2]", "sub-field2": "[[1.0, 2.0], [3.0, 4.0]]"} +func (r *rowParser) reconstructArrayForStructArray(subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string]string, error) { + // Parse the JSON array string + var rows []any + dec := json.NewDecoder(strings.NewReader(raw)) + dec.UseNumber() + if err := dec.Decode(&rows); err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid StructArray format in CSV, failed to parse JSON: %v", err)) + } + + buf := make(map[string][]any) + for _, elem := range rows { + row, ok := elem.(map[string]any) + if !ok { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem)) + } + for key, value := range row { + field, ok := subFieldsMap[key] + if !ok { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", key)) + } + strVal, ok := value.(string) + if !ok { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid value type for field %s, expect string but got %T", key, value)) + } + + data, err := r.parseEntity(field, strVal, true) + if err != nil { + return nil, err + } + buf[key] = append(buf[key], data) + } + } + + // Convert aggregated arrays to JSON strings + out := make(map[string]string, len(buf)) + for k, v := range buf { + // Marshal the array as JSON string so it can be parsed by parseEntity + jsonBytes, err := json.Marshal(v) + if err != nil { + return nil, err + } + out[k] = string(jsonBytes) + } + return out, nil +} + func (r *rowParser) Parse(strArr []string) (Row, error) { if len(strArr) != len(r.header) { return nil, merr.WrapErrImportFailed("the number of fields in the row is not equal to the header") @@ -131,8 +208,25 @@ func (r *rowParser) Parse(strArr []string) (Row, error) { dynamicValues := make(map[string]string) // read values from csv file for index, value := range strArr { - if field, ok := r.name2Field[r.header[index]]; ok { - data, err := r.parseEntity(field, value) + if subFieldsMap, ok := r.structArrays[r.header[index]]; ok { + values, err := r.reconstructArrayForStructArray(subFieldsMap, value) + if err != nil { + return nil, err + } + + for subKey, subValue := range values { + field, ok := r.name2Field[subKey] + if !ok { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", subKey)) + } + data, err := r.parseEntity(field, subValue, false) + if err != nil { + return nil, err + } + row[field.GetFieldID()] = data + } + } else if field, ok := r.name2Field[r.header[index]]; ok { + data, err := r.parseEntity(field, value, false) if err != nil { return nil, err } @@ -160,7 +254,8 @@ func (r *rowParser) Parse(strArr []string) (Row, error) { row[fieldID] = data } } - if _, ok := row[fieldID]; !ok { + _, subField := r.structArraySubFields[fieldName] + if _, ok := row[fieldID]; !ok && !subField { return nil, merr.WrapErrImportFailed(fmt.Sprintf("value of field '%s' is missed", fieldName)) } } @@ -219,7 +314,7 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row) return nil } -func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) { +func (r *rowParser) parseEntity(field 
*schemapb.FieldSchema, obj string, useElementType bool) (any, error) { if field.GetDefaultValue() != nil && obj == r.nullkey { return nullutil.GetDefaultValue(field) } @@ -229,7 +324,12 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e return nil, nil } - switch field.GetDataType() { + dataType := field.GetDataType() + if useElementType { + dataType = field.GetElementType() + } + + switch dataType { case schemapb.DataType_Bool: b, err := strconv.ParseBool(obj) if err != nil { @@ -360,6 +460,26 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e return nil, r.wrapDimError(len(vec), field) } return vec, nil + case schemapb.DataType_ArrayOfVector: + var vec []interface{} + desc := json.NewDecoder(strings.NewReader(obj)) + desc.UseNumber() + err := desc.Decode(&vec) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + maxCapacity, err := parameterutil.GetMaxCapacity(field) + if err != nil { + return nil, err + } + if err = common.CheckArrayCapacity(len(vec), maxCapacity, field); err != nil { + return nil, err + } + vectorFieldData, err := r.arrayOfVectorToFieldData(vec, field) + if err != nil { + return nil, err + } + return vectorFieldData, nil case schemapb.DataType_Array: var vec []interface{} desc := json.NewDecoder(strings.NewReader(obj)) @@ -542,6 +662,58 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, field *schemapb.FieldSch } } +func (r *rowParser) arrayOfVectorToFieldData(vectors []any, field *schemapb.FieldSchema) (*schemapb.VectorField, error) { + elementType := field.GetElementType() + switch elementType { + case schemapb.DataType_FloatVector: + dim, err := typeutil.GetDim(field) + if err != nil { + return nil, err + } + values := make([]float32, 0, len(vectors)*int(dim)) + + for _, vectorAny := range vectors { + var vector []float32 + v, ok := vectorAny.([]interface{}) + if !ok { + return nil, r.wrapTypeError(vectorAny, field) + } + vector = make([]float32, len(v)) + for i, elem := range v { + value, ok := elem.(json.Number) + if !ok { + return nil, r.wrapArrayValueTypeError(elem, elementType) + } + num, err := strconv.ParseFloat(value.String(), 32) + if err != nil { + return nil, fmt.Errorf("failed to parse float: %w", err) + } + vector[i] = float32(num) + } + + if len(vector) != int(dim) { + return nil, r.wrapDimError(len(vector), field) + } + values = append(values, vector...) 
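+			// vectors of one row are concatenated; the returned VectorField keeps Dim,
+			// so consumers can split the flat slice back into len(values)/dim vectors.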
+ } + + return &schemapb.VectorField{ + Dim: dim, + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: values, + }, + }, + }, nil + + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_BinaryVector, + schemapb.DataType_Int8Vector, schemapb.DataType_SparseFloatVector: + return nil, merr.WrapErrImportFailed(fmt.Sprintf("not implemented element type for CSV: %s", elementType.String())) + default: + return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type: %s", elementType.String())) + } +} + func (r *rowParser) wrapTypeError(v any, field *schemapb.FieldSchema) error { return merr.WrapErrImportFailed( fmt.Sprintf("expected type '%s' for field '%s', got type '%T' with value '%v'", diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index eb4d2a9386..10bba40b5d 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -66,6 +66,45 @@ func (suite *RowParserSuite) setSchema(autoID bool, hasNullable bool, hasDynamic } func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { + structArray := &schemapb.StructArrayFieldSchema{ + FieldID: 110, + Name: "struct_array", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 111, + Name: "sub_float_vector", + DataType: schemapb.DataType_ArrayOfVector, + ElementType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "2", + }, + { + Key: "max_capacity", + Value: "4", + }, + }, + }, + { + FieldID: 112, + Name: "sub_str", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "max_capacity", + Value: "4", + }, + { + Key: "max_length", + Value: "8", + }, + }, + }, + }, + } + schema := &schemapb.CollectionSchema{ EnableDynamicField: suite.hasDynamic, Fields: []*schemapb.FieldSchema{ @@ -294,6 +333,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { Nullable: suite.hasNullable, }, }, + StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray}, } if suite.hasDynamic { @@ -338,6 +378,8 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal string rawContent["json"] = "{\"a\": 1}" rawContent["x"] = "2" rawContent["$meta"] = "{\"dynamic\": \"dummy\"}" + rawContent["struct_array"] = "[{\"sub_float_vector\": \"[0.1, 0.2]\", \"sub_str\": \"hello1\"}, " + + "{\"sub_float_vector\": \"[0.3, 0.4]\", \"sub_str\": \"hello2\"}]" rawContent[resetKey] = resetVal // reset a value for _, deleteKey := range deleteKeys { @@ -555,11 +597,89 @@ func (suite *RowParserSuite) runValid(c *testCase) { default: continue } + case schemapb.DataType_ArrayOfVector: + // Handle ArrayOfVector validation + vf, _ := val.(*schemapb.VectorField) + if vf.GetFloatVector() != nil { + // Parse expected vectors from raw string + var expectedVectors [][]float32 + err := json.Unmarshal([]byte(rawVal), &expectedVectors) + suite.NoError(err) + + // Flatten expected vectors + var expectedFlat []float32 + for _, vec := range expectedVectors { + expectedFlat = append(expectedFlat, vec...) 
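+				// mirror the importer's layout: expected vectors are flattened before
+				// comparing against the parsed VectorField payload.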
+ } + + suite.Equal(expectedFlat, vf.GetFloatVector().GetData()) + } default: continue } } + // Validate struct array sub-fields + for _, structArray := range schema.GetStructArrayFields() { + // Check if struct_array was provided in the test data + if structArrayRaw, ok := c.content[structArray.GetName()]; ok { + // Parse the struct array JSON + var structArrayData []map[string]any + dec := json.NewDecoder(strings.NewReader(structArrayRaw)) + dec.UseNumber() + err := dec.Decode(&structArrayData) + suite.NoError(err) + + // For each sub-field in the struct array + for _, subField := range structArray.GetFields() { + val, ok := row[subField.GetFieldID()] + suite.True(ok, "Sub-field %s should exist in row", subField.GetName()) + + // Validate based on sub-field type + switch subField.GetDataType() { + case schemapb.DataType_ArrayOfVector: + vf, ok := val.(*schemapb.VectorField) + suite.True(ok, "Sub-field %s should be a VectorField", subField.GetName()) + + // Extract expected vectors from struct array data + var expectedVectors [][]float32 + for _, elem := range structArrayData { + if vecStr, ok := elem[subField.GetName()].(string); ok { + var vec []float32 + err := json.Unmarshal([]byte(vecStr), &vec) + suite.NoError(err) + expectedVectors = append(expectedVectors, vec) + } + } + + // Flatten and compare + var expectedFlat []float32 + for _, vec := range expectedVectors { + expectedFlat = append(expectedFlat, vec...) + } + suite.Equal(expectedFlat, vf.GetFloatVector().GetData()) + + case schemapb.DataType_Array: + sf, ok := val.(*schemapb.ScalarField) + suite.True(ok, "Sub-field %s should be a ScalarField", subField.GetName()) + + // Extract expected values from struct array data + var expectedValues []string + for _, elem := range structArrayData { + if v, ok := elem[subField.GetName()].(string); ok { + expectedValues = append(expectedValues, v) + } + } + + // Compare based on element type + if subField.GetElementType() == schemapb.DataType_VarChar { + suite.Equal(expectedValues, sf.GetStringData().GetData()) + } + } + } + } + } + if suite.hasDynamic { val, ok := row[9999] suite.True(ok) @@ -597,6 +717,9 @@ func (suite *RowParserSuite) TestValid() { suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", suite.nullKey)}) suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", suite.nullKey)}) + // Test struct array parsing + suite.runValid(&testCase{name: "A/N/D struct array valid", content: suite.genAllTypesRowData("x", "2")}) + suite.nullKey = "ABCDEF" suite.runValid(&testCase{name: "A/N/D null key 1", content: suite.genAllTypesRowData("int64", suite.nullKey)}) suite.runValid(&testCase{name: "A/N/D null key 2", content: suite.genAllTypesRowData("double", suite.nullKey)}) diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 53640a1ceb..011d7d974d 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -42,10 +42,14 @@ type rowParser struct { pkField *schemapb.FieldSchema dynamicField *schemapb.FieldSchema functionOutputFields map[string]int64 + + structArrays map[string]interface{} } func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) { - id2Field := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { + allFields := typeutil.GetAllFieldSchemas(schema) + + id2Field := lo.KeyBy(allFields, func(field 
*schemapb.FieldSchema) int64 {
 		return field.GetFieldID()
 	})
 
@@ -71,7 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 	dynamicField := typeutil.GetDynamicField(schema)
 
 	name2FieldID := lo.SliceToMap(
-		lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
+		lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool {
 			return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName()
 		}),
 		func(field *schemapb.FieldSchema) (string, int64) {
@@ -79,6 +83,13 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 		},
 	)
 
+	structArrays := lo.SliceToMap(
+		schema.GetStructArrayFields(),
+		func(sa *schemapb.StructArrayFieldSchema) (string, interface{}) {
+			return sa.GetName(), nil
+		},
+	)
+
 	return &rowParser{
 		id2Dim:               id2Dim,
 		id2Field:             id2Field,
@@ -86,6 +97,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 		pkField:              pkField,
 		dynamicField:         dynamicField,
 		functionOutputFields: functionOutputFields,
+		structArrays:         structArrays,
 	}, nil
 }
 
@@ -109,6 +121,42 @@ func (r *rowParser) wrapArrayValueTypeError(v any, eleType schemapb.DataType) er
 		eleType.String(), v, v))
 }
 
+// A StructArray is passed in with the following format for one row: StructArray: [element 1, element 2, ...]
+// where each element contains one value for each sub-field of the StructArrayField.
+// So we need to reconstruct it to be handled by handleField.
+//
+// For example, given StructArrayFieldSchema { sub-field1: array of int32, sub-field2: array of float vector }
+// and one row:
+//
+//	[{"sub-field1": 1, "sub-field2": [1.0, 2.0]}, {"sub-field1": 2, "sub-field2": [3.0, 4.0]}],
+//
+// we reconstruct it to be handled by handleField as:
+//
+//	{"sub-field1": [1, 2], "sub-field2": [[1.0, 2.0], [3.0, 4.0]]}
+func reconstructArrayForStructArray(raw any) (map[string]any, error) {
+	rows, ok := raw.([]any)
+	if !ok {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid StructArray format in JSON, expect an array of elements, but got type %T", raw))
+	}
+
+	buf := make(map[string][]any)
+	for _, elem := range rows {
+		row, ok := elem.(map[string]any)
+		if !ok {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem))
+		}
+		for key, value := range row {
+			buf[key] = append(buf[key], value)
+		}
+	}
+
+	// convert map[string][]any into map[string]any for handleField
+	out := make(map[string]any, len(buf))
+	for k, v := range buf {
+		out[k] = v
+	}
+	return out, nil
+}
+
 func (r *rowParser) Parse(raw any) (Row, error) {
 	stringMap, ok := raw.(map[string]any)
 	if !ok {
@@ -122,16 +170,12 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 	row := make(Row)
 	dynamicValues := make(map[string]any)
 
-	// read values from json file
-	for key, value := range stringMap {
-		if _, ok := r.functionOutputFields[key]; ok {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is output by function, no need to provide", key))
-		}
+	handleField := func(key string, value any) error {
 		if fieldID, ok := r.name2FieldID[key]; ok {
 			data, err := r.parseEntity(fieldID, value)
 			if err != nil {
-				return nil, err
+				return err
 			}
 			row[fieldID] = data
 		} else if r.dynamicField != nil {
@@ -139,7 +183,33 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 			dynamicValues[key] = value
 		} else {
 			// from v2.6, we don't intend to return error for redundant fields, just skip it
-			continue
+			return nil
 		}
+
+		return nil
+	}
+
+	// read values from json file
+	for key, value := range stringMap {
+		if _, ok 
:= r.functionOutputFields[key]; ok {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is output by function, no need to provide", key))
+		}
+
+		if _, ok := r.structArrays[key]; ok {
+			values, err := reconstructArrayForStructArray(value)
+			if err != nil {
+				return nil, err
+			}
+
+			for subKey, subValue := range values {
+				if err := handleField(subKey, subValue); err != nil {
+					return nil, err
+				}
+			}
+		} else {
+			if err := handleField(key, value); err != nil {
+				return nil, err
+			}
+		}
 	}
 
@@ -474,6 +544,24 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
 			return nil, err
 		}
 		return scalarFieldData, nil
+	case schemapb.DataType_ArrayOfVector:
+		arr, ok := obj.([]interface{})
+		if !ok {
+			return nil, r.wrapTypeError(obj, fieldID)
+		}
+		maxCapacity, err := parameterutil.GetMaxCapacity(field)
+		if err != nil {
+			return nil, err
+		}
+		if err = common.CheckArrayCapacity(len(arr), maxCapacity, field); err != nil {
+			return nil, err
+		}
+		vectorFieldData, err := r.arrayOfVectorToFieldData(arr, field)
+		if err != nil {
+			return nil, err
+		}
+		return vectorFieldData, nil
+
 	default:
 		return nil, merr.WrapErrImportFailed(
 			fmt.Sprintf("parse json failed, unsupported data type: %s",
@@ -634,3 +722,44 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, field *schemapb.FieldSch
 			fmt.Sprintf("parse json failed, unsupported array data type '%s'", eleType.String()))
 	}
 }
+
+func (r *rowParser) arrayOfVectorToFieldData(vectors []any, field *schemapb.FieldSchema) (*schemapb.VectorField, error) {
+	elementType := field.GetElementType()
+	fieldID := field.GetFieldID()
+	switch elementType {
+	case schemapb.DataType_FloatVector:
+		dim := r.id2Dim[fieldID]
+		values := make([]float32, 0, len(vectors)*dim)
+		for _, vectorAny := range vectors {
+			vector, ok := vectorAny.([]any)
+			if !ok {
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("expected slice as vector, but got %T", vectorAny))
+			}
+			if dim > 0 && len(vector) != dim {
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("vector dimension mismatch in VectorArray: expected %d, got %d", dim, len(vector)))
+			}
+			for _, v := range vector {
+				value, ok := v.(json.Number)
+				if !ok {
+					return nil, r.wrapTypeError(v, fieldID)
+				}
+				num, err := strconv.ParseFloat(value.String(), 32)
+				if err != nil {
+					return nil, err
+				}
+				values = append(values, float32(num))
+			}
+		}
+		return &schemapb.VectorField{
+			Dim: int64(dim),
+			Data: &schemapb.VectorField_FloatVector{
+				FloatVector: &schemapb.FloatArray{
+					Data: values,
+				},
+			},
+		}, nil
+
+	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_BinaryVector,
+		schemapb.DataType_Int8Vector, schemapb.DataType_SparseFloatVector:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("not implemented element type: %s", elementType.String()))
+	default:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type: %s", elementType.String()))
+	}
+}
diff --git a/internal/util/importutilv2/json/row_parser_test.go b/internal/util/importutilv2/json/row_parser_test.go
index d070746136..1a0526575f 100644
--- a/internal/util/importutilv2/json/row_parser_test.go
+++ b/internal/util/importutilv2/json/row_parser_test.go
@@ -57,6 +57,45 @@ func (suite *RowParserSuite) SetupTest() {
 }
 
 func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
+	structArray := &schemapb.StructArrayFieldSchema{
+		FieldID: 110,
+		Name:    "struct_array",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     111,
+				Name:        "sub_float_vector",
+				DataType:    schemapb.DataType_ArrayOfVector,
+				ElementType: schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.DimKey,
+						Value: "2",
+					},
+					{
+						Key: 
"max_capacity", + Value: "4", + }, + }, + }, + { + FieldID: 112, + Name: "sub_str", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "max_capacity", + Value: "4", + }, + { + Key: "max_length", + Value: "8", + }, + }, + }, + }, + } + schema := &schemapb.CollectionSchema{ EnableDynamicField: suite.hasDynamic, Fields: []*schemapb.FieldSchema{ @@ -285,6 +324,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema { Nullable: suite.hasNullable, }, }, + StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray}, } if suite.hasDynamic { @@ -329,6 +369,18 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal any, d rawContent["json"] = map[string]any{"a": 1} rawContent["x"] = 6 rawContent["$meta"] = map[string]any{"dynamic": "dummy"} + rawContent["struct_array"] = []any{ + // struct array element 1 + map[string]any{ + "sub_float_vector": []float32{0.1, 0.2}, + "sub_str": "hello1", + }, + // struct array element 2 + map[string]any{ + "sub_float_vector": []float32{0.3, 0.4}, + "sub_str": "hello2", + }, + } rawContent[resetKey] = resetVal // reset a value for _, deleteKey := range deleteKeys { diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index fa13f3675e..b2452f07b0 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -206,6 +206,12 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } data, err := ReadArrayData(c, count) return data, nil, err + case schemapb.DataType_ArrayOfVector: + if c.field.GetNullable() { + return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } + data, err := ReadVectorArrayData(c, count) + return data, nil, err default: return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'", c.field.GetDataType().String(), c.field.GetName())) @@ -1768,3 +1774,72 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) { elementType.String(), pcr.field.GetName())) } } + +func ReadVectorArrayData(pcr *FieldReader, count int64) (any, error) { + data := make([]*schemapb.VectorField, 0, count) + maxCapacity, err := parameterutil.GetMaxCapacity(pcr.field) + if err != nil { + return nil, err + } + + dim, err := typeutil.GetDim(pcr.field) + if err != nil { + return nil, err + } + + chunked, err := pcr.columnReader.NextBatch(count) + if err != nil { + return nil, err + } + + if chunked == nil { + return nil, nil + } + + elementType := pcr.field.GetElementType() + switch elementType { + case schemapb.DataType_FloatVector: + for _, chunk := range chunked.Chunks() { + if chunk.NullN() > 0 { + return nil, WrapNullRowErr(pcr.field) + } + listReader, ok := chunk.(*array.List) + if !ok { + return nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + listFloat32Reader, ok := listReader.ListValues().(*array.Float32) + if !ok { + return nil, WrapTypeErr(pcr.field, chunk.DataType().Name()) + } + offsets := listReader.Offsets() + for i := 1; i < len(offsets); i++ { + start, end := offsets[i-1], offsets[i] + floatCount := end - start + if floatCount%int32(dim) != 0 { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("vectors in VectorArray should be aligned with dim: %d", dim)) + } + + arrLength := floatCount / int32(dim) + if err = common.CheckArrayCapacity(int(arrLength), maxCapacity, pcr.field); err != nil { + return 
nil, err + } + + arrData := make([]float32, floatCount) + copy(arrData, listFloat32Reader.Float32Values()[start:end]) + data = append(data, &schemapb.VectorField{ + Dim: dim, + Data: &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: arrData, + }, + }, + }) + } + } + default: + return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for vector field '%s'", + elementType.String(), pcr.field.GetName())) + } + + return data, nil +} diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index ed07656e89..b4eadbdcac 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const totalReadBufferSize = int64(64 * 1024 * 1024) @@ -59,10 +60,11 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co return nil, err } + allFields := typeutil.GetAllFieldSchemas(schema) // Each ColumnReader consumes ReaderProperties.BufferSize memory independently. // Therefore, the bufferSize should be divided by the number of columns // to ensure total memory usage stays within the intended limit. - columnReaderBufferSize := totalReadBufferSize / int64(len(schema.GetFields())) + columnReaderBufferSize := totalReadBufferSize / int64(len(allFields)) r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{ BufferSize: columnReaderBufferSize, diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 1f371e284a..9ebf5b7a9b 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -635,6 +635,120 @@ func TestParquetReader(t *testing.T) { suite.Run(t, new(ReaderSuite)) } +func TestParquetReaderWithStructArray(t *testing.T) { + ctx := context.Background() + + t.Run("test struct array field reading", func(t *testing.T) { + // Create schema with StructArrayField + schema := &schemapb.CollectionSchema{ + Name: "test_struct_array", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "id", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "varchar_field", + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxLengthKey, Value: "100"}, + }, + }, + }, + StructArrayFields: []*schemapb.StructArrayFieldSchema{ + { + FieldID: 200, + Name: "struct_array", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 201, + Name: "int_array", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Int32, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxCapacityKey, Value: "20"}, + }, + }, + { + FieldID: 202, + Name: "float_array", + DataType: schemapb.DataType_Array, + ElementType: schemapb.DataType_Float, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.MaxCapacityKey, Value: "20"}, + }, + }, + { + FieldID: 203, + Name: "vector_array", + DataType: schemapb.DataType_ArrayOfVector, + ElementType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "4"}, + {Key: common.MaxCapacityKey, Value: "20"}, + }, + }, + }, + }, + }, + } + + // Create test data file + filePath := fmt.Sprintf("/tmp/test_struct_array_%d.parquet", 
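+		// the random suffix keeps concurrent test runs from clobbering each other's files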
rand.Int()) + defer os.Remove(filePath) + + numRows := 10 + f, err := os.Create(filePath) + assert.NoError(t, err) + + // Use writeParquet to create test file + insertData, err := writeParquet(f, schema, numRows, 0) + assert.NoError(t, err) + f.Close() + + // Verify the insert data contains struct fields + assert.Contains(t, insertData.Data, int64(201)) // int_array field + assert.Contains(t, insertData.Data, int64(202)) // float_array field + assert.Contains(t, insertData.Data, int64(203)) // vector_array field + + // Now test reading the file using ChunkManager + factory := storage.NewChunkManagerFactory("local", objectstorage.RootPath("/tmp")) + cm, err := factory.NewPersistentStorageChunkManager(ctx) + assert.NoError(t, err) + + reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024) + assert.NoError(t, err) + defer reader.Close() + + // Read data + readData, err := reader.Read() + assert.NoError(t, err) + assert.NotNil(t, readData) + + // Verify the data includes struct fields + assert.Contains(t, readData.Data, int64(201)) // int_array field ID + assert.Contains(t, readData.Data, int64(202)) // float_array field ID + assert.Contains(t, readData.Data, int64(203)) // vector_array field ID + + // Check row count matches + assert.Equal(t, numRows, readData.Data[100].RowNum()) // id field + assert.Equal(t, numRows, readData.Data[101].RowNum()) // varchar_field + assert.Equal(t, numRows, readData.Data[201].RowNum()) // int_array + assert.Equal(t, numRows, readData.Data[202].RowNum()) // float_array + assert.Equal(t, numRows, readData.Data[203].RowNum()) // vector_array + + // Verify data content matches + for fieldID, originalData := range insertData.Data { + readFieldData, ok := readData.Data[fieldID] + assert.True(t, ok, "field %d not found in read data", fieldID) + assert.Equal(t, originalData.RowNum(), readFieldData.RowNum(), "row count mismatch for field %d", fieldID) + } + }) +} + func TestParquetReaderError(t *testing.T) { ctx := context.Background() cm := mocks.NewChunkManager(t) diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index c13be6bdd1..cf0f9a74f5 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -65,7 +66,9 @@ func WrapNullElementErr(field *schemapb.FieldSchema) error { } func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, schema *schemapb.CollectionSchema) (map[int64]*FieldReader, error) { - nameToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) string { + // Create map for all fields including sub-fields from StructArrayFields + allFields := typeutil.GetAllFieldSchemas(schema) + nameToField := lo.KeyBy(allFields, func(field *schemapb.FieldSchema) string { return field.GetName() }) @@ -285,6 +288,19 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da Nullable: true, Metadata: arrow.Metadata{}, }), nil + case schemapb.DataType_ArrayOfVector: + // VectorArrayToArrowType now returns the element type (e.g., float32) + // We wrap it in a single list to get list (flattened) + elemType, err := storage.VectorArrayToArrowType(field.GetElementType()) + if err != nil { + return nil, err + } + return 
arrow.ListOfField(arrow.Field{ + Name: "item", + Type: elemType, + Nullable: true, + Metadata: arrow.Metadata{}, + }), nil default: return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String()) } @@ -293,8 +309,11 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da // This method is used only by import util and related tests. Returned arrow.Schema // doesn't include function output fields. func ConvertToArrowSchemaForUT(schema *schemapb.CollectionSchema, useNullType bool) (*arrow.Schema, error) { - arrFields := make([]arrow.Field, 0) - for _, field := range schema.GetFields() { + // Get all fields including struct sub-fields + allFields := typeutil.GetAllFieldSchemas(schema) + arrFields := make([]arrow.Field, 0, len(allFields)) + + for _, field := range allFields { if typeutil.IsAutoPKField(field) || field.GetIsFunctionOutput() { continue } @@ -321,10 +340,15 @@ func ConvertToArrowSchemaForUT(schema *schemapb.CollectionSchema, useNullType bo } func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) error { + // Get all fields including struct sub-fields + allFields := typeutil.GetAllFieldSchemas(schema) + arrNameToField := lo.KeyBy(arrSchema.Fields(), func(field arrow.Field) string { return field.Name }) - for _, field := range schema.GetFields() { + + // Check all fields (including struct sub-fields which are stored as separate columns) + for _, field := range allFields { // ignore autoPKField and functionOutputField if typeutil.IsAutoPKField(field) || field.GetIsFunctionOutput() { continue diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index c214997e9b..dd0ca9bd55 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -109,7 +109,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . if err != nil { return nil, err } - for _, f := range schema.GetFields() { + allFields := typeutil.GetAllFieldSchemas(schema) + for _, f := range allFields { if f.GetAutoID() || f.IsFunctionOutput { continue } @@ -200,6 +201,18 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent . 
case schemapb.DataType_String, schemapb.DataType_VarChar: insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfStringArray(rows)) } + case schemapb.DataType_ArrayOfVector: + dim, err := typeutil.GetDim(f) + if err != nil { + return nil, err + } + switch f.GetElementType() { + case schemapb.DataType_FloatVector: + insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfFloatVectorArray(rows, int(dim))) + default: + panic(fmt.Sprintf("unimplemented data type: %s", f.GetElementType().String())) + } + default: panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String())) } @@ -401,11 +414,15 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.InsertData, useNullType bool) ([]arrow.Array, error) { mem := memory.NewGoAllocator() - columns := make([]arrow.Array, 0, len(schema.Fields)) - for _, field := range schema.Fields { - if field.GetIsPrimaryKey() && field.GetAutoID() || field.GetIsFunctionOutput() { - continue - } + // Get all fields including struct sub-fields + allFields := typeutil.GetAllFieldSchemas(schema) + // Filter out auto-generated and function output fields + fields := lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool { + return !(field.GetIsPrimaryKey() && field.GetAutoID()) && !field.GetIsFunctionOutput() + }) + + columns := make([]arrow.Array, 0, len(fields)) + for _, field := range fields { fieldID := field.GetFieldID() dataType := field.GetDataType() elementType := field.GetElementType() @@ -705,22 +722,264 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) } + case schemapb.DataType_ArrayOfVector: + data := insertData.Data[fieldID].(*storage.VectorArrayFieldData).Data + rows := len(data) + + switch elementType { + case schemapb.DataType_FloatVector: + // ArrayOfVector is flattened in Arrow - just a list of floats + // where total floats = dim * num_vectors + builder := array.NewListBuilder(mem, &arrow.Float32Type{}) + valueBuilder := builder.ValueBuilder().(*array.Float32Builder) + + for i := 0; i < rows; i++ { + vectorArray := data[i].GetFloatVector() + if vectorArray == nil || len(vectorArray.GetData()) == 0 { + builder.AppendNull() + continue + } + builder.Append(true) + // Append all flattened vector data + valueBuilder.AppendValues(vectorArray.GetData(), nil) + } + columns = append(columns, builder.NewListArray()) + default: + return nil, fmt.Errorf("unsupported element type in VectorArray: %s", elementType.String()) + } } } return columns, nil } +// reconstructStructArrayForJSON reconstructs struct array data for JSON format +// Returns an array of maps where each element represents a struct +func reconstructStructArrayForJSON(structField *schemapb.StructArrayFieldSchema, insertData *storage.InsertData, rowIndex int) ([]map[string]any, error) { + subFields := structField.GetFields() + if len(subFields) == 0 { + return []map[string]any{}, nil + } + + // Determine the array length from the first sub-field's data + var arrayLen int + for _, subField := range subFields { + if fieldData, ok := insertData.Data[subField.GetFieldID()]; ok { + rowData := fieldData.GetRow(rowIndex) + if rowData == nil { + continue + } + + switch subField.GetDataType() { + case schemapb.DataType_Array: + if scalarField, ok := rowData.(*schemapb.ScalarField); ok { + switch subField.GetElementType() { + case 
schemapb.DataType_Bool: + if data := scalarField.GetBoolData(); data != nil { + arrayLen = len(data.GetData()) + } + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + if data := scalarField.GetIntData(); data != nil { + arrayLen = len(data.GetData()) + } + case schemapb.DataType_Int64: + if data := scalarField.GetLongData(); data != nil { + arrayLen = len(data.GetData()) + } + case schemapb.DataType_Float: + if data := scalarField.GetFloatData(); data != nil { + arrayLen = len(data.GetData()) + } + case schemapb.DataType_Double: + if data := scalarField.GetDoubleData(); data != nil { + arrayLen = len(data.GetData()) + } + case schemapb.DataType_String, schemapb.DataType_VarChar: + if data := scalarField.GetStringData(); data != nil { + arrayLen = len(data.GetData()) + } + } + } + case schemapb.DataType_ArrayOfVector: + if vectorField, ok := rowData.(*schemapb.VectorField); ok { + switch subField.GetElementType() { + case schemapb.DataType_FloatVector: + if data := vectorField.GetFloatVector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + arrayLen = len(data.GetData()) / int(dim) + } + } + case schemapb.DataType_BinaryVector: + if data := vectorField.GetBinaryVector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) / 8 + arrayLen = len(data) / bytesPerVector + } + } + case schemapb.DataType_Float16Vector: + if data := vectorField.GetFloat16Vector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) * 2 + arrayLen = len(data) / bytesPerVector + } + } + case schemapb.DataType_BFloat16Vector: + if data := vectorField.GetBfloat16Vector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) * 2 + arrayLen = len(data) / bytesPerVector + } + } + } + } + } + + if arrayLen > 0 { + break + } + } + } + + // Build the struct array + structArray := make([]map[string]any, arrayLen) + for j := 0; j < arrayLen; j++ { + structElem := make(map[string]any) + + for _, subField := range subFields { + if fieldData, ok := insertData.Data[subField.GetFieldID()]; ok { + rowData := fieldData.GetRow(rowIndex) + if rowData == nil { + continue + } + + // Extract the j-th element + switch subField.GetDataType() { + case schemapb.DataType_Array: + if scalarField, ok := rowData.(*schemapb.ScalarField); ok { + switch subField.GetElementType() { + case schemapb.DataType_Bool: + if data := scalarField.GetBoolData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + if data := scalarField.GetIntData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + case schemapb.DataType_Int64: + if data := scalarField.GetLongData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + case schemapb.DataType_Float: + if data := scalarField.GetFloatData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + case schemapb.DataType_Double: + if data := scalarField.GetDoubleData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + case schemapb.DataType_String, schemapb.DataType_VarChar: + if data := scalarField.GetStringData(); data != nil && j < len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[j] + } + } + } 
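+				// Vector sub-fields are stored flattened; the j-th element is recovered
+				// below by slicing at j*dim, scaled by the byte width of the element type.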
+ case schemapb.DataType_ArrayOfVector: + if vectorField, ok := rowData.(*schemapb.VectorField); ok { + switch subField.GetElementType() { + case schemapb.DataType_FloatVector: + if data := vectorField.GetFloatVector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + startIdx := j * int(dim) + endIdx := startIdx + int(dim) + if endIdx <= len(data.GetData()) { + structElem[subField.GetName()] = data.GetData()[startIdx:endIdx] + } + } + } + case schemapb.DataType_BinaryVector: + if data := vectorField.GetBinaryVector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) / 8 + startIdx := j * bytesPerVector + endIdx := startIdx + bytesPerVector + if endIdx <= len(data) { + structElem[subField.GetName()] = data[startIdx:endIdx] + } + } + } + case schemapb.DataType_Float16Vector: + if data := vectorField.GetFloat16Vector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) * 2 + startIdx := j * bytesPerVector + endIdx := startIdx + bytesPerVector + if endIdx <= len(data) { + // Convert Float16 bytes to float32 for JSON representation + structElem[subField.GetName()] = typeutil.Float16BytesToFloat32Vector(data[startIdx:endIdx]) + } + } + } + case schemapb.DataType_BFloat16Vector: + if data := vectorField.GetBfloat16Vector(); data != nil { + dim, _ := typeutil.GetDim(subField) + if dim > 0 { + bytesPerVector := int(dim) * 2 + startIdx := j * bytesPerVector + endIdx := startIdx + bytesPerVector + if endIdx <= len(data) { + // Convert BFloat16 bytes to float32 for JSON representation + structElem[subField.GetName()] = typeutil.BFloat16BytesToFloat32Vector(data[startIdx:endIdx]) + } + } + } + } + } + } + } + } + + structArray[j] = structElem + } + + return structArray, nil +} + func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]map[string]any, error) { fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) + // Track which field IDs belong to struct array sub-fields + structSubFieldIDs := make(map[int64]bool) + for _, structField := range schema.GetStructArrayFields() { + for _, subField := range structField.GetFields() { + structSubFieldIDs[subField.GetFieldID()] = true + } + } + rowNum := insertData.GetRowNum() rows := make([]map[string]any, 0, rowNum) for i := 0; i < rowNum; i++ { data := make(map[int64]interface{}) + + // First process regular fields for fieldID, v := range insertData.Data { - field := fieldIDToField[fieldID] + // Skip if this is a sub-field of a struct array + if structSubFieldIDs[fieldID] { + continue + } + + field, ok := fieldIDToField[fieldID] + if !ok { + continue + } + dataType := field.GetDataType() elemType := field.GetElementType() if field.GetAutoID() || field.IsFunctionOutput { @@ -746,6 +1005,8 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData * case schemapb.DataType_String, schemapb.DataType_VarChar: data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData() } + case schemapb.DataType_ArrayOfVector: + panic("unreachable") case schemapb.DataType_JSON: data[fieldID] = string(v.GetRow(i).([]byte)) case schemapb.DataType_BinaryVector: @@ -768,33 +1029,117 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData * data[fieldID] = v.GetRow(i) } } - row := lo.MapKeys(data, func(_ any, fieldID int64) string { - return fieldIDToField[fieldID].GetName() - }) + + // Now process 
struct array fields - reconstruct the nested structure + for _, structField := range schema.GetStructArrayFields() { + structArray, err := reconstructStructArrayForJSON(structField, insertData, i) + if err != nil { + return nil, err + } + data[structField.GetFieldID()] = structArray + } + + // Convert field IDs to field names + row := make(map[string]any) + for fieldID, value := range data { + if field, ok := fieldIDToField[fieldID]; ok { + row[field.GetName()] = value + } else { + // Check if it's a struct array field + for _, structField := range schema.GetStructArrayFields() { + if structField.GetFieldID() == fieldID { + row[structField.GetName()] = value + break + } + } + } + } rows = append(rows, row) } return rows, nil } +// reconstructStructArrayForCSV reconstructs struct array data for CSV format +// Returns a JSON string where each sub-field value is also a JSON string +func reconstructStructArrayForCSV(structField *schemapb.StructArrayFieldSchema, insertData *storage.InsertData, rowIndex int) (string, error) { + // Use the JSON reconstruction function to get the struct array + structArray, err := reconstructStructArrayForJSON(structField, insertData, rowIndex) + if err != nil { + return "", err + } + + // Convert to CSV format: each sub-field value needs to be JSON-encoded + csvArray := make([]map[string]string, len(structArray)) + for i, elem := range structArray { + csvElem := make(map[string]string) + for key, value := range elem { + // Convert each value to JSON string for CSV + jsonBytes, err := json.Marshal(value) + if err != nil { + return "", err + } + csvElem[key] = string(jsonBytes) + } + csvArray[i] = csvElem + } + + // Convert the entire struct array to JSON string + jsonBytes, err := json.Marshal(csvArray) + if err != nil { + return "", err + } + return string(jsonBytes), nil +} + func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData, nullkey string) ([][]string, error) { rowNum := insertData.GetRowNum() csvData := make([][]string, 0, rowNum+1) + // Build header - regular fields and struct array fields (not sub-fields) header := make([]string, 0) - fields := lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool { - return !field.GetAutoID() && !field.IsFunctionOutput + + // Track which field IDs belong to struct array sub-fields + structSubFieldIDs := make(map[int64]bool) + for _, structField := range schema.GetStructArrayFields() { + for _, subField := range structField.GetFields() { + structSubFieldIDs[subField.GetFieldID()] = true + } + } + + // Add regular fields to header (excluding struct array sub-fields) + allFields := typeutil.GetAllFieldSchemas(schema) + fields := lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool { + return !field.GetAutoID() && !field.IsFunctionOutput && !structSubFieldIDs[field.GetFieldID()] }) nameToFields := lo.KeyBy(fields, func(field *schemapb.FieldSchema) string { name := field.GetName() header = append(header, name) return name }) + + // Build map for struct array fields for quick lookup + structArrayFields := make(map[string]*schemapb.StructArrayFieldSchema) + for _, structField := range schema.GetStructArrayFields() { + structArrayFields[structField.GetName()] = structField + header = append(header, structField.GetName()) + } + csvData = append(csvData, header) for i := 0; i < rowNum; i++ { data := make([]string, 0) for _, name := range header { + if structArrayField, ok := structArrayFields[name]; ok { + structArrayData, err := 
reconstructStructArrayForCSV(structArrayField, insertData, i) + if err != nil { + return nil, err + } + data = append(data, structArrayData) + continue + } + + // Handle regular field field := nameToFields[name] value := insertData.Data[field.FieldID] dataType := field.GetDataType() @@ -877,6 +1222,10 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora return nil, err } data = append(data, string(j)) + case schemapb.DataType_ArrayOfVector: + // ArrayOfVector should not appear as a top-level field + // It can only be a sub-field in struct arrays + panic("ArrayOfVector cannot be a top-level field") default: str := fmt.Sprintf("%v", value.GetRow(i)) data = append(data, str) diff --git a/pkg/util/parameterutil/get_max_len.go b/pkg/util/parameterutil/get_max_len.go index 253e5d4fca..f025e932ce 100644 --- a/pkg/util/parameterutil/get_max_len.go +++ b/pkg/util/parameterutil/get_max_len.go @@ -32,8 +32,8 @@ func GetMaxLength(field *schemapb.FieldSchema) (int64, error) { // GetMaxCapacity get max capacity of array field. Maybe also helpful outside. func GetMaxCapacity(field *schemapb.FieldSchema) (int64, error) { - if !typeutil.IsArrayType(field.GetDataType()) { - msg := fmt.Sprintf("%s is not of array type", field.GetDataType()) + if !typeutil.IsArrayType(field.GetDataType()) && !typeutil.IsVectorArrayType(field.GetDataType()) { + msg := fmt.Sprintf("%s is not of array/vector array type", field.GetDataType()) return 0, merr.WrapErrParameterInvalid(schemapb.DataType_Array, field.GetDataType(), msg) } h := typeutil.NewKvPairs(append(field.GetIndexParams(), field.GetTypeParams()...)) diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index d005fe5e0d..85d37aaf47 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -104,7 +104,8 @@ func EstimateAvgSizePerRecord(schema *schemapb.CollectionSchema) (int, error) { func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLengthPolicy) (int, error) { res := 0 - for _, fs := range schema.Fields { + allFields := GetAllFieldSchemas(schema) + for _, fs := range allFields { switch fs.DataType { case schemapb.DataType_Bool, schemapb.DataType_Int8: res++ @@ -170,6 +171,31 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe break } } + case schemapb.DataType_ArrayOfVector: + dim := 0 + for _, kv := range fs.TypeParams { + if kv.Key == common.DimKey { + v, err := strconv.Atoi(kv.Value) + if err != nil { + return -1, err + } + dim = v + } + } + assumedArrayLen := 10 + // Estimate size based on element type + switch fs.ElementType { + case schemapb.DataType_FloatVector: + res += assumedArrayLen * dim * 4 + case schemapb.DataType_BinaryVector: + res += assumedArrayLen * (dim / 8) + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + res += assumedArrayLen * dim * 2 + case schemapb.DataType_Int8Vector: + res += assumedArrayLen * dim + default: + return 0, fmt.Errorf("unsupported element type in VectorArray: %s", fs.ElementType.String()) + } } } return res, nil diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go index 1d421f145b..58da7d2fcc 100644 --- a/tests/integration/import/binlog_test.go +++ b/tests/integration/import/binlog_test.go @@ -63,7 +63,7 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) * collectionName := "TestBinlogImport_A_" + funcutil.GenRandomStr() - schema := integration.ConstructSchema(collectionName, dim, true) + schema := 
integration.ConstructSchemaOfVecDataTypeWithStruct(collectionName, dim, true)
 	marshaledSchema, err := proto.Marshal(schema)
 	s.NoError(err)
 
@@ -96,6 +96,16 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
 	s.NoError(merr.CheckRPCCall(createIndexStatus, err))
 	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
 
+	createIndexResult, err := c.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      integration.StructSubFloatVecField,
+		IndexName:      "array_of_vector_index",
+		ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexEmbListHNSW, metric.MaxSim),
+	})
+	s.NoError(err)
+	s.Require().Equal(commonpb.ErrorCode_Success, createIndexResult.GetErrorCode())
+	s.WaitForIndexBuilt(ctx, collectionName, integration.StructSubFloatVecField)
+
 	// load
 	loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
 		CollectionName: collectionName,
@@ -123,10 +133,11 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
 		totalDeleteRowNum += delRow
 
 		fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, insRow, dim)
+		structColumn := integration.NewStructArrayFieldData(schema.StructArrayFields[0], integration.StructArrayField, insRow, dim)
 		hashKeys := integration.GenerateHashKeys(insRow)
 		insertResult, err := c.MilvusClient.Insert(ctx, &milvuspb.InsertRequest{
 			CollectionName: collectionName,
-			FieldsData:     []*schemapb.FieldData{fVecColumn},
+			FieldsData:     []*schemapb.FieldData{fVecColumn, structColumn},
 			HashKeys:       hashKeys,
 			NumRows:        uint32(insRow),
 		})
diff --git a/tests/integration/import/vector_array_test.go b/tests/integration/import/vector_array_test.go
new file mode 100644
index 0000000000..f614061bb1
--- /dev/null
+++ b/tests/integration/import/vector_array_test.go
@@ -0,0 +1,306 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importv2
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/util/importutilv2"
+	"github.com/milvus-io/milvus/internal/util/testutil"
+	"github.com/milvus-io/milvus/pkg/v2/common"
+	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/v2/util/metric"
+	"github.com/milvus-io/milvus/tests/integration"
+)
+
+func TestGenerateJsonFileWithVectorArray(t *testing.T) {
+	const (
+		rowCount         = 100
+		dim              = 32
+		maxArrayCapacity = 10
+	)
+
+	collectionName := "TestBulkInsert_VectorArray_" + funcutil.GenRandomStr()
+
+	// Create schema with StructArrayField containing vector array
+	schema := integration.ConstructSchema(collectionName, 0, true, &schemapb.FieldSchema{
+		FieldID:      100,
+		Name:         integration.Int64Field,
+		IsPrimaryKey: true,
+		DataType:     schemapb.DataType_Int64,
+		AutoID:       false,
+	}, &schemapb.FieldSchema{
+		FieldID:  101,
+		Name:     integration.VarCharField,
+		DataType: schemapb.DataType_VarChar,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   common.MaxLengthKey,
+				Value: "256",
+			},
+		},
+	})
+
+	// Add StructArrayField with vector array
+	structField := &schemapb.StructArrayFieldSchema{
+		FieldID: 102,
+		Name:    "struct_with_vector_array",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      103,
+				Name:         "vector_array_field",
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_ArrayOfVector,
+				ElementType:  schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.DimKey,
+						Value: strconv.Itoa(dim),
+					},
+					{
+						Key:   common.MaxCapacityKey,
+						Value: strconv.Itoa(maxArrayCapacity),
+					},
+				},
+			},
+			{
+				FieldID:      104,
+				Name:         "scalar_array_field",
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Array,
+				ElementType:  schemapb.DataType_Int32,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.MaxCapacityKey,
+						Value: strconv.Itoa(maxArrayCapacity),
+					},
+				},
+			},
+		},
+	}
+	schema.StructArrayFields = []*schemapb.StructArrayFieldSchema{structField}
+	schema.EnableDynamicField = false
+
+	insertData, err := testutil.CreateInsertData(schema, rowCount)
+	assert.NoError(t, err)
+
+	rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
+	assert.NoError(t, err)
+	// Each input row should round-trip into exactly one JSON row
+	assert.Len(t, rows, rowCount)
+}
+
+func (s *BulkInsertSuite) runForStructArray() {
+	const (
+		rowCount         = 100
+		dim              = 32
+		maxArrayCapacity = 10
+	)
+
+	c := s.Cluster
+	ctx, cancel := context.WithTimeout(c.GetContext(), 600*time.Second)
+	defer cancel()
+
+	collectionName := "TestBulkInsert_VectorArray_" + funcutil.GenRandomStr()
+
+	// Create schema with StructArrayField containing vector array
+	schema := integration.ConstructSchema(collectionName, 0, true, &schemapb.FieldSchema{
+		FieldID:      100,
+		Name:         integration.Int64Field,
+		IsPrimaryKey: true,
+		DataType:     schemapb.DataType_Int64,
+		AutoID:       false,
+	}, &schemapb.FieldSchema{
+		FieldID:  101,
+		Name:     integration.VarCharField,
+		DataType: schemapb.DataType_VarChar,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   common.MaxLengthKey,
+				Value: "256",
+			},
+		},
+	})
+
+	// Add StructArrayField with vector array
+	structField := &schemapb.StructArrayFieldSchema{
+		FieldID: 102,
+		Name:    "struct_with_vector_array",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      103,
+				Name:         "vector_array_field",
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_ArrayOfVector,
+				ElementType:  schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.DimKey,
+						Value: strconv.Itoa(dim),
+					},
+					{
+						Key:   common.MaxCapacityKey,
+						Value: strconv.Itoa(maxArrayCapacity),
+					},
+				},
+			},
+			{
+				FieldID:      104,
+				Name:         "scalar_array_field",
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Array,
+				ElementType:  schemapb.DataType_Int32,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.MaxCapacityKey,
+						Value: strconv.Itoa(maxArrayCapacity),
+					},
+				},
+			},
+		},
+	}
+	schema.StructArrayFields = []*schemapb.StructArrayFieldSchema{structField}
+	schema.EnableDynamicField = false
+
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.MilvusClient.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		DbName:         "",
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(err)
+	s.Equal(int32(0), createCollectionStatus.GetCode())
+
+	var files []*internalpb.ImportFile
+
+	options := []*commonpb.KeyValuePair{}
+
+	switch s.fileType {
+	case importutilv2.JSON:
+		rowBasedFile := GenerateJSONFile(s.T(), c, schema, rowCount)
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					rowBasedFile,
+				},
+			},
+		}
+	case importutilv2.Parquet:
+		filePath, err := GenerateParquetFile(s.Cluster, schema, rowCount)
+		s.NoError(err)
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					filePath,
+				},
+			},
+		}
+	case importutilv2.CSV:
+		filePath, sep := GenerateCSVFile(s.T(), s.Cluster, schema, rowCount)
+		options = []*commonpb.KeyValuePair{{Key: "sep", Value: string(sep)}}
+		// GenerateCSVFile reports any failure through s.T(), no error to check here
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					filePath,
+				},
+			},
+		}
+	}
+
+	// Import data
+	importResp, err := c.ProxyClient.ImportV2(ctx, &internalpb.ImportRequest{
+		CollectionName: collectionName,
+		Files:          files,
+		Options:        options,
+	})
+	s.NoError(err)
+	s.NotNil(importResp)
+	s.Equal(int32(0), importResp.GetStatus().GetCode())
+
+	log.Info("Import response", zap.Any("resp", importResp))
+	jobID := importResp.GetJobID()
+
+	// Wait for import to complete
+	err = WaitForImportDone(ctx, s.Cluster, jobID)
+	s.NoError(err)
+
+	// Create index for vector array field
+	createIndexStatus, err := c.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      "vector_array_field",
+		IndexName:      "_default_idx",
+		ExtraParams:    integration.ConstructIndexParam(dim, s.indexType, s.metricType),
+	})
+	s.NoError(err)
+	s.Equal(int32(0), createIndexStatus.GetCode(), createIndexStatus.GetReason())
+	s.WaitForIndexBuilt(ctx, collectionName, "vector_array_field")
+
+	// Load collection
+	loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	s.Equal(int32(0), loadStatus.GetCode(), loadStatus.GetReason())
+	s.WaitForLoad(ctx, collectionName)
+
+	// search
+	nq := 10
+	topk := 10
+
+	outputFields := []string{"vector_array_field"}
+	params := integration.GetSearchParams(s.indexType, s.metricType)
+	searchReq := integration.ConstructEmbeddingListSearchRequest("", collectionName, "",
+		"vector_array_field", s.vecType, outputFields, s.metricType, params, nq, dim, topk, -1)
+
+	searchResp, err := s.Cluster.MilvusClient.Search(ctx, searchReq)
+	s.Require().NoError(err)
+	s.Require().Equal(commonpb.ErrorCode_Success, searchResp.GetStatus().GetErrorCode(), searchResp.GetStatus().GetReason())
+
+	result := searchResp.GetResults()
+	s.Require().Len(result.GetIds().GetIntId().GetData(), nq*topk)
+	s.Require().Len(result.GetScores(), nq*topk)
+	s.Require().GreaterOrEqual(len(result.GetFieldsData()), 1)
+	s.Require().EqualValues(nq, result.GetNumQueries())
+	s.Require().EqualValues(topk, result.GetTopK())
+}
+
+func (s *BulkInsertSuite) TestImportWithVectorArray() {
+	fileTypeArr := []importutilv2.FileType{importutilv2.CSV, importutilv2.Parquet, importutilv2.JSON}
+	for _, fileType := range fileTypeArr {
+		s.fileType = fileType
+		s.vecType = schemapb.DataType_FloatVector
+		s.indexType = integration.IndexEmbListHNSW
+		s.metricType = metric.MaxSim
+		s.runForStructArray()
+	}
+}
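A minimal standalone sketch (not part of the patch) of the row shape that reconstructStructArrayForJSON emits for a struct-array field. The dim of 4 and the sample values are hypothetical; the sub-field names are the ones used in the test schema above. The dim-sized slicing of the flat float buffer mirrors the FloatVector case of the reconstruction switch:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	const dim = 4
	// Flat FloatVector column for one row: two array elements, dim values
	// each (hypothetical sample values).
	flat := []float32{
		0.1, 0.2, 0.3, 0.4,
		0.5, 0.6, 0.7, 0.8,
	}

	// One map per struct-array element, each holding a dim-sized window of
	// the flat buffer, keyed by sub-field name.
	structArray := make([]map[string]any, 0, len(flat)/dim)
	for start := 0; start+dim <= len(flat); start += dim {
		structArray = append(structArray, map[string]any{
			"vector_array_field": flat[start : start+dim],
			"scalar_array_field": []int32{1, 2, 3},
		})
	}

	// One row of a row-based JSON import file: the nested struct array sits
	// under the struct field's name, not under the sub-field names.
	row := map[string]any{
		"struct_with_vector_array": structArray,
	}
	out, err := json.Marshal(row)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}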
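The CSV path layers one more encoding on top: reconstructStructArrayForCSV JSON-encodes every sub-field value into a string, then JSON-encodes the resulting []map[string]string into the single cell that lands in the struct column. A self-contained sketch of that double encoding, again with hypothetical sample values:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	elem := map[string]any{
		"vector_array_field": []float32{0.1, 0.2},
		"scalar_array_field": []int32{7, 8},
	}

	// Inner pass: each sub-field value becomes its own JSON string.
	csvElem := make(map[string]string, len(elem))
	for key, value := range elem {
		b, err := json.Marshal(value)
		if err != nil {
			panic(err)
		}
		csvElem[key] = string(b)
	}

	// Outer pass: the whole struct array becomes one JSON string, which is
	// what ends up in a single CSV cell.
	cell, err := json.Marshal([]map[string]string{csvElem})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(cell))
}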
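Finally, the estimateSizeBy change sizes an ArrayOfVector field as assumedArrayLen (10) elements of dim values each, weighted by the element width. The same arithmetic spelled out for a hypothetical dim of 32:

package main

import "fmt"

func main() {
	const (
		assumedArrayLen = 10 // same constant the patch hardcodes
		dim             = 32 // hypothetical dimension
	)

	// Estimated bytes per row for one ArrayOfVector field, by element type.
	fmt.Println("FloatVector:   ", assumedArrayLen*dim*4)   // float32: 4 bytes per value
	fmt.Println("Float16Vector: ", assumedArrayLen*dim*2)   // 2 bytes per value (BFloat16 identical)
	fmt.Println("BinaryVector:  ", assumedArrayLen*(dim/8)) // 1 bit per value
	fmt.Println("Int8Vector:    ", assumedArrayLen*dim)     // 1 byte per value
}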