feat: impl StructArray -- support import for CSV/JSON/PARQUET/BINLOG (#44201)

Ref https://github.com/milvus-io/milvus/issues/42148

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
Author: Spade A, 2025-09-15 20:41:59 +08:00 (committed by GitHub)
commit eb793531b9 (parent 2b2a11afba)
20 changed files with 1538 additions and 86 deletions
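
Note: in the row-based formats (JSON/CSV), a StructArray value arrives as one array per row, where each element carries one value for every sub-field of the StructArrayField (see the reconstructArrayForStructArray doc comments in the parsers below). A minimal sketch of the accepted JSON row shape, using the sub-field names assumed by the parser tests in this diff:

package main

import "fmt"

func main() {
	// One import row: "struct_array" holds two struct elements; each element
	// carries one value for every sub-field of the StructArrayField.
	row := map[string]any{
		"id": int64(1),
		"struct_array": []any{
			map[string]any{"sub_float_vector": []float32{0.1, 0.2}, "sub_str": "hello1"},
			map[string]any{"sub_float_vector": []float32{0.3, 0.4}, "sub_str": "hello2"},
		},
	}
	// The parser regroups this per sub-field before parsing:
	//   {"sub_float_vector": [[0.1, 0.2], [0.3, 0.4]], "sub_str": ["hello1", "hello2"]}
	fmt.Println(row)
}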

View File

@@ -143,7 +143,8 @@ func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData)
 	if len(data.Data) == 0 {
 		return nil
 	}
-	idToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	idToField := lo.KeyBy(allFields, func(field *schemapb.FieldSchema) int64 {
 		return field.GetFieldID()
 	})
@@ -238,7 +239,8 @@ func IsFillableField(field *schemapb.FieldSchema) bool {
 }
 
 func AppendNullableDefaultFieldsData(schema *schemapb.CollectionSchema, data *storage.InsertData, rowNum int) error {
-	for _, field := range schema.GetFields() {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	for _, field := range allFields {
 		if !IsFillableField(field) {
 			continue
 		}

View File

@@ -114,12 +114,12 @@ func NewPayloadWriter(colType schemapb.DataType, options ...PayloadWriterOptions
 		if w.elementType == nil {
 			return nil, merr.WrapErrParameterInvalidMsg("ArrayOfVector requires elementType, use WithElementType option")
 		}
-		arrowType, err := VectorArrayToArrowType(*w.elementType)
+		elemType, err := VectorArrayToArrowType(*w.elementType)
 		if err != nil {
 			return nil, err
 		}
-		w.arrowType = arrowType
-		w.builder = array.NewListBuilder(memory.DefaultAllocator, arrowType.(*arrow.ListType).Elem())
+		w.arrowType = arrow.ListOf(elemType)
+		w.builder = array.NewListBuilder(memory.DefaultAllocator, elemType)
 	} else {
 		w.arrowType = MilvusDataTypeToArrowType(colType, *w.dim.Value)
 		w.builder = array.NewBuilder(memory.DefaultAllocator, w.arrowType)
@@ -899,28 +899,6 @@ func MilvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy
 	}
 }
 
-// VectorArrayToArrowType converts VectorArray type with elementType to Arrow ListArray type
-func VectorArrayToArrowType(elementType schemapb.DataType) (arrow.DataType, error) {
-	var childType arrow.DataType
-	switch elementType {
-	case schemapb.DataType_FloatVector:
-		childType = arrow.PrimitiveTypes.Float32
-	case schemapb.DataType_BinaryVector:
-		return nil, merr.WrapErrParameterInvalidMsg("BinaryVector in VectorArray not implemented yet")
-	case schemapb.DataType_Float16Vector:
-		return nil, merr.WrapErrParameterInvalidMsg("Float16Vector in VectorArray not implemented yet")
-	case schemapb.DataType_BFloat16Vector:
-		return nil, merr.WrapErrParameterInvalidMsg("BFloat16Vector in VectorArray not implemented yet")
-	case schemapb.DataType_Int8Vector:
-		return nil, merr.WrapErrParameterInvalidMsg("Int8Vector in VectorArray not implemented yet")
-	default:
-		return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported element type in VectorArray: %s", elementType.String()))
-	}
-	return arrow.ListOf(childType), nil
-}
-
 // AddVectorArrayFieldDataToPayload adds VectorArrayFieldData to payload using Arrow ListArray
 func (w *NativePayloadWriter) AddVectorArrayFieldDataToPayload(data *VectorArrayFieldData) error {
 	if w.finished {
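
With this change the writer stores an ArrayOfVector column as an Arrow list<elemType>, each row holding all of an entity's vectors flattened into dim * numVectors values. A minimal sketch of building such a flattened list<float32> column, mirroring the BuildArrayData test helper later in this diff (assuming the apache/arrow-go v17 module path; the repo may pin a different version):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
)

// buildVectorArrayColumn flattens each row's vectors (dim*numVectors floats)
// into one list entry of a list<float32> column.
func buildVectorArrayColumn(rows [][]float32) arrow.Array {
	builder := array.NewListBuilder(memory.NewGoAllocator(), &arrow.Float32Type{})
	defer builder.Release()
	values := builder.ValueBuilder().(*array.Float32Builder)
	for _, row := range rows {
		builder.Append(true)
		values.AppendValues(row, nil)
	}
	return builder.NewListArray()
}

func main() {
	// Two rows of dim-2 vectors: row 0 has two vectors, row 1 has one.
	col := buildVectorArrayColumn([][]float32{
		{0.1, 0.2, 0.3, 0.4},
		{0.5, 0.6},
	})
	fmt.Println(col)
}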

View File

@@ -125,8 +125,8 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
 	}
 
 	recs := make([]arrow.Array, fieldNum)
-	idx := 0
 	appendFieldRecord := func(f *schemapb.FieldSchema) error {
+		idx := crr.index[f.FieldID]
 		if crr.rrs[idx] != nil {
 			if ok := crr.rrs[idx].Next(); !ok {
 				return io.EOF
@@ -143,7 +143,6 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
 			}
 			recs[idx] = arr
 		}
-		idx++
 		return nil
 	}

View File

@@ -1641,3 +1641,23 @@ func SortFieldBinlogs(fieldBinlogs map[int64]*datapb.FieldBinlog) []*datapb.Fiel
 	}
 	return binlogs
 }
+
+// VectorArrayToArrowType converts a VectorArray element type to the corresponding Arrow type.
+// Note: this returns the element type (e.g., float32), not a list type;
+// the caller is responsible for wrapping it in a list if needed.
+func VectorArrayToArrowType(elementType schemapb.DataType) (arrow.DataType, error) {
+	switch elementType {
+	case schemapb.DataType_FloatVector:
+		return arrow.PrimitiveTypes.Float32, nil
+	case schemapb.DataType_BinaryVector:
+		return nil, merr.WrapErrParameterInvalidMsg("BinaryVector in VectorArray not implemented yet")
+	case schemapb.DataType_Float16Vector:
+		return nil, merr.WrapErrParameterInvalidMsg("Float16Vector in VectorArray not implemented yet")
+	case schemapb.DataType_BFloat16Vector:
+		return nil, merr.WrapErrParameterInvalidMsg("BFloat16Vector in VectorArray not implemented yet")
+	case schemapb.DataType_Int8Vector:
+		return nil, merr.WrapErrParameterInvalidMsg("Int8Vector in VectorArray not implemented yet")
+	default:
+		return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("unsupported element type in VectorArray: %s", elementType.String()))
+	}
+}
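
Callers now wrap the element type themselves, as the payload writer above and the parquet import util below do. A minimal usage sketch (the internal/storage import only resolves inside the milvus module, and the arrow module path is assumed):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	// Element type for a FloatVector ArrayOfVector field: float32.
	elemType, err := storage.VectorArrayToArrowType(schemapb.DataType_FloatVector)
	if err != nil {
		panic(err)
	}
	// The caller chooses the list wrapping it needs, e.g. list<float32>.
	fmt.Println(arrow.ListOf(elemType))
}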

View File

@@ -211,8 +211,9 @@ func (r *reader) Read() (*storage.InsertData, error) {
 	if err != nil {
 		return nil, err
 	}
+	allFields := typeutil.GetAllFieldSchemas(r.schema)
 	// convert record to fieldData
-	for _, field := range r.schema.Fields {
+	for _, field := range allFields {
 		fieldData := insertData.Data[field.GetFieldID()]
 		if fieldData == nil {
 			fieldData, err = storage.NewFieldData(field.GetDataType(), field, 1024)

View File

@@ -100,7 +100,7 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie
 		dim = 1
 	}
-	evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable()))
+	evt, err := w.NextInsertEventWriter(storage.WithDim(int(dim)), storage.WithNullable(field.GetNullable()), storage.WithElementType(field.GetElementType()))
 	assert.NoError(t, err)
 	evt.SetEventTimestamp(1, math.MaxInt64)
@@ -190,6 +190,17 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie
 		vectors := data.(*storage.Int8VectorFieldData).Data
 		err = evt.AddInt8VectorToPayload(vectors, int(dim))
 		assert.NoError(t, err)
+	case schemapb.DataType_ArrayOfVector:
+		elementType := field.GetElementType()
+		switch elementType {
+		case schemapb.DataType_FloatVector:
+			vectors := data.(*storage.VectorArrayFieldData)
+			err = evt.AddVectorArrayFieldDataToPayload(vectors)
+			assert.NoError(t, err)
+		default:
+			assert.True(t, false)
+			return nil
+		}
 	default:
 		assert.True(t, false)
 		return nil
@@ -254,7 +265,8 @@ func (suite *ReaderSuite) createMockChunk(schema *schemapb.CollectionSchema, ins
 		paths = make([]string, 0)
 		bytes = make([][]byte, 0)
 	)
-	for _, field := range schema.Fields {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	for _, field := range allFields {
 		fieldID := field.GetFieldID()
 		logs, ok := insertBinlogs[fieldID]
 		if ok && len(logs) > 0 {
@@ -327,8 +339,48 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 				Nullable: nullable,
 			},
 		},
+		StructArrayFields: []*schemapb.StructArrayFieldSchema{
+			{
+				FieldID: 103,
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:     104,
+						Name:        "struct_str",
+						DataType:    schemapb.DataType_Array,
+						ElementType: schemapb.DataType_VarChar,
+						TypeParams: []*commonpb.KeyValuePair{
+							{
+								Key:   common.MaxLengthKey,
+								Value: "256",
+							},
+							{
+								Key:   common.MaxCapacityKey,
+								Value: "20",
+							},
+						},
+					},
+					{
+						FieldID:     105,
+						Name:        "struct_float_vector",
+						DataType:    schemapb.DataType_ArrayOfVector,
+						ElementType: schemapb.DataType_FloatVector,
+						TypeParams: []*commonpb.KeyValuePair{
+							{
+								Key:   common.MaxCapacityKey,
+								Value: "20",
+							},
+							{
+								Key:   common.DimKey,
+								Value: "8",
+							},
+						},
+					},
+				},
+			},
+		},
 	}
-	insertBinlogs := genBinlogPaths(lo.Map(schema.Fields, func(fieldSchema *schemapb.FieldSchema, _ int) int64 {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	insertBinlogs := genBinlogPaths(lo.Map(allFields, func(fieldSchema *schemapb.FieldSchema, _ int) int64 {
 		return fieldSchema.GetFieldID()
 	}))
 	cm, originalInsertData := suite.createMockChunk(schema, insertBinlogs, true)
@@ -383,6 +435,8 @@ OUTER:
 				} else {
 					suite.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
 				}
+			} else if fieldDataType == schemapb.DataType_ArrayOfVector {
+				suite.True(slices.Equal(expect.(*schemapb.VectorField).GetFloatVector().GetData(), actual.(*schemapb.VectorField).GetFloatVector().GetData()))
 			} else {
 				suite.Equal(expect, actual)
 			}

View File

@@ -140,8 +140,9 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs
 		}
 	}
 
+	allFields := typeutil.GetAllFieldSchemas(schema)
 	if storageVersion == storage.StorageV2 {
-		for _, field := range schema.GetFields() {
+		for _, field := range allFields {
 			if typeutil.IsVectorType(field.GetDataType()) {
 				if _, ok := insertLogs[field.GetFieldID()]; !ok {
 					// vector field must be provided
@@ -174,8 +175,9 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs
 	// Here we copy the schema for reading part of collection's data. The storage.NewBinlogRecordReader() requires
 	// a schema and the schema must be consistent with the binlog files ([]*datapb.FieldBinlog)
 	cloneSchema := typeutil.Clone(schema)
 	cloneSchema.Fields = []*schemapb.FieldSchema{} // the Fields will be reset according to the validInsertLogs
-	cloneSchema.EnableDynamicField = false // this flag will be reset
+	cloneSchema.StructArrayFields = []*schemapb.StructArrayFieldSchema{} // the StructArrayFields will be reset according to the validInsertLogs
+	cloneSchema.EnableDynamicField = false // this flag will be reset
 
 	// this loop will reset the cloneSchema.Fields and return validInsertLogs
 	validInsertLogs := make(map[int64][]string)
@@ -205,5 +207,18 @@ func verify(schema *schemapb.CollectionSchema, storageVersion int64, insertLogs
 		}
 	}
 
+	for _, structArrayField := range schema.GetStructArrayFields() {
+		for _, field := range structArrayField.GetFields() {
+			id := field.GetFieldID()
+			logs, ok := insertLogs[id]
+			if !ok {
+				return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("no binlog for struct field:%s", field.GetName()))
+			}
+			validInsertLogs[id] = logs
+		}
+		cloneSchema.StructArrayFields = append(cloneSchema.StructArrayFields, structArrayField)
+	}
+
 	return validInsertLogs, cloneSchema, nil
 }

View File

@ -36,12 +36,14 @@ type RowParser interface {
Parse(raw []string) (Row, error) Parse(raw []string) (Row, error)
} }
type rowParser struct { type rowParser struct {
nullkey string nullkey string
header []string header []string
name2Dim map[string]int name2Dim map[string]int
name2Field map[string]*schemapb.FieldSchema name2Field map[string]*schemapb.FieldSchema
pkField *schemapb.FieldSchema structArrays map[string]map[string]*schemapb.FieldSchema
dynamicField *schemapb.FieldSchema structArraySubFields map[string]interface{}
pkField *schemapb.FieldSchema
dynamicField *schemapb.FieldSchema
} }
func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) { func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
@@ -58,8 +60,10 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 		}
 	}
 
+	allFields := typeutil.GetAllFieldSchemas(schema)
 	name2Field := lo.SliceToMap(
-		lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
+		lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool {
 			return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName()
 		}),
 		func(field *schemapb.FieldSchema) (string, *schemapb.FieldSchema) {
@@ -67,6 +71,17 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 		},
 	)
 
+	structArrays := make(map[string]map[string]*schemapb.FieldSchema)
+	structArraySubFields := make(map[string]interface{})
+	for _, sa := range schema.GetStructArrayFields() {
+		structArrays[sa.GetName()] = make(map[string]*schemapb.FieldSchema)
+		for _, subField := range sa.GetFields() {
+			structArraySubFields[subField.GetName()] = nil
+			structArrays[sa.GetName()][subField.GetName()] = subField
+		}
+	}
+
 	name2Dim := make(map[string]int)
 	for name, field := range name2Field {
 		if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) {
@@ -103,7 +118,8 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 	for fieldName, field := range name2Field {
 		_, existInHeader := headerMap[fieldName]
-		if field.GetNullable() || field.GetDefaultValue() != nil {
+		_, subField := structArraySubFields[fieldName]
+		if field.GetNullable() || field.GetDefaultValue() != nil || subField {
 			// nullable/defaultValue fields, provide or not provide both ok
 		} else if !existInHeader {
 			// not nullable/defaultValue/autoPK/functionOutput fields, must provide
@@ -113,15 +129,76 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 	}
 
 	return &rowParser{
 		nullkey:              nullkey,
 		name2Dim:             name2Dim,
 		header:               header,
 		name2Field:           name2Field,
+		structArrays:         structArrays,
+		structArraySubFields: structArraySubFields,
 		pkField:              pkField,
 		dynamicField:         dynamicField,
 	}, nil
 }
 
+// reconstructArrayForStructArray reconstructs the StructArray data format from CSV.
+// A StructArray is passed in with the following format for one row: StructArray: [element 1, element 2, ...],
+// where each element contains one value for every sub-field of the StructArrayField.
+// So we need to reconstruct it into a form that can be handled by handleField.
+//
+// For example, let StructArrayFieldSchema { sub-field1: array of int32, sub-field2: array of float vector }.
+// When we have one row in CSV (as a JSON string, with each sub-field value itself JSON-encoded):
+//
+//	"[{\"sub-field1\": \"1\", \"sub-field2\": \"[1.0, 2.0]\"}, {\"sub-field1\": \"2\", \"sub-field2\": \"[3.0, 4.0]\"}]"
+//
+// we reconstruct it to be handled by handleField as:
+//
+//	{"sub-field1": "[1, 2]", "sub-field2": "[[1.0, 2.0], [3.0, 4.0]]"}
+func (r *rowParser) reconstructArrayForStructArray(subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string]string, error) {
+	// Parse the JSON array string
+	var rows []any
+	dec := json.NewDecoder(strings.NewReader(raw))
+	dec.UseNumber()
+	if err := dec.Decode(&rows); err != nil {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid StructArray format in CSV, failed to parse JSON: %v", err))
+	}
+
+	buf := make(map[string][]any)
+	for _, elem := range rows {
+		row, ok := elem.(map[string]any)
+		if !ok {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem))
+		}
+		for key, value := range row {
+			field, ok := subFieldsMap[key]
+			if !ok {
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", key))
+			}
+			strVal, ok := value.(string)
+			if !ok {
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid value type for field %s, expect string but got %T", key, value))
+			}
+			data, err := r.parseEntity(field, strVal, true)
+			if err != nil {
+				return nil, err
+			}
+			buf[key] = append(buf[key], data)
+		}
+	}
+
+	// Convert aggregated arrays to JSON strings
+	out := make(map[string]string, len(buf))
+	for k, v := range buf {
+		// Marshal the array as a JSON string so it can be parsed by parseEntity
+		jsonBytes, err := json.Marshal(v)
+		if err != nil {
+			return nil, err
+		}
+		out[k] = string(jsonBytes)
+	}
+	return out, nil
+}
+
 func (r *rowParser) Parse(strArr []string) (Row, error) {
 	if len(strArr) != len(r.header) {
 		return nil, merr.WrapErrImportFailed("the number of fields in the row is not equal to the header")
@@ -131,8 +208,25 @@ func (r *rowParser) Parse(strArr []string) (Row, error) {
 	dynamicValues := make(map[string]string)
 	// read values from csv file
 	for index, value := range strArr {
-		if field, ok := r.name2Field[r.header[index]]; ok {
-			data, err := r.parseEntity(field, value)
+		if subFieldsMap, ok := r.structArrays[r.header[index]]; ok {
+			values, err := r.reconstructArrayForStructArray(subFieldsMap, value)
+			if err != nil {
+				return nil, err
+			}
+			for subKey, subValue := range values {
+				field, ok := r.name2Field[subKey]
+				if !ok {
+					return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", subKey))
+				}
+				data, err := r.parseEntity(field, subValue, false)
+				if err != nil {
+					return nil, err
+				}
+				row[field.GetFieldID()] = data
+			}
+		} else if field, ok := r.name2Field[r.header[index]]; ok {
+			data, err := r.parseEntity(field, value, false)
 			if err != nil {
 				return nil, err
 			}
@@ -160,7 +254,8 @@ func (r *rowParser) Parse(strArr []string) (Row, error) {
 				row[fieldID] = data
 			}
 		}
-		if _, ok := row[fieldID]; !ok {
+		_, subField := r.structArraySubFields[fieldName]
+		if _, ok := row[fieldID]; !ok && !subField {
 			return nil, merr.WrapErrImportFailed(fmt.Sprintf("value of field '%s' is missed", fieldName))
 		}
 	}
@@ -219,7 +314,7 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
 	return nil
 }
 
-func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, error) {
+func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string, useElementType bool) (any, error) {
 	if field.GetDefaultValue() != nil && obj == r.nullkey {
 		return nullutil.GetDefaultValue(field)
 	}
@@ -229,7 +324,12 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 		return nil, nil
 	}
 
-	switch field.GetDataType() {
+	dataType := field.GetDataType()
+	if useElementType {
+		dataType = field.GetElementType()
+	}
+
+	switch dataType {
 	case schemapb.DataType_Bool:
 		b, err := strconv.ParseBool(obj)
 		if err != nil {
@@ -360,6 +460,26 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
 			return nil, r.wrapDimError(len(vec), field)
 		}
 		return vec, nil
+	case schemapb.DataType_ArrayOfVector:
+		var vec []interface{}
+		desc := json.NewDecoder(strings.NewReader(obj))
+		desc.UseNumber()
+		err := desc.Decode(&vec)
+		if err != nil {
+			return nil, r.wrapTypeError(obj, field)
+		}
+		maxCapacity, err := parameterutil.GetMaxCapacity(field)
+		if err != nil {
+			return nil, err
+		}
+		if err = common.CheckArrayCapacity(len(vec), maxCapacity, field); err != nil {
+			return nil, err
+		}
+		vectorFieldData, err := r.arrayOfVectorToFieldData(vec, field)
+		if err != nil {
+			return nil, err
+		}
+		return vectorFieldData, nil
 	case schemapb.DataType_Array:
 		var vec []interface{}
 		desc := json.NewDecoder(strings.NewReader(obj))
@@ -542,6 +662,58 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, field *schemapb.FieldSch
 	}
 }
 
+func (r *rowParser) arrayOfVectorToFieldData(vectors []any, field *schemapb.FieldSchema) (*schemapb.VectorField, error) {
+	elementType := field.GetElementType()
+	switch elementType {
+	case schemapb.DataType_FloatVector:
+		dim, err := typeutil.GetDim(field)
+		if err != nil {
+			return nil, err
+		}
+		values := make([]float32, 0, len(vectors)*int(dim))
+		for _, vectorAny := range vectors {
+			var vector []float32
+			v, ok := vectorAny.([]interface{})
+			if !ok {
+				return nil, r.wrapTypeError(vectorAny, field)
+			}
+			vector = make([]float32, len(v))
+			for i, elem := range v {
+				value, ok := elem.(json.Number)
+				if !ok {
+					return nil, r.wrapArrayValueTypeError(elem, elementType)
+				}
+				num, err := strconv.ParseFloat(value.String(), 32)
+				if err != nil {
+					return nil, fmt.Errorf("failed to parse float: %w", err)
+				}
+				vector[i] = float32(num)
+			}
+			if len(vector) != int(dim) {
+				return nil, r.wrapDimError(len(vector), field)
+			}
+			values = append(values, vector...)
+		}
+		return &schemapb.VectorField{
+			Dim: dim,
+			Data: &schemapb.VectorField_FloatVector{
+				FloatVector: &schemapb.FloatArray{
+					Data: values,
+				},
+			},
+		}, nil
+	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_BinaryVector,
+		schemapb.DataType_Int8Vector, schemapb.DataType_SparseFloatVector:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("not implemented element type for CSV: %s", elementType.String()))
+	default:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type: %s", elementType.String()))
+	}
+}
+
 func (r *rowParser) wrapTypeError(v any, field *schemapb.FieldSchema) error {
 	return merr.WrapErrImportFailed(
 		fmt.Sprintf("expected type '%s' for field '%s', got type '%T' with value '%v'",

View File

@@ -66,6 +66,45 @@ func (suite *RowParserSuite) setSchema(autoID bool, hasNullable bool, hasDynamic
 }
 
 func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
+	structArray := &schemapb.StructArrayFieldSchema{
+		FieldID: 110,
+		Name:    "struct_array",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     111,
+				Name:        "sub_float_vector",
+				DataType:    schemapb.DataType_ArrayOfVector,
+				ElementType: schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.DimKey,
+						Value: "2",
+					},
+					{
+						Key:   "max_capacity",
+						Value: "4",
+					},
+				},
+			},
+			{
+				FieldID:     112,
+				Name:        "sub_str",
+				DataType:    schemapb.DataType_Array,
+				ElementType: schemapb.DataType_VarChar,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "max_capacity",
+						Value: "4",
+					},
+					{
+						Key:   "max_length",
+						Value: "8",
+					},
+				},
+			},
+		},
+	}
+
 	schema := &schemapb.CollectionSchema{
 		EnableDynamicField: suite.hasDynamic,
 		Fields: []*schemapb.FieldSchema{
@@ -294,6 +333,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 				Nullable: suite.hasNullable,
 			},
 		},
+		StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray},
 	}
 
 	if suite.hasDynamic {
@@ -338,6 +378,8 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal string
 	rawContent["json"] = "{\"a\": 1}"
 	rawContent["x"] = "2"
 	rawContent["$meta"] = "{\"dynamic\": \"dummy\"}"
+	rawContent["struct_array"] = "[{\"sub_float_vector\": \"[0.1, 0.2]\", \"sub_str\": \"hello1\"}, " +
+		"{\"sub_float_vector\": \"[0.3, 0.4]\", \"sub_str\": \"hello2\"}]"
 	rawContent[resetKey] = resetVal // reset a value
 
 	for _, deleteKey := range deleteKeys {
for _, deleteKey := range deleteKeys { for _, deleteKey := range deleteKeys {
@@ -555,11 +597,89 @@ func (suite *RowParserSuite) runValid(c *testCase) {
 			default:
 				continue
 			}
+		case schemapb.DataType_ArrayOfVector:
+			// Handle ArrayOfVector validation
+			vf, _ := val.(*schemapb.VectorField)
+			if vf.GetFloatVector() != nil {
+				// Parse expected vectors from the raw string
+				var expectedVectors [][]float32
+				err := json.Unmarshal([]byte(rawVal), &expectedVectors)
+				suite.NoError(err)
+
+				// Flatten expected vectors
+				var expectedFlat []float32
+				for _, vec := range expectedVectors {
+					expectedFlat = append(expectedFlat, vec...)
+				}
+				suite.Equal(expectedFlat, vf.GetFloatVector().GetData())
+			}
 		default:
 			continue
 		}
 	}
+
+	// Validate struct array sub-fields
+	for _, structArray := range schema.GetStructArrayFields() {
+		// Check if struct_array was provided in the test data
+		if structArrayRaw, ok := c.content[structArray.GetName()]; ok {
+			// Parse the struct array JSON
+			var structArrayData []map[string]any
+			dec := json.NewDecoder(strings.NewReader(structArrayRaw))
+			dec.UseNumber()
+			err := dec.Decode(&structArrayData)
+			suite.NoError(err)
+
+			// For each sub-field in the struct array
+			for _, subField := range structArray.GetFields() {
+				val, ok := row[subField.GetFieldID()]
+				suite.True(ok, "Sub-field %s should exist in row", subField.GetName())
+
+				// Validate based on sub-field type
+				switch subField.GetDataType() {
+				case schemapb.DataType_ArrayOfVector:
+					vf, ok := val.(*schemapb.VectorField)
+					suite.True(ok, "Sub-field %s should be a VectorField", subField.GetName())
+
+					// Extract expected vectors from struct array data
+					var expectedVectors [][]float32
+					for _, elem := range structArrayData {
+						if vecStr, ok := elem[subField.GetName()].(string); ok {
+							var vec []float32
+							err := json.Unmarshal([]byte(vecStr), &vec)
+							suite.NoError(err)
+							expectedVectors = append(expectedVectors, vec)
+						}
+					}
+
+					// Flatten and compare
+					var expectedFlat []float32
+					for _, vec := range expectedVectors {
+						expectedFlat = append(expectedFlat, vec...)
+					}
+					suite.Equal(expectedFlat, vf.GetFloatVector().GetData())
+				case schemapb.DataType_Array:
+					sf, ok := val.(*schemapb.ScalarField)
+					suite.True(ok, "Sub-field %s should be a ScalarField", subField.GetName())
+
+					// Extract expected values from struct array data
+					var expectedValues []string
+					for _, elem := range structArrayData {
+						if v, ok := elem[subField.GetName()].(string); ok {
+							expectedValues = append(expectedValues, v)
+						}
+					}
+
+					// Compare based on element type
+					if subField.GetElementType() == schemapb.DataType_VarChar {
+						suite.Equal(expectedValues, sf.GetStringData().GetData())
+					}
+				}
+			}
+		}
+	}
+
 	if suite.hasDynamic {
 		val, ok := row[9999]
 		suite.True(ok)
@@ -597,6 +717,9 @@ func (suite *RowParserSuite) TestValid() {
 	suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", suite.nullKey)})
 
+	// Test struct array parsing
+	suite.runValid(&testCase{name: "A/N/D struct array valid", content: suite.genAllTypesRowData("x", "2")})
+
 	suite.nullKey = "ABCDEF"
 	suite.runValid(&testCase{name: "A/N/D null key 1", content: suite.genAllTypesRowData("int64", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D null key 2", content: suite.genAllTypesRowData("double", suite.nullKey)})

View File

@@ -42,10 +42,14 @@ type rowParser struct {
 	pkField              *schemapb.FieldSchema
 	dynamicField         *schemapb.FieldSchema
 	functionOutputFields map[string]int64
+	structArrays         map[string]interface{}
 }
 
 func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
-	id2Field := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	id2Field := lo.KeyBy(allFields, func(field *schemapb.FieldSchema) int64 {
 		return field.GetFieldID()
 	})
@@ -71,7 +75,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 	dynamicField := typeutil.GetDynamicField(schema)
 	name2FieldID := lo.SliceToMap(
-		lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
+		lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool {
 			return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName()
 		}),
 		func(field *schemapb.FieldSchema) (string, int64) {
@@ -79,6 +83,13 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 		},
 	)
 
+	structArrays := lo.SliceToMap(
+		schema.GetStructArrayFields(),
+		func(sa *schemapb.StructArrayFieldSchema) (string, interface{}) {
+			return sa.GetName(), nil
+		},
+	)
+
 	return &rowParser{
 		id2Dim:   id2Dim,
 		id2Field: id2Field,
@@ -86,6 +97,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
 		pkField:              pkField,
 		dynamicField:         dynamicField,
 		functionOutputFields: functionOutputFields,
+		structArrays:         structArrays,
 	}, nil
 }
@@ -109,6 +121,42 @@ func (r *rowParser) wrapArrayValueTypeError(v any, eleType schemapb.DataType) er
 		eleType.String(), v, v))
 }
 
+// A StructArray is passed in with the following format for one row: StructArray: [element 1, element 2, ...],
+// where each element contains one value for every sub-field of the StructArrayField.
+// So we need to reconstruct it into a form that can be handled by handleField.
+//
+// For example, let StructArrayFieldSchema { sub-field1: array of int32, sub-field2: array of float vector }.
+// When we have one row:
+//
+//	[{"sub-field1": 1, "sub-field2": [1.0, 2.0]}, {"sub-field1": 2, "sub-field2": [3.0, 4.0]}],
+//
+// we reconstruct it to be handled by handleField as:
+//
+//	{"sub-field1": [1, 2], "sub-field2": [[1.0, 2.0], [3.0, 4.0]]}
+func reconstructArrayForStructArray(raw any) (map[string]any, error) {
+	rows, ok := raw.([]any)
+	if !ok {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid StructArray format in JSON, each row should be a key-value map, but got type %T", raw))
+	}
+
+	buf := make(map[string][]any)
+	for _, elem := range rows {
+		row, ok := elem.(map[string]any)
+		if !ok {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem))
+		}
+		for key, value := range row {
+			buf[key] = append(buf[key], value)
+		}
+	}
+
+	out := make(map[string]any, len(buf))
+	for k, v := range buf {
+		out[k] = v
+	}
+	return out, nil
+}
+
 func (r *rowParser) Parse(raw any) (Row, error) {
 	stringMap, ok := raw.(map[string]any)
 	if !ok {
@@ -122,16 +170,12 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 	row := make(Row)
 	dynamicValues := make(map[string]any)
 
-	// read values from json file
-	for key, value := range stringMap {
-		if _, ok := r.functionOutputFields[key]; ok {
-			return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is output by function, no need to provide", key))
-		}
-
+	handleField := func(key string, value any) error {
 		if fieldID, ok := r.name2FieldID[key]; ok {
 			data, err := r.parseEntity(fieldID, value)
 			if err != nil {
-				return nil, err
+				return err
 			}
 			row[fieldID] = data
 		} else if r.dynamicField != nil {
@@ -139,7 +183,33 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 			dynamicValues[key] = value
 		} else {
 			// from v2.6, we don't intend to return error for redundant fields, just skip it
-			continue
+			return nil
+		}
+		return nil
+	}
+
+	// read values from json file
+	for key, value := range stringMap {
+		if _, ok := r.functionOutputFields[key]; ok {
+			return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is output by function, no need to provide", key))
+		}
+
+		if _, ok := r.structArrays[key]; ok {
+			values, err := reconstructArrayForStructArray(value)
+			if err != nil {
+				return nil, err
+			}
+			for subKey, subValue := range values {
+				if err := handleField(subKey, subValue); err != nil {
+					return nil, err
+				}
+			}
+		} else {
+			if err := handleField(key, value); err != nil {
+				return nil, err
+			}
+		}
 	}
@@ -474,6 +544,24 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
 			return nil, err
 		}
 		return scalarFieldData, nil
+	case schemapb.DataType_ArrayOfVector:
+		arr, ok := obj.([]interface{})
+		if !ok {
+			return nil, r.wrapTypeError(obj, fieldID)
+		}
+		maxCapacity, err := parameterutil.GetMaxCapacity(field)
+		if err != nil {
+			return nil, err
+		}
+		if err = common.CheckArrayCapacity(len(arr), maxCapacity, field); err != nil {
+			return nil, err
+		}
+		vectorFieldData, err := r.arrayOfVectorToFieldData(arr, field)
+		if err != nil {
+			return nil, err
+		}
+		return vectorFieldData, nil
 	default:
 		return nil, merr.WrapErrImportFailed(
 			fmt.Sprintf("parse json failed, unsupport data type: %s",
@@ -634,3 +722,44 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, field *schemapb.FieldSch
 			fmt.Sprintf("parse json failed, unsupported array data type '%s'", eleType.String()))
 	}
 }
+
+func (r *rowParser) arrayOfVectorToFieldData(vectors []any, field *schemapb.FieldSchema) (*schemapb.VectorField, error) {
+	elementType := field.GetElementType()
+	fieldID := field.GetFieldID()
+	switch elementType {
+	case schemapb.DataType_FloatVector:
+		values := make([]float32, 0, len(vectors)*10)
+		dim := r.id2Dim[fieldID]
+		for _, vectorAny := range vectors {
+			vector, ok := vectorAny.([]any)
+			if !ok {
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("expected slice as vector, but got %T", vectorAny))
+			}
+			for _, v := range vector {
+				value, ok := v.(json.Number)
+				if !ok {
+					return nil, r.wrapTypeError(value, fieldID)
+				}
+				num, err := strconv.ParseFloat(value.String(), 32)
+				if err != nil {
+					return nil, err
+				}
+				values = append(values, float32(num))
+			}
+		}
+		return &schemapb.VectorField{
+			Dim: int64(dim),
+			Data: &schemapb.VectorField_FloatVector{
+				FloatVector: &schemapb.FloatArray{
+					Data: values,
+				},
+			},
+		}, nil
+	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_BinaryVector,
+		schemapb.DataType_Int8Vector, schemapb.DataType_SparseFloatVector:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("not implemented element type: %s", elementType.String()))
+	default:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported element type: %s", elementType.String()))
+	}
+}
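
The resulting field data is one flattened VectorField per row: Dim is the per-vector dimension and the FloatVector data concatenates all of the row's vectors, so the vector count is len(data)/dim. A minimal sketch of that layout:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

func main() {
	// Two dim-2 vectors flattened into one VectorField row.
	vf := &schemapb.VectorField{
		Dim: 2,
		Data: &schemapb.VectorField_FloatVector{
			FloatVector: &schemapb.FloatArray{Data: []float32{0.1, 0.2, 0.3, 0.4}},
		},
	}
	numVectors := len(vf.GetFloatVector().GetData()) / int(vf.GetDim())
	fmt.Println("vectors in row:", numVectors) // 2
}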

View File

@@ -57,6 +57,45 @@ func (suite *RowParserSuite) SetupTest() {
 }
 
 func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
+	structArray := &schemapb.StructArrayFieldSchema{
+		FieldID: 110,
+		Name:    "struct_array",
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     111,
+				Name:        "sub_float_vector",
+				DataType:    schemapb.DataType_ArrayOfVector,
+				ElementType: schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   common.DimKey,
+						Value: "2",
+					},
+					{
+						Key:   "max_capacity",
+						Value: "4",
+					},
+				},
+			},
+			{
+				FieldID:     112,
+				Name:        "sub_str",
+				DataType:    schemapb.DataType_Array,
+				ElementType: schemapb.DataType_VarChar,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "max_capacity",
+						Value: "4",
+					},
+					{
+						Key:   "max_length",
+						Value: "8",
+					},
+				},
+			},
+		},
+	}
+
 	schema := &schemapb.CollectionSchema{
 		EnableDynamicField: suite.hasDynamic,
 		Fields: []*schemapb.FieldSchema{
@@ -285,6 +324,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 				Nullable: suite.hasNullable,
 			},
 		},
+		StructArrayFields: []*schemapb.StructArrayFieldSchema{structArray},
 	}
 
 	if suite.hasDynamic {
@@ -329,6 +369,18 @@ func (suite *RowParserSuite) genAllTypesRowData(resetKey string, resetVal any, d
 	rawContent["json"] = map[string]any{"a": 1}
 	rawContent["x"] = 6
 	rawContent["$meta"] = map[string]any{"dynamic": "dummy"}
+	rawContent["struct_array"] = []any{
+		// struct array element 1
+		map[string]any{
+			"sub_float_vector": []float32{0.1, 0.2},
+			"sub_str":          "hello1",
+		},
+		// struct array element 2
+		map[string]any{
+			"sub_float_vector": []float32{0.3, 0.4},
+			"sub_str":          "hello2",
+		},
+	}
 	rawContent[resetKey] = resetVal // reset a value
 
 	for _, deleteKey := range deleteKeys {

View File

@@ -206,6 +206,12 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
 		}
 		data, err := ReadArrayData(c, count)
 		return data, nil, err
+	case schemapb.DataType_ArrayOfVector:
+		if c.field.GetNullable() {
+			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+		}
+		data, err := ReadVectorArrayData(c, count)
+		return data, nil, err
 	default:
 		return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'",
 			c.field.GetDataType().String(), c.field.GetName()))
@@ -1768,3 +1774,72 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
 			elementType.String(), pcr.field.GetName()))
 	}
 }
+
+func ReadVectorArrayData(pcr *FieldReader, count int64) (any, error) {
+	data := make([]*schemapb.VectorField, 0, count)
+	maxCapacity, err := parameterutil.GetMaxCapacity(pcr.field)
+	if err != nil {
+		return nil, err
+	}
+	dim, err := typeutil.GetDim(pcr.field)
+	if err != nil {
+		return nil, err
+	}
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, err
+	}
+	if chunked == nil {
+		return nil, nil
+	}
+	elementType := pcr.field.GetElementType()
+	switch elementType {
+	case schemapb.DataType_FloatVector:
+		for _, chunk := range chunked.Chunks() {
+			if chunk.NullN() > 0 {
+				return nil, WrapNullRowErr(pcr.field)
+			}
+			listReader, ok := chunk.(*array.List)
+			if !ok {
+				return nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			listFloat32Reader, ok := listReader.ListValues().(*array.Float32)
+			if !ok {
+				return nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			offsets := listReader.Offsets()
+			for i := 1; i < len(offsets); i++ {
+				start, end := offsets[i-1], offsets[i]
+				floatCount := end - start
+				if floatCount%int32(dim) != 0 {
+					return nil, merr.WrapErrImportFailed(fmt.Sprintf("vectors in VectorArray should be aligned with dim: %d", dim))
+				}
+				arrLength := floatCount / int32(dim)
+				if err = common.CheckArrayCapacity(int(arrLength), maxCapacity, pcr.field); err != nil {
+					return nil, err
+				}
+				arrData := make([]float32, floatCount)
+				copy(arrData, listFloat32Reader.Float32Values()[start:end])
+				data = append(data, &schemapb.VectorField{
+					Dim: dim,
+					Data: &schemapb.VectorField_FloatVector{
+						FloatVector: &schemapb.FloatArray{
+							Data: arrData,
+						},
+					},
+				})
+			}
+		}
+	default:
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for vector field '%s'",
+			elementType.String(), pcr.field.GetName()))
+	}
+	return data, nil
+}
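
ReadVectorArrayData walks the Arrow list offsets: row i spans values[offsets[i-1]:offsets[i]], and each span must be a whole number of dim-sized vectors. A small worked sketch of that offset arithmetic:

package main

import "fmt"

func main() {
	// list<float32> offsets for 3 rows with dim = 2:
	// row i spans values[offsets[i]:offsets[i+1]].
	offsets := []int32{0, 4, 4, 6} // 2 vectors, 0 vectors, 1 vector
	dim := int32(2)
	for i := 1; i < len(offsets); i++ {
		start, end := offsets[i-1], offsets[i]
		floatCount := end - start
		fmt.Printf("row %d: %d floats = %d vectors of dim %d\n",
			i-1, floatCount, floatCount/dim, dim)
	}
}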

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
const totalReadBufferSize = int64(64 * 1024 * 1024) const totalReadBufferSize = int64(64 * 1024 * 1024)
@@ -59,10 +60,11 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 		return nil, err
 	}
 
+	allFields := typeutil.GetAllFieldSchemas(schema)
 	// Each ColumnReader consumes ReaderProperties.BufferSize memory independently.
 	// Therefore, the bufferSize should be divided by the number of columns
 	// to ensure total memory usage stays within the intended limit.
-	columnReaderBufferSize := totalReadBufferSize / int64(len(schema.GetFields()))
+	columnReaderBufferSize := totalReadBufferSize / int64(len(allFields))
 	r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
 		BufferSize: columnReaderBufferSize,
View File

@@ -635,6 +635,120 @@ func TestParquetReader(t *testing.T) {
 	suite.Run(t, new(ReaderSuite))
 }
 
+func TestParquetReaderWithStructArray(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("test struct array field reading", func(t *testing.T) {
+		// Create schema with StructArrayField
+		schema := &schemapb.CollectionSchema{
+			Name: "test_struct_array",
+			Fields: []*schemapb.FieldSchema{
+				{
+					FieldID:      100,
+					Name:         "id",
+					IsPrimaryKey: true,
+					DataType:     schemapb.DataType_Int64,
+				},
+				{
+					FieldID:  101,
+					Name:     "varchar_field",
+					DataType: schemapb.DataType_VarChar,
+					TypeParams: []*commonpb.KeyValuePair{
+						{Key: common.MaxLengthKey, Value: "100"},
+					},
+				},
+			},
+			StructArrayFields: []*schemapb.StructArrayFieldSchema{
+				{
+					FieldID: 200,
+					Name:    "struct_array",
+					Fields: []*schemapb.FieldSchema{
+						{
+							FieldID:     201,
+							Name:        "int_array",
+							DataType:    schemapb.DataType_Array,
+							ElementType: schemapb.DataType_Int32,
+							TypeParams: []*commonpb.KeyValuePair{
+								{Key: common.MaxCapacityKey, Value: "20"},
+							},
+						},
+						{
+							FieldID:     202,
+							Name:        "float_array",
+							DataType:    schemapb.DataType_Array,
+							ElementType: schemapb.DataType_Float,
+							TypeParams: []*commonpb.KeyValuePair{
+								{Key: common.MaxCapacityKey, Value: "20"},
+							},
+						},
+						{
+							FieldID:     203,
+							Name:        "vector_array",
+							DataType:    schemapb.DataType_ArrayOfVector,
+							ElementType: schemapb.DataType_FloatVector,
+							TypeParams: []*commonpb.KeyValuePair{
+								{Key: common.DimKey, Value: "4"},
+								{Key: common.MaxCapacityKey, Value: "20"},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		// Create test data file
+		filePath := fmt.Sprintf("/tmp/test_struct_array_%d.parquet", rand.Int())
+		defer os.Remove(filePath)
+
+		numRows := 10
+		f, err := os.Create(filePath)
+		assert.NoError(t, err)
+
+		// Use writeParquet to create the test file
+		insertData, err := writeParquet(f, schema, numRows, 0)
+		assert.NoError(t, err)
+		f.Close()
+
+		// Verify the insert data contains struct fields
+		assert.Contains(t, insertData.Data, int64(201)) // int_array field
+		assert.Contains(t, insertData.Data, int64(202)) // float_array field
+		assert.Contains(t, insertData.Data, int64(203)) // vector_array field
+
+		// Now test reading the file using ChunkManager
+		factory := storage.NewChunkManagerFactory("local", objectstorage.RootPath("/tmp"))
+		cm, err := factory.NewPersistentStorageChunkManager(ctx)
+		assert.NoError(t, err)
+
+		reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024)
+		assert.NoError(t, err)
+		defer reader.Close()
+
+		// Read data
+		readData, err := reader.Read()
+		assert.NoError(t, err)
+		assert.NotNil(t, readData)
+
+		// Verify the data includes struct fields
+		assert.Contains(t, readData.Data, int64(201)) // int_array field ID
+		assert.Contains(t, readData.Data, int64(202)) // float_array field ID
+		assert.Contains(t, readData.Data, int64(203)) // vector_array field ID
+
+		// Check row count matches
+		assert.Equal(t, numRows, readData.Data[100].RowNum()) // id field
+		assert.Equal(t, numRows, readData.Data[101].RowNum()) // varchar_field
+		assert.Equal(t, numRows, readData.Data[201].RowNum()) // int_array
+		assert.Equal(t, numRows, readData.Data[202].RowNum()) // float_array
+		assert.Equal(t, numRows, readData.Data[203].RowNum()) // vector_array
+
+		// Verify data content matches
+		for fieldID, originalData := range insertData.Data {
+			readFieldData, ok := readData.Data[fieldID]
+			assert.True(t, ok, "field %d not found in read data", fieldID)
+			assert.Equal(t, originalData.RowNum(), readFieldData.RowNum(), "row count mismatch for field %d", fieldID)
+		}
+	})
+}
+
 func TestParquetReaderError(t *testing.T) {
 	ctx := context.Background()
 	cm := mocks.NewChunkManager(t)

View File

@ -26,6 +26,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -65,7 +66,9 @@ func WrapNullElementErr(field *schemapb.FieldSchema) error {
 }
 
 func CreateFieldReaders(ctx context.Context, fileReader *pqarrow.FileReader, schema *schemapb.CollectionSchema) (map[int64]*FieldReader, error) {
-	nameToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) string {
+	// Create a map for all fields, including sub-fields from StructArrayFields
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	nameToField := lo.KeyBy(allFields, func(field *schemapb.FieldSchema) string {
 		return field.GetName()
 	})
@@ -285,6 +288,19 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
 			Nullable: true,
 			Metadata: arrow.Metadata{},
 		}), nil
+	case schemapb.DataType_ArrayOfVector:
+		// VectorArrayToArrowType now returns the element type (e.g., float32);
+		// we wrap it in a single list to get list<float32> (flattened)
+		elemType, err := storage.VectorArrayToArrowType(field.GetElementType())
+		if err != nil {
+			return nil, err
+		}
+		return arrow.ListOfField(arrow.Field{
+			Name:     "item",
+			Type:     elemType,
+			Nullable: true,
+			Metadata: arrow.Metadata{},
+		}), nil
 	default:
 		return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String())
 	}
@@ -293,8 +309,11 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
 // This method is used only by import util and related tests. Returned arrow.Schema
 // doesn't include function output fields.
 func ConvertToArrowSchemaForUT(schema *schemapb.CollectionSchema, useNullType bool) (*arrow.Schema, error) {
-	arrFields := make([]arrow.Field, 0)
-	for _, field := range schema.GetFields() {
+	// Get all fields including struct sub-fields
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	arrFields := make([]arrow.Field, 0, len(allFields))
+	for _, field := range allFields {
 		if typeutil.IsAutoPKField(field) || field.GetIsFunctionOutput() {
 			continue
 		}
@@ -321,10 +340,15 @@ func ConvertToArrowSchemaForUT(schema *schemapb.CollectionSchema, useNullType bo
 }
 
 func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) error {
+	// Get all fields including struct sub-fields
+	allFields := typeutil.GetAllFieldSchemas(schema)
 	arrNameToField := lo.KeyBy(arrSchema.Fields(), func(field arrow.Field) string {
 		return field.Name
 	})
-	for _, field := range schema.GetFields() {
+
+	// Check all fields (including struct sub-fields, which are stored as separate columns)
+	for _, field := range allFields {
 		// ignore autoPKField and functionOutputField
 		if typeutil.IsAutoPKField(field) || field.GetIsFunctionOutput() {
 			continue

View File

@@ -109,7 +109,8 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
 	if err != nil {
 		return nil, err
 	}
-	for _, f := range schema.GetFields() {
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	for _, f := range allFields {
 		if f.GetAutoID() || f.IsFunctionOutput {
 			continue
 		}
@@ -200,6 +201,18 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
 			case schemapb.DataType_String, schemapb.DataType_VarChar:
 				insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfStringArray(rows))
 			}
+		case schemapb.DataType_ArrayOfVector:
+			dim, err := typeutil.GetDim(f)
+			if err != nil {
+				return nil, err
+			}
+			switch f.GetElementType() {
+			case schemapb.DataType_FloatVector:
+				insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateArrayOfFloatVectorArray(rows, int(dim)))
+			default:
+				panic(fmt.Sprintf("unimplemented data type: %s", f.GetElementType().String()))
+			}
 		default:
 			panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String()))
 		}
@@ -401,11 +414,15 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType
 func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.InsertData, useNullType bool) ([]arrow.Array, error) {
 	mem := memory.NewGoAllocator()
-	columns := make([]arrow.Array, 0, len(schema.Fields))
-	for _, field := range schema.Fields {
-		if field.GetIsPrimaryKey() && field.GetAutoID() || field.GetIsFunctionOutput() {
-			continue
-		}
+	// Get all fields including struct sub-fields
+	allFields := typeutil.GetAllFieldSchemas(schema)
+	// Filter out auto-generated and function output fields
+	fields := lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool {
+		return !(field.GetIsPrimaryKey() && field.GetAutoID()) && !field.GetIsFunctionOutput()
+	})
+
+	columns := make([]arrow.Array, 0, len(fields))
+	for _, field := range fields {
 		fieldID := field.GetFieldID()
 		dataType := field.GetDataType()
 		elementType := field.GetElementType()
@@ -705,22 +722,264 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
 				builder.AppendValues(offsets, valid)
 				columns = append(columns, builder.NewListArray())
 			}
+		case schemapb.DataType_ArrayOfVector:
+			data := insertData.Data[fieldID].(*storage.VectorArrayFieldData).Data
+			rows := len(data)
+			switch elementType {
+			case schemapb.DataType_FloatVector:
+				// ArrayOfVector is flattened in Arrow - just a list of floats
+				// where total floats = dim * num_vectors
+				builder := array.NewListBuilder(mem, &arrow.Float32Type{})
+				valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
+				for i := 0; i < rows; i++ {
+					vectorArray := data[i].GetFloatVector()
+					if vectorArray == nil || len(vectorArray.GetData()) == 0 {
+						builder.AppendNull()
+						continue
+					}
+					builder.Append(true)
+					// Append all flattened vector data
+					valueBuilder.AppendValues(vectorArray.GetData(), nil)
+				}
+				columns = append(columns, builder.NewListArray())
+			default:
+				return nil, fmt.Errorf("unsupported element type in VectorArray: %s", elementType.String())
+			}
 		}
 	}
 	return columns, nil
 }
+
+// reconstructStructArrayForJSON reconstructs struct array data for the JSON format.
+// It returns an array of maps where each element represents one struct.
+func reconstructStructArrayForJSON(structField *schemapb.StructArrayFieldSchema, insertData *storage.InsertData, rowIndex int) ([]map[string]any, error) {
+	subFields := structField.GetFields()
+	if len(subFields) == 0 {
+		return []map[string]any{}, nil
+	}
+
+	// Determine the array length from the first sub-field's data
+	var arrayLen int
+	for _, subField := range subFields {
+		if fieldData, ok := insertData.Data[subField.GetFieldID()]; ok {
+			rowData := fieldData.GetRow(rowIndex)
+			if rowData == nil {
+				continue
+			}
+			switch subField.GetDataType() {
+			case schemapb.DataType_Array:
+				if scalarField, ok := rowData.(*schemapb.ScalarField); ok {
+					switch subField.GetElementType() {
+					case schemapb.DataType_Bool:
+						if data := scalarField.GetBoolData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
+						if data := scalarField.GetIntData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					case schemapb.DataType_Int64:
+						if data := scalarField.GetLongData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					case schemapb.DataType_Float:
+						if data := scalarField.GetFloatData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					case schemapb.DataType_Double:
+						if data := scalarField.GetDoubleData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					case schemapb.DataType_String, schemapb.DataType_VarChar:
+						if data := scalarField.GetStringData(); data != nil {
+							arrayLen = len(data.GetData())
+						}
+					}
+				}
+			case schemapb.DataType_ArrayOfVector:
+				if vectorField, ok := rowData.(*schemapb.VectorField); ok {
+					switch subField.GetElementType() {
+					case schemapb.DataType_FloatVector:
+						if data := vectorField.GetFloatVector(); data != nil {
+							dim, _ := typeutil.GetDim(subField)
+							if dim > 0 {
+								arrayLen = len(data.GetData()) / int(dim)
+							}
+						}
+					case schemapb.DataType_BinaryVector:
+						if data := vectorField.GetBinaryVector(); data != nil {
+							dim, _ := typeutil.GetDim(subField)
+							if dim > 0 {
+								bytesPerVector := int(dim) / 8
+								arrayLen = len(data) / bytesPerVector
+							}
+						}
+					case schemapb.DataType_Float16Vector:
+						if data := vectorField.GetFloat16Vector(); data != nil {
+							dim, _ := typeutil.GetDim(subField)
+							if dim > 0 {
+								bytesPerVector := int(dim) * 2
+								arrayLen = len(data) / bytesPerVector
+							}
+						}
+					case schemapb.DataType_BFloat16Vector:
+						if data := vectorField.GetBfloat16Vector(); data != nil {
+							dim, _ := typeutil.GetDim(subField)
+							if dim > 0 {
+								bytesPerVector := int(dim) * 2
+								arrayLen = len(data) / bytesPerVector
+							}
+						}
+					}
+				}
+			}
+			if arrayLen > 0 {
+				break
+			}
+		}
+	}
+
+	// Build the struct array
+	structArray := make([]map[string]any, arrayLen)
+	for j := 0; j < arrayLen; j++ {
+		structElem := make(map[string]any)
+		for _, subField := range subFields {
+			if fieldData, ok := insertData.Data[subField.GetFieldID()]; ok {
+				rowData := fieldData.GetRow(rowIndex)
+				if rowData == nil {
+					continue
+				}
+				// Extract the j-th element
+				switch subField.GetDataType() {
+				case schemapb.DataType_Array:
+					if scalarField, ok := rowData.(*schemapb.ScalarField); ok {
+						switch subField.GetElementType() {
+						case schemapb.DataType_Bool:
+							if data := scalarField.GetBoolData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
+							if data := scalarField.GetIntData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						case schemapb.DataType_Int64:
+							if data := scalarField.GetLongData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						case schemapb.DataType_Float:
+							if data := scalarField.GetFloatData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						case schemapb.DataType_Double:
+							if data := scalarField.GetDoubleData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						case schemapb.DataType_String, schemapb.DataType_VarChar:
+							if data := scalarField.GetStringData(); data != nil && j < len(data.GetData()) {
+								structElem[subField.GetName()] = data.GetData()[j]
+							}
+						}
+					}
+				case schemapb.DataType_ArrayOfVector:
+					if vectorField, ok := rowData.(*schemapb.VectorField); ok {
+						switch subField.GetElementType() {
+						case schemapb.DataType_FloatVector:
+							if data := vectorField.GetFloatVector(); data != nil {
+								dim, _ := typeutil.GetDim(subField)
+								if dim > 0 {
+									startIdx := j * int(dim)
+									endIdx := startIdx + int(dim)
+									if endIdx <= len(data.GetData()) {
+										structElem[subField.GetName()] = data.GetData()[startIdx:endIdx]
+									}
+								}
+							}
+						case schemapb.DataType_BinaryVector:
+							if data := vectorField.GetBinaryVector(); data != nil {
+								dim, _ := typeutil.GetDim(subField)
+								if dim > 0 {
+									bytesPerVector := int(dim) / 8
+									startIdx := j * bytesPerVector
+									endIdx := startIdx + bytesPerVector
+									if endIdx <= len(data) {
+										structElem[subField.GetName()] = data[startIdx:endIdx]
+									}
+								}
+							}
+						case schemapb.DataType_Float16Vector:
+							if data := vectorField.GetFloat16Vector(); data != nil {
+								dim, _ := typeutil.GetDim(subField)
+								if dim > 0 {
+									bytesPerVector := int(dim) * 2
+									startIdx := j * bytesPerVector
+									endIdx := startIdx + bytesPerVector
if endIdx <= len(data) {
// Convert Float16 bytes to float32 for JSON representation
structElem[subField.GetName()] = typeutil.Float16BytesToFloat32Vector(data[startIdx:endIdx])
}
}
}
case schemapb.DataType_BFloat16Vector:
if data := vectorField.GetBfloat16Vector(); data != nil {
dim, _ := typeutil.GetDim(subField)
if dim > 0 {
bytesPerVector := int(dim) * 2
startIdx := j * bytesPerVector
endIdx := startIdx + bytesPerVector
if endIdx <= len(data) {
// Convert BFloat16 bytes to float32 for JSON representation
structElem[subField.GetName()] = typeutil.BFloat16BytesToFloat32Vector(data[startIdx:endIdx])
}
}
}
}
}
}
}
}
structArray[j] = structElem
}
return structArray, nil
}
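Note: reconstructStructArrayForJSON is essentially a pivot from column-oriented sub-field storage to row-oriented structs. A minimal self-contained sketch of that zip for one top-level row, assuming one scalar sub-field and one flattened vector sub-field with dim=2 (field names and values hypothetical):

package main

import "fmt"

func main() {
	const dim = 2
	scalars := []int32{1, 2, 3}                         // scalar_array_field for one row
	flatVecs := []float32{0.1, 0.2, 0.3, 0.4, 0.5, 0.6} // vector_array_field, flattened

	// Zip the j-th element of every sub-field into the j-th struct.
	structs := make([]map[string]any, len(scalars))
	for j := range scalars {
		structs[j] = map[string]any{
			"scalar_array_field": scalars[j],
			"vector_array_field": flatVecs[j*dim : (j+1)*dim],
		}
	}
	fmt.Println(structs) // 3 structs, each pairing a scalar with its dim-2 vector
}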
func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]map[string]any, error) {
fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
// Track which field IDs belong to struct array sub-fields
structSubFieldIDs := make(map[int64]bool)
for _, structField := range schema.GetStructArrayFields() {
for _, subField := range structField.GetFields() {
structSubFieldIDs[subField.GetFieldID()] = true
}
}
rowNum := insertData.GetRowNum()
rows := make([]map[string]any, 0, rowNum)
for i := 0; i < rowNum; i++ {
data := make(map[int64]interface{})
// First process regular fields
for fieldID, v := range insertData.Data {
// Skip if this is a sub-field of a struct array
if structSubFieldIDs[fieldID] {
continue
}
field, ok := fieldIDToField[fieldID]
if !ok {
continue
}
dataType := field.GetDataType()
elemType := field.GetElementType()
if field.GetAutoID() || field.IsFunctionOutput {
@@ -746,6 +1005,8 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *
case schemapb.DataType_String, schemapb.DataType_VarChar:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData()
}
case schemapb.DataType_ArrayOfVector:
// ArrayOfVector can only appear as a struct array sub-field, which is skipped above
panic("unreachable")
case schemapb.DataType_JSON:
data[fieldID] = string(v.GetRow(i).([]byte))
case schemapb.DataType_BinaryVector:
@@ -768,33 +1029,117 @@ func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *
data[fieldID] = v.GetRow(i)
}
}
// Now process struct array fields - reconstruct the nested structure
for _, structField := range schema.GetStructArrayFields() {
structArray, err := reconstructStructArrayForJSON(structField, insertData, i)
if err != nil {
return nil, err
}
data[structField.GetFieldID()] = structArray
}
// Convert field IDs to field names
row := make(map[string]any)
for fieldID, value := range data {
if field, ok := fieldIDToField[fieldID]; ok {
row[field.GetName()] = value
} else {
// Check if it's a struct array field
for _, structField := range schema.GetStructArrayFields() {
if structField.GetFieldID() == fieldID {
row[structField.GetName()] = value
break
}
}
}
}
rows = append(rows, row)
}
return rows, nil
}
// reconstructStructArrayForCSV reconstructs struct array data for CSV format
// Returns a JSON string where each sub-field value is also a JSON string
func reconstructStructArrayForCSV(structField *schemapb.StructArrayFieldSchema, insertData *storage.InsertData, rowIndex int) (string, error) {
// Use the JSON reconstruction function to get the struct array
structArray, err := reconstructStructArrayForJSON(structField, insertData, rowIndex)
if err != nil {
return "", err
}
// Convert to CSV format: each sub-field value needs to be JSON-encoded
csvArray := make([]map[string]string, len(structArray))
for i, elem := range structArray {
csvElem := make(map[string]string)
for key, value := range elem {
// Convert each value to JSON string for CSV
jsonBytes, err := json.Marshal(value)
if err != nil {
return "", err
}
csvElem[key] = string(jsonBytes)
}
csvArray[i] = csvElem
}
// Convert the entire struct array to JSON string
jsonBytes, err := json.Marshal(csvArray)
if err != nil {
return "", err
}
return string(jsonBytes), nil
}
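Note: the CSV cell produced by reconstructStructArrayForCSV is doubly JSON-encoded: each sub-field value is marshaled to a JSON string first, then the whole struct array is marshaled again. A minimal sketch of the resulting cell text (field names and values hypothetical):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	structArray := []map[string]any{
		{"scalar_array_field": int32(7), "vector_array_field": []float32{0.1, 0.2}},
	}

	// First pass: JSON-encode each sub-field value to a string.
	csvArray := make([]map[string]string, len(structArray))
	for i, elem := range structArray {
		csvElem := make(map[string]string)
		for k, v := range elem {
			b, _ := json.Marshal(v)
			csvElem[k] = string(b)
		}
		csvArray[i] = csvElem
	}

	// Second pass: JSON-encode the whole array into the CSV cell.
	cell, _ := json.Marshal(csvArray)
	fmt.Println(string(cell))
	// [{"scalar_array_field":"7","vector_array_field":"[0.1,0.2]"}]
}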
func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *storage.InsertData, nullkey string) ([][]string, error) {
rowNum := insertData.GetRowNum()
csvData := make([][]string, 0, rowNum+1)
// Build header - regular fields and struct array fields (not sub-fields)
header := make([]string, 0)
// Track which field IDs belong to struct array sub-fields
structSubFieldIDs := make(map[int64]bool)
for _, structField := range schema.GetStructArrayFields() {
for _, subField := range structField.GetFields() {
structSubFieldIDs[subField.GetFieldID()] = true
}
}
// Add regular fields to header (excluding struct array sub-fields)
allFields := typeutil.GetAllFieldSchemas(schema)
fields := lo.Filter(allFields, func(field *schemapb.FieldSchema, _ int) bool {
return !field.GetAutoID() && !field.IsFunctionOutput && !structSubFieldIDs[field.GetFieldID()]
})
nameToFields := lo.KeyBy(fields, func(field *schemapb.FieldSchema) string {
name := field.GetName()
header = append(header, name)
return name
})
// Build map for struct array fields for quick lookup
structArrayFields := make(map[string]*schemapb.StructArrayFieldSchema)
for _, structField := range schema.GetStructArrayFields() {
structArrayFields[structField.GetName()] = structField
header = append(header, structField.GetName())
}
csvData = append(csvData, header)
for i := 0; i < rowNum; i++ {
data := make([]string, 0)
for _, name := range header {
if structArrayField, ok := structArrayFields[name]; ok {
structArrayData, err := reconstructStructArrayForCSV(structArrayField, insertData, i)
if err != nil {
return nil, err
}
data = append(data, structArrayData)
continue
}
// Handle regular field
field := nameToFields[name]
value := insertData.Data[field.FieldID]
dataType := field.GetDataType()
@@ -877,6 +1222,10 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora
return nil, err
}
data = append(data, string(j))
case schemapb.DataType_ArrayOfVector:
// ArrayOfVector should not appear as a top-level field
// It can only be a sub-field in struct arrays
panic("ArrayOfVector cannot be a top-level field")
default:
str := fmt.Sprintf("%v", value.GetRow(i))
data = append(data, str)


@@ -32,8 +32,8 @@ func GetMaxLength(field *schemapb.FieldSchema) (int64, error) {
// GetMaxCapacity gets the max capacity of an array field. Maybe also helpful outside.
func GetMaxCapacity(field *schemapb.FieldSchema) (int64, error) {
if !typeutil.IsArrayType(field.GetDataType()) && !typeutil.IsVectorArrayType(field.GetDataType()) {
msg := fmt.Sprintf("%s is not of array/vector array type", field.GetDataType())
return 0, merr.WrapErrParameterInvalid(schemapb.DataType_Array, field.GetDataType(), msg)
}
h := typeutil.NewKvPairs(append(field.GetIndexParams(), field.GetTypeParams()...))


@@ -104,7 +104,8 @@ func EstimateAvgSizePerRecord(schema *schemapb.CollectionSchema) (int, error) {
func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLengthPolicy) (int, error) {
res := 0
allFields := GetAllFieldSchemas(schema)
for _, fs := range allFields {
switch fs.DataType {
case schemapb.DataType_Bool, schemapb.DataType_Int8:
res++
@@ -170,6 +171,31 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe
break
}
}
case schemapb.DataType_ArrayOfVector:
dim := 0
for _, kv := range fs.TypeParams {
if kv.Key == common.DimKey {
v, err := strconv.Atoi(kv.Value)
if err != nil {
return -1, err
}
dim = v
}
}
assumedArrayLen := 10
// Estimate size based on element type
switch fs.ElementType {
case schemapb.DataType_FloatVector:
res += assumedArrayLen * dim * 4
case schemapb.DataType_BinaryVector:
res += assumedArrayLen * (dim / 8)
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
res += assumedArrayLen * dim * 2
case schemapb.DataType_Int8Vector:
res += assumedArrayLen * dim
default:
return 0, fmt.Errorf("unsupported element type in VectorArray: %s", fs.ElementType.String())
}
}
}
return res, nil
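Note: a quick worked instance of the ArrayOfVector branch above: with the assumed average of 10 vectors per row, a FloatVector element type at dim=32 contributes 10 * 32 * 4 = 1280 bytes to the per-row estimate.

package main

import "fmt"

func main() {
	const (
		assumedArrayLen = 10 // average vectors per row assumed by estimateSizeBy
		dim             = 32
		float32Bytes    = 4
	)
	fmt.Println(assumedArrayLen * dim * float32Bytes) // 1280 bytes per row
}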


@@ -63,7 +63,7 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
collectionName := "TestBinlogImport_A_" + funcutil.GenRandomStr() collectionName := "TestBinlogImport_A_" + funcutil.GenRandomStr()
schema := integration.ConstructSchema(collectionName, dim, true) schema := integration.ConstructSchemaOfVecDataTypeWithStruct(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema) marshaledSchema, err := proto.Marshal(schema)
s.NoError(err) s.NoError(err)
@@ -96,6 +96,16 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
s.NoError(merr.CheckRPCCall(createIndexStatus, err))
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
createIndexResult, err := c.MilvusClient.CreateIndex(context.TODO(), &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.StructSubFloatVecField,
IndexName: "array_of_vector_index",
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexEmbListHNSW, metric.MaxSim),
})
s.NoError(err)
s.Require().Equal(createIndexResult.GetErrorCode(), commonpb.ErrorCode_Success)
s.WaitForIndexBuilt(context.TODO(), collectionName, integration.StructSubFloatVecField)
// load
loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
CollectionName: collectionName,
@@ -123,10 +133,11 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
totalDeleteRowNum += delRow
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, insRow, dim)
structColumn := integration.NewStructArrayFieldData(schema.StructArrayFields[0], integration.StructArrayField, insRow, dim)
hashKeys := integration.GenerateHashKeys(insRow)
insertResult, err := c.MilvusClient.Insert(ctx, &milvuspb.InsertRequest{
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn, structColumn},
HashKeys: hashKeys,
NumRows: uint32(insRow),
})


@@ -0,0 +1,306 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2
import (
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/testutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/metric"
"github.com/milvus-io/milvus/tests/integration"
)
func TestGenerateJsonFileWithVectorArray(t *testing.T) {
const (
rowCount = 100
dim = 32
maxArrayCapacity = 10
)
collectionName := "TestBulkInsert_VectorArray_" + funcutil.GenRandomStr()
// Create schema with StructArrayField containing vector array
schema := integration.ConstructSchema(collectionName, 0, true, &schemapb.FieldSchema{
FieldID: 100,
Name: integration.Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: false,
}, &schemapb.FieldSchema{
FieldID: 101,
Name: integration.VarCharField,
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: "256",
},
},
})
// Add StructArrayField with vector array
structField := &schemapb.StructArrayFieldSchema{
FieldID: 102,
Name: "struct_with_vector_array",
Fields: []*schemapb.FieldSchema{
{
FieldID: 103,
Name: "vector_array_field",
IsPrimaryKey: false,
DataType: schemapb.DataType_ArrayOfVector,
ElementType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
{
Key: common.MaxCapacityKey,
Value: strconv.Itoa(maxArrayCapacity),
},
},
},
{
FieldID: 104,
Name: "scalar_array_field",
IsPrimaryKey: false,
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxCapacityKey,
Value: strconv.Itoa(maxArrayCapacity),
},
},
},
},
}
schema.StructArrayFields = []*schemapb.StructArrayFieldSchema{structField}
schema.EnableDynamicField = false
insertData, err := testutil.CreateInsertData(schema, rowCount)
assert.NoError(t, err)
rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
assert.NoError(t, err)
assert.Equal(t, rowCount, len(rows))
fmt.Println(rows)
}
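Note: for the schema above, each generated JSON row nests the struct array under its field name, with one map per struct element. An illustrative fragment of a row (hypothetical values; dim shown as 2 for brevity, the test uses 32):

"struct_with_vector_array": [
  {"scalar_array_field": 3, "vector_array_field": [0.1, 0.2]},
  {"scalar_array_field": 5, "vector_array_field": [0.3, 0.4]}
]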
func (s *BulkInsertSuite) runForStructArray() {
const (
rowCount = 100
dim = 32
maxArrayCapacity = 10
)
c := s.Cluster
ctx, cancel := context.WithTimeout(c.GetContext(), 600*time.Second)
defer cancel()
collectionName := "TestBulkInsert_VectorArray_" + funcutil.GenRandomStr()
// Create schema with StructArrayField containing vector array
schema := integration.ConstructSchema(collectionName, 0, true, &schemapb.FieldSchema{
FieldID: 100,
Name: integration.Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: false,
}, &schemapb.FieldSchema{
FieldID: 101,
Name: integration.VarCharField,
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: "256",
},
},
})
// Add StructArrayField with vector array
structField := &schemapb.StructArrayFieldSchema{
FieldID: 102,
Name: "struct_with_vector_array",
Fields: []*schemapb.FieldSchema{
{
FieldID: 103,
Name: "vector_array_field",
IsPrimaryKey: false,
DataType: schemapb.DataType_ArrayOfVector,
ElementType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
{
Key: common.MaxCapacityKey,
Value: strconv.Itoa(maxArrayCapacity),
},
},
},
{
FieldID: 104,
Name: "scalar_array_field",
IsPrimaryKey: false,
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxCapacityKey,
Value: strconv.Itoa(maxArrayCapacity),
},
},
},
},
}
schema.StructArrayFields = []*schemapb.StructArrayFieldSchema{structField}
schema.EnableDynamicField = false
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.MilvusClient.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: "",
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: common.DefaultShardsNum,
})
s.NoError(err)
s.Equal(int32(0), createCollectionStatus.GetCode())
var files []*internalpb.ImportFile
options := []*commonpb.KeyValuePair{}
switch s.fileType {
case importutilv2.JSON:
rowBasedFile := GenerateJSONFile(s.T(), c, schema, rowCount)
files = []*internalpb.ImportFile{
{
Paths: []string{
rowBasedFile,
},
},
}
case importutilv2.Parquet:
filePath, err := GenerateParquetFile(s.Cluster, schema, rowCount)
s.NoError(err)
files = []*internalpb.ImportFile{
{
Paths: []string{
filePath,
},
},
}
case importutilv2.CSV:
filePath, sep := GenerateCSVFile(s.T(), s.Cluster, schema, rowCount)
options = []*commonpb.KeyValuePair{{Key: "sep", Value: string(sep)}}
files = []*internalpb.ImportFile{
{
Paths: []string{
filePath,
},
},
}
}
// Import data
importResp, err := c.ProxyClient.ImportV2(ctx, &internalpb.ImportRequest{
CollectionName: collectionName,
Files: files,
Options: options,
})
s.NoError(err)
s.NotNil(importResp)
s.Equal(int32(0), importResp.GetStatus().GetCode())
log.Info("Import response", zap.Any("resp", importResp))
jobID := importResp.GetJobID()
// Wait for import to complete
err = WaitForImportDone(ctx, s.Cluster, jobID)
s.NoError(err)
// Create index for vector array field
createIndexStatus, err := c.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: "vector_array_field",
IndexName: "_default_idx",
ExtraParams: integration.ConstructIndexParam(dim, s.indexType, s.metricType),
})
if err == nil {
s.Equal(int32(0), createIndexStatus.GetCode(), createIndexStatus.GetReason())
}
// Load collection
loadStatus, err := c.MilvusClient.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
CollectionName: collectionName,
})
s.NoError(err)
s.Equal(int32(0), loadStatus.GetCode(), loadStatus.GetReason())
s.WaitForLoad(ctx, collectionName)
// search
nq := 10
topk := 10
outputFields := []string{"vector_array_field"}
params := integration.GetSearchParams(s.indexType, s.metricType)
searchReq := integration.ConstructEmbeddingListSearchRequest("", collectionName, "",
"vector_array_field", s.vecType, outputFields, s.metricType, params, nq, dim, topk, -1)
searchResp, err := s.Cluster.MilvusClient.Search(ctx, searchReq)
s.Require().NoError(err)
s.Require().Equal(commonpb.ErrorCode_Success, searchResp.GetStatus().GetErrorCode(), searchResp.GetStatus().GetReason())
result := searchResp.GetResults()
s.Require().Len(result.GetIds().GetIntId().GetData(), nq*topk)
s.Require().Len(result.GetScores(), nq*topk)
s.Require().GreaterOrEqual(len(result.GetFieldsData()), 1)
s.Require().EqualValues(nq, result.GetNumQueries())
s.Require().EqualValues(topk, result.GetTopK())
}
func (s *BulkInsertSuite) TestImportWithVectorArray() {
fileTypeArr := []importutilv2.FileType{importutilv2.CSV, importutilv2.Parquet, importutilv2.JSON}
for _, fileType := range fileTypeArr {
s.fileType = fileType
s.vecType = schemapb.DataType_FloatVector
s.indexType = integration.IndexEmbListHNSW
s.metricType = metric.MaxSim
s.runForStructArray()
}
}