enhance: [2.5] Improve import error msgs (#40597)

issue: https://github.com/milvus-io/milvus/issues/40208

pr: https://github.com/milvus-io/milvus/pull/40567

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-03-12 14:36:08 +08:00 committed by GitHub
parent bd4170106d
commit b0ad3d9444
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 64 additions and 66 deletions

View File

@ -61,20 +61,20 @@ func getInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSc
return 0
}
func CheckVarcharLength(data any, maxLength int64) error {
func CheckVarcharLength(data any, maxLength int64, field *schemapb.FieldSchema) error {
str, ok := data.(string)
if !ok {
return fmt.Errorf("expected string, got %T", data)
return fmt.Errorf("expected string type for field %s, but got %T", field.GetName(), data)
}
if (int64)(len(str)) > maxLength {
return fmt.Errorf("value length %d exceeds max_length %d", len(str), maxLength)
return fmt.Errorf("value length(%d) for field %s exceeds max_length(%d)", len(str), field.GetName(), maxLength)
}
return nil
}
func CheckArrayCapacity(arrLength int, maxCapacity int64) error {
func CheckArrayCapacity(arrLength int, maxCapacity int64, field *schemapb.FieldSchema) error {
if (int64)(arrLength) > maxCapacity {
return fmt.Errorf("array capacity %d exceeds max_capacity %d", arrLength, maxCapacity)
return fmt.Errorf("array capacity(%d) for field %s exceeds max_capacity(%d)", arrLength, field.GetName(), maxCapacity)
}
return nil
}

View File

@ -72,8 +72,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
// check if csv header provides the primary key while it should be auto-generated
if pkField.GetAutoID() && lo.Contains(header, pkField.GetName()) {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", pkField.GetName()))
return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", pkField.GetName())
}
// check whether csv header contains all fields in schema
@ -84,7 +83,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
}
for fieldName := range name2Field {
if _, ok := nameMap[fieldName]; !ok && (fieldName != dynamicField.GetName()) && (fieldName != pkField.GetName() && !pkField.GetAutoID()) {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("value of field is missed: '%s'", fieldName))
return nil, fmt.Errorf("value of field is missing: '%s'", fieldName)
}
}
@ -100,7 +99,7 @@ func NewRowParser(schema *schemapb.CollectionSchema, header []string, nullkey st
func (r *rowParser) Parse(strArr []string) (Row, error) {
if len(strArr) != len(r.header) {
return nil, merr.WrapErrImportFailed("the number of fields in the row is not equal to the header")
return nil, fmt.Errorf("the number of fields in the row is not equal to the header")
}
row := make(Row)
@ -115,7 +114,7 @@ func (r *rowParser) Parse(strArr []string) (Row, error) {
} else if r.dynamicField != nil {
dynamicValues[r.header[index]] = value
} else {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is not defined in schema", r.header[index]))
return nil, fmt.Errorf("the field '%s' is not defined in schema", r.header[index])
}
}
@ -144,12 +143,12 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
var mp map[string]interface{}
err := json.Unmarshal([]byte(str), &mp)
if err != nil {
return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON format string")
return fmt.Errorf("illegal value for dynamic field, not a JSON format string")
}
// put the all dynamic fields into newDynamicValues
for k, v := range mp {
if _, ok = dynamicValues[k]; ok {
return merr.WrapErrImportFailed(fmt.Sprintf("duplicated key in dynamic field, key=%s", k))
return fmt.Errorf("duplicated key in dynamic field, key=%s", k)
}
newDynamicValues[k] = v
}
@ -165,7 +164,7 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]string, row Row)
// check if it satisfies the JSON format
dynamicBytes, err := json.Marshal(newDynamicValues)
if err != nil {
return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON object")
return fmt.Errorf("illegal value for dynamic field, not a JSON object")
}
row[dynamicFieldID] = dynamicBytes
@ -246,7 +245,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
if err != nil {
return nil, err
}
if err = common.CheckVarcharLength(obj, maxLength); err != nil {
if err = common.CheckVarcharLength(obj, maxLength, field); err != nil {
return nil, err
}
return obj, nil
@ -352,7 +351,7 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
if err != nil {
return nil, err
}
if err = common.CheckArrayCapacity(len(vec), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(vec), maxCapacity, field); err != nil {
return nil, err
}
// elements in array not support null value
@ -362,8 +361,8 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e
}
return scalarFieldData, nil
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse csv failed, unsupport data type: %s",
field.GetDataType().String()))
return nil, fmt.Errorf("parse csv failed, unsupported data type: %s",
field.GetDataType().String())
}
}
@ -489,21 +488,21 @@ func (r *rowParser) arrayToFieldData(arr []interface{}, eleType schemapb.DataTyp
},
}, nil
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse csv failed, unsupported data type: %s", eleType.String()))
return nil, fmt.Errorf("parse csv failed, unsupported data type: %s", eleType.String())
}
}
func (r *rowParser) wrapTypeError(v any, field *schemapb.FieldSchema) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected type '%s' for field '%s', got type '%T' with value '%v'",
field.GetDataType().String(), field.GetName(), v, v))
return fmt.Errorf("expected type '%s' for field '%s', got type '%T' with value '%v'",
field.GetDataType().String(), field.GetName(), v, v)
}
func (r *rowParser) wrapDimError(actualDim int, field *schemapb.FieldSchema) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected dim '%d' for field '%s' with type '%s', got dim '%d'",
r.name2Dim[field.GetName()], field.GetName(), field.GetDataType().String(), actualDim))
return fmt.Errorf("expected dim '%d' for field '%s' with type '%s', got dim '%d'",
r.name2Dim[field.GetName()], field.GetName(), field.GetDataType().String(), actualDim)
}
func (r *rowParser) wrapArrayValueTypeError(v any, eleType schemapb.DataType) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected element type '%s' in array field, got type '%T' with value '%v'",
eleType.String(), v, v))
return fmt.Errorf("expected element type '%s' in array field, got type '%T' with value '%v'",
eleType.String(), v, v)
}

View File

@ -140,7 +140,7 @@ func (j *reader) Read() (*storage.InsertData, error) {
}
row, err := j.parser.Parse(value)
if err != nil {
return nil, err
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse row, error: %v", err))
}
err = insertData.Append(row)
if err != nil {

View File

@ -85,29 +85,28 @@ func NewRowParser(schema *schemapb.CollectionSchema) (RowParser, error) {
func (r *rowParser) wrapTypeError(v any, fieldID int64) error {
field := r.id2Field[fieldID]
return merr.WrapErrImportFailed(fmt.Sprintf("expected type '%s' for field '%s', got type '%T' with value '%v'",
field.GetDataType().String(), field.GetName(), v, v))
return fmt.Errorf("expected type '%s' for field '%s', got type '%T' with value '%v'",
field.GetDataType().String(), field.GetName(), v, v)
}
func (r *rowParser) wrapDimError(actualDim int, fieldID int64) error {
field := r.id2Field[fieldID]
return merr.WrapErrImportFailed(fmt.Sprintf("expected dim '%d' for field '%s' with type '%s', got dim '%d'",
r.id2Dim[fieldID], field.GetName(), field.GetDataType().String(), actualDim))
return fmt.Errorf("expected dim '%d' for field '%s' with type '%s', got dim '%d'",
r.id2Dim[fieldID], field.GetName(), field.GetDataType().String(), actualDim)
}
func (r *rowParser) wrapArrayValueTypeError(v any, eleType schemapb.DataType) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected element type '%s' in array field, got type '%T' with value '%v'",
eleType.String(), v, v))
return fmt.Errorf("expected element type '%s' in array field, got type '%T' with value '%v'",
eleType.String(), v, v)
}
func (r *rowParser) Parse(raw any) (Row, error) {
stringMap, ok := raw.(map[string]any)
if !ok {
return nil, merr.WrapErrImportFailed("invalid JSON format, each row should be a key-value map")
return nil, fmt.Errorf("invalid JSON format, each row should be a key-value map, but got type %T", raw)
}
if _, ok = stringMap[r.pkField.GetName()]; ok && r.pkField.GetAutoID() {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName()))
return nil, fmt.Errorf("the primary key '%s' is auto-generated, no need to provide", r.pkField.GetName())
}
dynamicValues := make(map[string]any)
row := make(Row)
@ -122,7 +121,7 @@ func (r *rowParser) Parse(raw any) (Row, error) {
// has dynamic field, put redundant pair to dynamicValues
dynamicValues[key] = value
} else {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("the field '%s' is not defined in schema", key))
return nil, fmt.Errorf("the field '%s' is not defined in schema", key)
}
}
for fieldName, fieldID := range r.name2FieldID {
@ -139,7 +138,7 @@ func (r *rowParser) Parse(raw any) (Row, error) {
}
}
if _, ok = row[fieldID]; !ok {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("value of field '%s' is missed", fieldName))
return nil, fmt.Errorf("value of field '%s' is missed", fieldName)
}
}
if r.dynamicField == nil {
@ -180,20 +179,20 @@ func (r *rowParser) combineDynamicRow(dynamicValues map[string]any, row Row) err
// case 1, 3
err := json.Unmarshal([]byte(value), &mp)
if err != nil {
return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON format string")
return fmt.Errorf("illegal value for dynamic field, not a JSON format string")
}
case map[string]interface{}:
// case 2, 4, 5
mp = value
default:
// invalid input
return merr.WrapErrImportFailed("illegal value for dynamic field, not a JSON object")
return fmt.Errorf("illegal value for dynamic field, not a JSON object")
}
delete(dynamicValues, r.dynamicField.GetName())
for k, v := range mp {
if _, ok = dynamicValues[k]; ok {
// case 8, 9
return merr.WrapErrImportFailed(fmt.Sprintf("duplicated key is not allowed, key=%s", k))
return fmt.Errorf("duplicated key is not allowed, key=%s", k)
}
dynamicValues[k] = v
}
@ -386,7 +385,7 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
if err != nil {
return nil, err
}
if err = common.CheckVarcharLength(value, maxLength); err != nil {
if err = common.CheckVarcharLength(value, maxLength, r.id2Field[fieldID]); err != nil {
return nil, err
}
return value, nil
@ -418,7 +417,7 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
if err != nil {
return nil, err
}
if err = common.CheckArrayCapacity(len(arr), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(arr), maxCapacity, r.id2Field[fieldID]); err != nil {
return nil, err
}
scalarFieldData, err := r.arrayToFieldData(arr, r.id2Field[fieldID].GetElementType())
@ -427,8 +426,8 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
}
return scalarFieldData, nil
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse json failed, unsupport data type: %s",
r.id2Field[fieldID].GetDataType().String()))
return nil, fmt.Errorf("parse json failed, unsupported data type: %s",
r.id2Field[fieldID].GetDataType().String())
}
}
@ -568,8 +567,8 @@ func (r *rowParser) parseNullableEntity(fieldID int64, obj any) (any, error) {
}
return scalarFieldData, nil
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("parse json failed, unsupport data type: %s",
r.id2Field[fieldID].GetDataType().String()))
return nil, fmt.Errorf("parse json failed, unsupported data type: %s",
r.id2Field[fieldID].GetDataType().String())
}
}

View File

@ -200,8 +200,8 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": "{\"x\": 8}", "name": "test"}`, expectErr: "duplicated key is not allowed"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": "{*&%%&$*(&", "name": "test"}`, expectErr: "not a JSON format string"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": [], "name": "test"}`, expectErr: "not a JSON object"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 8, "$meta": "{\"y\": 8}", "name": "testName"}`, expectErr: "value length 8 exceeds max_length 4"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4, 5], "x": 8, "$meta": "{\"z\": 9}", "name": "test"}`, expectErr: "array capacity 5 exceeds max_capacity 4"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 8, "$meta": "{\"y\": 8}", "name": "testName"}`, expectErr: "value length(8) for field name exceeds max_length(4)"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4, 5], "x": 8, "$meta": "{\"z\": 9}", "name": "test"}`, expectErr: "array capacity(5) for field arrayField exceeds max_capacity(4)"},
{name: `{"id": 1, "vector": [], "x": 8, "$meta": "{\"z\": 9}", "name": "test"}`, expectErr: "value of field 'arrayField' is missed"},
}

View File

@ -291,7 +291,7 @@ func (c *FieldReader) ReadString(count int64) ([]string, error) {
}
str, err := decodeUtf32(raw, c.order)
if c.field.DataType == schemapb.DataType_VarChar {
if err = common.CheckVarcharLength(str, maxLength); err != nil {
if err = common.CheckVarcharLength(str, maxLength, c.field); err != nil {
return nil, err
}
}

View File

@ -499,7 +499,7 @@ func ReadVarcharData(pcr *FieldReader, count int64) (any, error) {
return nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field)
}
for i := 0; i < dataNums; i++ {
if err = common.CheckVarcharLength(stringReader.Value(i), maxLength); err != nil {
if err = common.CheckVarcharLength(stringReader.Value(i), maxLength, pcr.field); err != nil {
return nil, err
}
data = append(data, stringReader.Value(i))
@ -540,7 +540,7 @@ func ReadNullableVarcharData(pcr *FieldReader, count int64) (any, []bool, error)
data = append(data, "")
continue
}
if err = common.CheckVarcharLength(stringReader.Value(i), maxLength); err != nil {
if err = common.CheckVarcharLength(stringReader.Value(i), maxLength, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, stringReader.ValueStr(i))
@ -1047,7 +1047,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range boolArray.([][]bool) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1067,7 +1067,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int8Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1087,7 +1087,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int16Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1107,7 +1107,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int32Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1127,7 +1127,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int64Array.([][]int64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1147,7 +1147,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range float32Array.([][]float32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1167,7 +1167,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range float64Array.([][]float64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1187,7 +1187,7 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range stringArray.([][]string) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1222,7 +1222,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range boolArray.([][]bool) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1243,7 +1243,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range int8Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1264,7 +1264,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range int16Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1285,7 +1285,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range int32Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1306,7 +1306,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range int64Array.([][]int64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1327,7 +1327,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range float32Array.([][]float32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1348,7 +1348,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range float64Array.([][]float64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{
@ -1369,7 +1369,7 @@ func ReadNullableArrayData(pcr *FieldReader, count int64) (any, []bool, error) {
return nil, nil, nil
}
for _, elementArray := range stringArray.([][]string) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity, pcr.field); err != nil {
return nil, nil, err
}
data = append(data, &schemapb.ScalarField{