mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Fix bulk import with autoid (#44601)
pr: #44604 issue: #44424 --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
c86d68bea5
commit
9434a3bdaa
@ -176,11 +176,7 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData, rowNum i
|
|||||||
}
|
}
|
||||||
pkData, ok := data.Data[pkField.GetFieldID()]
|
pkData, ok := data.Data[pkField.GetFieldID()]
|
||||||
allowInsertAutoID, _ := common.IsAllowInsertAutoID(task.req.Schema.GetProperties()...)
|
allowInsertAutoID, _ := common.IsAllowInsertAutoID(task.req.Schema.GetProperties()...)
|
||||||
if pkField.GetAutoID() {
|
if pkField.GetAutoID() && (!ok || pkData == nil || pkData.RowNum() == 0 || !allowInsertAutoID) {
|
||||||
if allowInsertAutoID && ok && pkData != nil {
|
|
||||||
// if allowInsertAutoID is true, and pkData is not nil, skip generating primary key data
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
switch pkField.GetDataType() {
|
switch pkField.GetDataType() {
|
||||||
case schemapb.DataType_Int64:
|
case schemapb.DataType_Int64:
|
||||||
data.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: ids}
|
data.Data[pkField.GetFieldID()] = &storage.Int64FieldData{Data: ids}
|
||||||
|
|||||||
@ -43,6 +43,7 @@ type rowParser struct {
|
|||||||
name2Field map[string]*schemapb.FieldSchema
|
name2Field map[string]*schemapb.FieldSchema
|
||||||
pkField *schemapb.FieldSchema
|
pkField *schemapb.FieldSchema
|
||||||
dynamicField *schemapb.FieldSchema
|
dynamicField *schemapb.FieldSchema
|
||||||
|
allowInsertAutoID bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
|
func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey string) (RowParser, error) {
|
||||||
@ -89,7 +90,6 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
|
|||||||
return nil, fmt.Errorf("value of field is missed: '%s'", fieldName)
|
return nil, fmt.Errorf("value of field is missed: '%s'", fieldName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &rowParser{
|
return &rowParser{
|
||||||
nullkey: nullkey,
|
nullkey: nullkey,
|
||||||
name2Dim: name2Dim,
|
name2Dim: name2Dim,
|
||||||
@ -97,6 +97,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
|
|||||||
name2Field: name2Field,
|
name2Field: name2Field,
|
||||||
pkField: pkField,
|
pkField: pkField,
|
||||||
dynamicField: dynamicField,
|
dynamicField: dynamicField,
|
||||||
|
allowInsertAutoID: allowInsertAutoID,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,6 +117,12 @@ func (r *rowParser) Parse(strArr []string) (Row, error) {
|
|||||||
row[field.GetFieldID()] = data
|
row[field.GetFieldID()] = data
|
||||||
} else if r.dynamicField != nil {
|
} else if r.dynamicField != nil {
|
||||||
dynamicValues[r.header[index]] = value
|
dynamicValues[r.header[index]] = value
|
||||||
|
} else if r.pkField.GetName() == r.header[index] && r.pkField.GetAutoID() && r.allowInsertAutoID {
|
||||||
|
data, err := r.parseEntity(r.pkField, value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
row[r.pkField.GetFieldID()] = data
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("the field '%s' is not defined in schema", r.header[index])
|
return nil, fmt.Errorf("the field '%s' is not defined in schema", r.header[index])
|
||||||
}
|
}
|
||||||
|
|||||||
@ -71,7 +71,7 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
|
|||||||
allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)
|
allowInsertAutoID, _ := pkgcommon.IsAllowInsertAutoID(schema.GetProperties()...)
|
||||||
name2FieldID := lo.SliceToMap(
|
name2FieldID := lo.SliceToMap(
|
||||||
lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
|
lo.Filter(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
|
||||||
return !field.GetIsFunctionOutput() && (!typeutil.IsAutoPKField(field) || allowInsertAutoID) && field.GetName() != dynamicField.GetName()
|
return !field.GetIsFunctionOutput() && !typeutil.IsAutoPKField(field) && field.GetName() != dynamicField.GetName()
|
||||||
}),
|
}),
|
||||||
func(field *schemapb.FieldSchema) (string, int64) {
|
func(field *schemapb.FieldSchema) (string, int64) {
|
||||||
return field.GetName(), field.GetFieldID()
|
return field.GetName(), field.GetFieldID()
|
||||||
@ -122,6 +122,12 @@ func (r *rowParser) Parse(raw any) (Row, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
row[fieldID] = data
|
row[fieldID] = data
|
||||||
|
} else if r.pkField.GetName() == key && r.pkField.GetAutoID() && r.allowInsertAutoID {
|
||||||
|
data, err := r.parseEntity(r.pkField.GetFieldID(), value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
row[r.pkField.GetFieldID()] = data
|
||||||
} else if r.dynamicField != nil {
|
} else if r.dynamicField != nil {
|
||||||
// has dynamic field, put redundant pair to dynamicValues
|
// has dynamic field, put redundant pair to dynamicValues
|
||||||
dynamicValues[key] = value
|
dynamicValues[key] = value
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user