fix: csv/json import with STRUCT adapts concatenated struct name (#45000)
After https://github.com/milvus-io/milvus/pull/44557, the name of a sub-field inside a STRUCT field becomes STRUCT_NAME[FIELD_NAME]. This PR makes csv/json import aware of that change.

issue: https://github.com/milvus-io/milvus/issues/45006
ref: https://github.com/milvus-io/milvus/issues/42148

TODO: parquet import is considerably more complex than csv/json, so it is left to a separate PR.

---------

Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
parent 6494c75d31
commit d8591f9548
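
For context, the concatenated form introduced by #44557 is the plain string `structName[fieldName]`. A minimal, runnable sketch of the helper this commit moves into `typeutil` (the function body is copied from the diff below; the `main` wrapper and example names are illustrative):

```go
package main

import "fmt"

// ConcatStructFieldName transforms struct field names to structName[fieldName] format.
// This ensures global uniqueness while allowing same field names across different structs.
func ConcatStructFieldName(structName string, fieldName string) string {
	return fmt.Sprintf("%s[%s]", structName, fieldName)
}

func main() {
	// Prints "struct_array[sub_float_vector]".
	fmt.Println(ConcatStructFieldName("struct_array", "sub_float_vector"))
}
```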
@@ -1751,7 +1751,7 @@ func TestProxy(t *testing.T) {
 		assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
 	})

-	fieldName := ConcatStructFieldName(structField, subFieldFVec)
+	fieldName := typeutil.ConcatStructFieldName(structField, subFieldFVec)
 	wg.Add(1)
 	t.Run("create index for embedding list field", func(t *testing.T) {
 		defer wg.Done()
@@ -83,10 +83,6 @@ const (

 var logger = log.L().WithOptions(zap.Fields(zap.String("role", typeutil.ProxyRole)))

-func ConcatStructFieldName(structName string, fieldName string) string {
-	return fmt.Sprintf("%s[%s]", structName, fieldName)
-}
-
 // transformStructFieldNames transforms struct field names to structName[fieldName] format
 // This ensures global uniqueness while allowing same field names across different structs
 func transformStructFieldNames(schema *schemapb.CollectionSchema) error {
@@ -94,7 +90,7 @@ func transformStructFieldNames(schema *schemapb.CollectionSchema) error {
 		structName := structArrayField.Name
 		for _, field := range structArrayField.Fields {
 			// Create transformed name: structName[fieldName]
-			newName := ConcatStructFieldName(structName, field.Name)
+			newName := typeutil.ConcatStructFieldName(structName, field.Name)
 			field.Name = newName
 		}
 	}
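
To see what `transformStructFieldNames` does end to end, here is a hedged test-style sketch. The struct and field names are illustrative, and the proto type names (`StructArrayFieldSchema`, `FieldSchema`) are assumed from the surrounding diff:

```go
package proxy

import (
	"testing"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/stretchr/testify/assert"
)

// Hypothetical test: a struct-array field "meta" enters CreateCollection with
// sub-fields {tag, vec} and leaves with {meta[tag], meta[vec]}.
func TestTransformStructFieldNamesSketch(t *testing.T) {
	schema := &schemapb.CollectionSchema{
		StructArrayFields: []*schemapb.StructArrayFieldSchema{{
			Name: "meta",
			Fields: []*schemapb.FieldSchema{
				{Name: "tag"},
				{Name: "vec"},
			},
		}},
	}
	assert.NoError(t, transformStructFieldNames(schema))
	assert.Equal(t, "meta[tag]", schema.StructArrayFields[0].Fields[0].Name)
	assert.Equal(t, "meta[vec]", schema.StructArrayFields[0].Fields[1].Name)
}
```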
@@ -1835,7 +1831,7 @@ func checkAndFlattenStructFieldData(schema *schemapb.CollectionSchema, insertMsg
 			structName, expectedArrayLen, currentArrayLen, subField.FieldName)
 	}

-	transformedFieldName := ConcatStructFieldName(structName, subField.FieldName)
+	transformedFieldName := typeutil.ConcatStructFieldName(structName, subField.FieldName)
 	subFieldCopy := &schemapb.FieldData{
 		FieldName: transformedFieldName,
 		FieldId:   subField.FieldId,
@@ -159,7 +159,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
 // we reconstruct it to be handled by handleField as:
 //
 //	{"sub-field1": "[1, 2]", "sub-field2": "[[1.0, 2.0], [3.0, 4.0]]"}
-func (r *rowParser) reconstructArrayForStructArray(subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string]string, error) {
+func (r *rowParser) reconstructArrayForStructArray(structName string, subFieldsMap map[string]*schemapb.FieldSchema, raw string) (map[string]string, error) {
 	// Parse the JSON array string
 	var rows []any
 	dec := json.NewDecoder(strings.NewReader(raw))
@@ -175,20 +175,21 @@ func (r *rowParser) reconstructArrayForStructArray(subFieldsMap map[string]*sche
 			return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid element in StructArray, expect map[string]any but got type %T", elem))
 		}
 		for key, value := range row {
-			field, ok := subFieldsMap[key]
+			fieldName := typeutil.ConcatStructFieldName(structName, key)
+			field, ok := subFieldsMap[fieldName]
 			if !ok {
-				return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", key))
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("field %s not found", fieldName))
 			}
 			strVal, ok := value.(string)
 			if !ok {
-				return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid value type for field %s, expect string but got %T", key, value))
+				return nil, merr.WrapErrImportFailed(fmt.Sprintf("invalid value type for field %s, expect string but got %T", fieldName, value))
 			}

 			data, err := r.parseEntity(field, strVal, true)
 			if err != nil {
 				return nil, err
 			}
-			buf[key] = append(buf[key], data)
+			buf[fieldName] = append(buf[fieldName], data)
 		}
 	}
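
The effect of threading `structName` through `reconstructArrayForStructArray` is that the reconstructed map is keyed by the concatenated names, matching the transformed schema. A standalone sketch of just that key rewrite (field names are illustrative; this is not the importer's actual code path):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// concatStructFieldName mirrors typeutil.ConcatStructFieldName.
func concatStructFieldName(structName, fieldName string) string {
	return fmt.Sprintf("%s[%s]", structName, fieldName)
}

func main() {
	// A CSV cell for a struct-array column: a JSON array of structs whose
	// values are themselves JSON-encoded strings, per the parser's doc comment.
	raw := `[{"sub_str": "\"a\"", "sub_float_vector": "[1.0, 2.0]"}]`

	var rows []map[string]any
	if err := json.Unmarshal([]byte(raw), &rows); err != nil {
		panic(err)
	}

	// Bucket every value under the concatenated key the schema now uses.
	buf := make(map[string][]any)
	for _, row := range rows {
		for key, value := range row {
			fieldName := concatStructFieldName("struct_array", key)
			buf[fieldName] = append(buf[fieldName], value)
		}
	}
	// Keys are now "struct_array[sub_str]" and "struct_array[sub_float_vector]".
	fmt.Println(buf)
}
```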
@@ -215,7 +216,7 @@ func (r *rowParser) Parse(strArr []string) (Row, error) {
 	// read values from csv file
 	for index, value := range strArr {
 		if subFieldsMap, ok := r.structArrays[r.header[index]]; ok {
-			values, err := r.reconstructArrayForStructArray(subFieldsMap, value)
+			values, err := r.reconstructArrayForStructArray(r.header[index], subFieldsMap, value)
 			if err != nil {
 				return nil, err
 			}
@@ -75,7 +75,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Fields: []*schemapb.FieldSchema{
 				{
 					FieldID:     111,
-					Name:        "sub_float_vector",
+					Name:        "struct_array[sub_float_vector]",
 					DataType:    schemapb.DataType_ArrayOfVector,
 					ElementType: schemapb.DataType_FloatVector,
 					TypeParams: []*commonpb.KeyValuePair{
@@ -91,7 +91,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 				},
 				{
 					FieldID:     112,
-					Name:        "sub_str",
+					Name:        "struct_array[sub_str]",
 					DataType:    schemapb.DataType_Array,
 					ElementType: schemapb.DataType_VarChar,
 					TypeParams: []*commonpb.KeyValuePair{
@@ -649,19 +649,21 @@ func (suite *RowParserSuite) runValid(c *testCase) {

 	// For each sub-field in the struct array
 	for _, subField := range structArray.GetFields() {
+		subFieldName := subField.GetName()
+		originalSubFieldName := subFieldName[len(structArray.GetName())+1 : len(subFieldName)-1]
 		val, ok := row[subField.GetFieldID()]
-		suite.True(ok, "Sub-field %s should exist in row", subField.GetName())
+		suite.True(ok, "Sub-field %s should exist in row", subFieldName)

 		// Validate based on sub-field type
 		switch subField.GetDataType() {
 		case schemapb.DataType_ArrayOfVector:
 			vf, ok := val.(*schemapb.VectorField)
-			suite.True(ok, "Sub-field %s should be a VectorField", subField.GetName())
+			suite.True(ok, "Sub-field %s should be a VectorField", subFieldName)

 			// Extract expected vectors from struct array data
 			var expectedVectors [][]float32
 			for _, elem := range structArrayData {
-				if vecStr, ok := elem[subField.GetName()].(string); ok {
+				if vecStr, ok := elem[originalSubFieldName].(string); ok {
 					var vec []float32
 					err := json.Unmarshal([]byte(vecStr), &vec)
 					suite.NoError(err)
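
The `originalSubFieldName` slice expression is the inverse of the concatenation: skip the struct name plus the opening bracket, then drop the trailing bracket. A quick standalone check of the arithmetic (names illustrative):

```go
package main

import "fmt"

func main() {
	structName := "struct_array"
	full := fmt.Sprintf("%s[%s]", structName, "sub_str") // "struct_array[sub_str]"

	// Skip structName + "[" (len(structName)+1 bytes) and drop the final "]".
	original := full[len(structName)+1 : len(full)-1]
	fmt.Println(original) // "sub_str"
}
```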
@@ -678,12 +680,12 @@ func (suite *RowParserSuite) runValid(c *testCase) {

 		case schemapb.DataType_Array:
 			sf, ok := val.(*schemapb.ScalarField)
-			suite.True(ok, "Sub-field %s should be a ScalarField", subField.GetName())
+			suite.True(ok, "Sub-field %s should be a ScalarField", subFieldName)

 			// Extract expected values from struct array data
 			var expectedValues []string
 			for _, elem := range structArrayData {
-				if v, ok := elem[subField.GetName()].(string); ok {
+				if v, ok := elem[originalSubFieldName].(string); ok {
 					expectedValues = append(expectedValues, v)
 				}
 			}
@@ -178,8 +178,20 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 	row := make(Row)
 	dynamicValues := make(map[string]any)

-	handleField := func(key string, value any) error {
-		if fieldID, ok := r.name2FieldID[key]; ok {
+	handleField := func(structName string, key string, value any) error {
+		var fieldID int64
+		var found bool
+
+		if structName != "" {
+			// Transform to structName[fieldName] format
+			transformedKey := typeutil.ConcatStructFieldName(structName, key)
+			fieldID, found = r.name2FieldID[transformedKey]
+		} else {
+			// For regular fields, lookup directly
+			fieldID, found = r.name2FieldID[key]
+		}
+
+		if found {
 			data, err := r.parseEntity(fieldID, value)
 			if err != nil {
 				return err
@@ -215,12 +227,14 @@ func (r *rowParser) Parse(raw any) (Row, error) {
 		}

 		for subKey, subValue := range values {
-			if err := handleField(subKey, subValue); err != nil {
+			// Pass struct name for sub-fields
+			if err := handleField(key, subKey, subValue); err != nil {
 				return nil, err
 			}
 		}
 	} else {
-		if err := handleField(key, value); err != nil {
+		// Pass empty string for regular fields
+		if err := handleField("", key, value); err != nil {
 			return nil, err
 		}
 	}
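
With the extra `structName` argument, the JSON parser resolves sub-fields against the transformed schema names while regular fields resolve as before. A self-contained sketch of that lookup decision (the field IDs 111/112 come from the test schema in this PR; "id" and its ID are illustrative):

```go
package main

import "fmt"

// concatStructFieldName mirrors typeutil.ConcatStructFieldName.
func concatStructFieldName(structName, fieldName string) string {
	return fmt.Sprintf("%s[%s]", structName, fieldName)
}

// lookupKey reproduces the branch added to handleField: sub-fields are looked
// up under the concatenated name, regular fields under their own name.
func lookupKey(structName, key string) string {
	if structName != "" {
		return concatStructFieldName(structName, key)
	}
	return key
}

func main() {
	name2FieldID := map[string]int64{
		"id":                             100, // illustrative regular field
		"struct_array[sub_float_vector]": 111,
		"struct_array[sub_str]":          112,
	}
	fmt.Println(name2FieldID[lookupKey("", "id")])                  // 100
	fmt.Println(name2FieldID[lookupKey("struct_array", "sub_str")]) // 112
}
```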
@@ -66,7 +66,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Fields: []*schemapb.FieldSchema{
 				{
 					FieldID:     111,
-					Name:        "sub_float_vector",
+					Name:        "struct_array[sub_float_vector]",
 					DataType:    schemapb.DataType_ArrayOfVector,
 					ElementType: schemapb.DataType_FloatVector,
 					TypeParams: []*commonpb.KeyValuePair{
@@ -82,7 +82,7 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 				},
 				{
 					FieldID:     112,
-					Name:        "sub_str",
+					Name:        "struct_array[sub_str]",
 					DataType:    schemapb.DataType_Array,
 					ElementType: schemapb.DataType_VarChar,
 					TypeParams: []*commonpb.KeyValuePair{
@@ -2456,3 +2456,9 @@ func IsBm25FunctionInputField(coll *schemapb.CollectionSchema, field *schemapb.F
 	}
 	return false
 }
+
+// ConcatStructFieldName transforms struct field names to structName[fieldName] format
+// This ensures global uniqueness while allowing same field names across different structs
+func ConcatStructFieldName(structName string, fieldName string) string {
+	return fmt.Sprintf("%s[%s]", structName, fieldName)
+}
@@ -28,11 +28,11 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/proxy"
 	"github.com/milvus-io/milvus/pkg/v2/common"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/metric"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 	"github.com/milvus-io/milvus/tests/integration"
 )
@@ -170,7 +170,7 @@ func (s *ArrayStructDataNodeSuite) loadCollection(collectionName string) {
 	log.Info("=========================Index created for float vector=========================")
 	s.WaitForIndexBuilt(context.TODO(), collectionName, integration.FloatVecField)

-	subFieldName := proxy.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
+	subFieldName := typeutil.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
 	createIndexResult, err := c.MilvusClient.CreateIndex(context.TODO(), &milvuspb.CreateIndexRequest{
 		DbName:         dbName,
 		CollectionName: collectionName,
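
After the rename, clients must address a struct sub-vector field by its concatenated name when creating an index, as this test does. A hedged client-side sketch (the helper, collection/field names, and index params are illustrative; only the `milvuspb` calls already shown in the diff are assumed):

```go
package main

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// createSubFieldIndex is a hypothetical helper: it indexes a struct sub-vector
// field under its "structName[fieldName]" name.
func createSubFieldIndex(ctx context.Context, client milvuspb.MilvusServiceClient, collection string) error {
	fieldName := typeutil.ConcatStructFieldName("struct_array", "sub_float_vector")
	_, err := client.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
		CollectionName: collection,
		FieldName:      fieldName, // "struct_array[sub_float_vector]"
		ExtraParams: []*commonpb.KeyValuePair{
			{Key: "index_type", Value: "HNSW"},
			{Key: "metric_type", Value: "MAX_SIM"}, // assumed string form of metric.MaxSim
		},
	})
	return err
}
```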
@@ -317,7 +317,7 @@ func (s *ArrayStructDataNodeSuite) query(collectionName string) {
 	topk := 10
 	roundDecimal := -1

-	subFieldName := proxy.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
+	subFieldName := typeutil.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
 	params := integration.GetSearchParams(integration.IndexHNSW, metric.MaxSim)
 	searchReq := integration.ConstructEmbeddingListSearchRequest("", collectionName, expr,
 		subFieldName, schemapb.DataType_FloatVector, []string{integration.StructArrayField}, metric.MaxSim, params, nq, s.dim, topk, roundDecimal)
@@ -27,11 +27,11 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/proxy"
 	"github.com/milvus-io/milvus/pkg/v2/common"
 	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/metric"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 	"github.com/milvus-io/milvus/tests/integration"
 )
@@ -199,7 +199,7 @@ func (s *TestArrayStructSuite) run() {

 	s.WaitForIndexBuiltWithDB(ctx, s.dbName, collection, vecFieldName)

-	subFieldName := proxy.ConcatStructFieldName(structFieldName, structSubVecFieldName)
+	subFieldName := typeutil.ConcatStructFieldName(structFieldName, structSubVecFieldName)
 	// create index for struct sub-vector field
 	createIndexResult, err := s.Cluster.MilvusClient.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
 		DbName: s.dbName,
@@ -30,7 +30,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/proxy"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/v2/common"
 	"github.com/milvus-io/milvus/pkg/v2/log"
@@ -40,6 +39,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/metric"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 	"github.com/milvus-io/milvus/tests/integration"
 )
@@ -97,7 +97,7 @@ func (s *BulkInsertSuite) PrepareSourceCollection(dim int, dmlGroup *DMLGroup) *
 	s.NoError(merr.CheckRPCCall(createIndexStatus, err))
 	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)

-	name := proxy.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
+	name := typeutil.ConcatStructFieldName(integration.StructArrayField, integration.StructSubFloatVecField)
 	createIndexResult, err := c.MilvusClient.CreateIndex(context.TODO(), &milvuspb.CreateIndexRequest{
 		CollectionName: collectionName,
 		FieldName:      name,
@@ -200,9 +200,11 @@ func (s *BulkInsertSuite) runForStructArray() {
 	s.NoError(err)
 	s.Equal(int32(0), createCollectionStatus.GetCode())

-	// adjust struct field name
-	schema.StructArrayFields[0].Fields[0].Name = "struct_with_vector_array[vector_array_field]"
-	schema.StructArrayFields[0].Fields[1].Name = "struct_with_vector_array[scalar_array_field]"
+	// Note: on `CreateCollection`, field names inside a struct are transformed to the `structName[fieldName]`
+	// format, e.g. struct_with_vector_array[vector_array_field]. But we generate test data from the
+	// untransformed schema, which is expected because users will not write data with the transformed names.
+	schema.StructArrayFields[0].Fields[0].Name = "vector_array_field"
+	schema.StructArrayFields[0].Fields[1].Name = "scalar_array_field"

 	var files []*internalpb.ImportFile
@@ -299,7 +301,7 @@ func (s *BulkInsertSuite) runForStructArray() {
 }

 func (s *BulkInsertSuite) TestImportWithVectorArray() {
-	fileTypeArr := []importutilv2.FileType{importutilv2.CSV, importutilv2.Parquet, importutilv2.JSON}
+	fileTypeArr := []importutilv2.FileType{importutilv2.CSV, importutilv2.JSON}
 	for _, fileType := range fileTypeArr {
 		s.fileType = fileType
 		s.vecType = schemapb.DataType_FloatVector