diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go index 2379379e92..ff2d7616c6 100644 --- a/internal/util/importutil/import_util.go +++ b/internal/util/importutil/import_util.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "path" "runtime/debug" "strconv" @@ -121,24 +122,26 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi return segmentData } +func parseFloat(s string, bitsize int, fieldName string) (float64, error) { + value, err := strconv.ParseFloat(s, bitsize) + if err != nil { + return 0, fmt.Errorf("failed to parse value '%s' for field '%s', error: %w", s, fieldName, err) + } + + // NaN and infinity are not allowed + if math.IsNaN(value) || math.IsInf(value, -1) || math.IsInf(value, 1) { + return 0, fmt.Errorf("value '%s' is NaN or Inf, not allowed for field '%s'", s, fieldName) + } + + return value, nil +} + // initValidators constructs validator methods and data conversion methods func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[storage.FieldID]*Validator) error { if collectionSchema == nil { return errors.New("collection schema is nil") } - // json decoder parse all the numeric value into float64 - numericValidator := func(fieldName string) func(obj interface{}) error { - return func(obj interface{}) error { - switch obj.(type) { - case json.Number: - return nil - default: - return fmt.Errorf("illegal value %v for numeric type field '%s'", obj, fieldName) - } - } - } - for i := 0; i < len(collectionSchema.Fields); i++ { schema := collectionSchema.Fields[i] @@ -150,91 +153,99 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ switch schema.DataType { case schemapb.DataType_Bool: - validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error { - switch obj.(type) { - case bool: - return nil - default: + validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { + if value, ok := obj.(bool); ok { + field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value) + field.(*storage.BoolFieldData).NumRows[0]++ + } else { return fmt.Errorf("illegal value '%v' for bool type field '%s'", obj, schema.GetName()) } - } - validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value := obj.(bool) - field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value) - field.(*storage.BoolFieldData).NumRows[0]++ return nil } case schemapb.DataType_Float: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseFloat(string(obj.(json.Number)), 32) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for float type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := parseFloat(string(num), 32, schema.GetName()) + if err != nil { + return err + } + field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value)) + field.(*storage.FloatFieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for float type field '%s'", obj, schema.GetName()) } - field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value)) - field.(*storage.FloatFieldData).NumRows[0]++ + return
nil } case schemapb.DataType_Double: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseFloat(string(obj.(json.Number)), 32) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for double type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := parseFloat(string(num), 64, schema.GetName()) + if err != nil { + return err + } + field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value) + field.(*storage.DoubleFieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for double type field '%s'", obj, schema.GetName()) } - field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value) - field.(*storage.DoubleFieldData).NumRows[0]++ return nil } case schemapb.DataType_Int8: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 8) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for int8 type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := strconv.ParseInt(string(num), 0, 8) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for int8 field '%s', error: %w", num, schema.GetName(), err) + } + field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value)) + field.(*storage.Int8FieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName()) } - field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value)) - field.(*storage.Int8FieldData).NumRows[0]++ return nil } case schemapb.DataType_Int16: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 16) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for int16 type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := strconv.ParseInt(string(num), 0, 16) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for int16 field '%s', error: %w", num, schema.GetName(), err) + } + field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value)) + field.(*storage.Int16FieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName()) } - field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value)) - field.(*storage.Int16FieldData).NumRows[0]++ return nil } case schemapb.DataType_Int32: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 32) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for int32 type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := 
strconv.ParseInt(string(num), 0, 32) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for int32 field '%s', error: %w", num, schema.GetName(), err) + } + field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value)) + field.(*storage.Int32FieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName()) } - field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value)) - field.(*storage.Int32FieldData).NumRows[0]++ return nil } case schemapb.DataType_Int64: - validators[schema.GetFieldID()].validateFunc = numericValidator(schema.GetName()) validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - value, err := strconv.ParseInt(string(obj.(json.Number)), 10, 64) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for int64 type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := obj.(json.Number); ok { + value, err := strconv.ParseInt(string(num), 0, 64) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for int64 field '%s', error: %w", num, schema.GetName(), err) + } + field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value) + field.(*storage.Int64FieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName()) } - field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value) - field.(*storage.Int64FieldData).NumRows[0]++ return nil } case schemapb.DataType_BinaryVector: @@ -244,33 +255,26 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ } validators[schema.GetFieldID()].dimension = dim - validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error { - switch vt := obj.(type) { - case []interface{}: - if len(vt)*8 != dim { - return fmt.Errorf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(vt)*8, dim, schema.GetName()) - } - numValidateFunc := numericValidator(schema.GetName()) - for i := 0; i < len(vt); i++ { - if e := numValidateFunc(vt[i]); e != nil { - return e - } - } - return nil - default: + validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { + arr, ok := obj.([]interface{}) + if !ok { return fmt.Errorf("'%v' is not an array for binary vector field '%s'", obj, schema.GetName()) } - } + // we use uint8 to represent binary vector in json file, each uint8 value represents 8 dimensions. 
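+				// for example, dim=16 requires exactly 2 array elements, e.g. [254, 0]: 254 (0b11111110) fills dimensions 0~7 and 0 fills dimensions 8~15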
+ if len(arr)*8 != dim { + return fmt.Errorf("bit size %d doesn't equal to vector dimension %d of field '%s'", len(arr)*8, dim, schema.GetName()) + } - validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - arr := obj.([]interface{}) for i := 0; i < len(arr); i++ { - value, err := strconv.ParseUint(string(arr[i].(json.Number)), 10, 8) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for binary vector type field '%s', error: %w", - obj, schema.GetName(), err) + if num, ok := arr[i].(json.Number); ok { + value, err := strconv.ParseUint(string(num), 0, 8) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for binary vector field '%s', error: %w", num, schema.GetName(), err) + } + field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value)) + } else { + return fmt.Errorf("illegal value '%v' for binary vector field '%s'", obj, schema.GetName()) } - field.(*storage.BinaryVectorFieldData).Data = append(field.(*storage.BinaryVectorFieldData).Data, byte(value)) } field.(*storage.BinaryVectorFieldData).NumRows[0]++ @@ -283,52 +287,40 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ } validators[schema.GetFieldID()].dimension = dim - validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error { - switch vt := obj.(type) { - case []interface{}: - if len(vt) != dim { - return fmt.Errorf("array size %d doesn't equal to vector dimension %d of field '%s'", len(vt), dim, schema.GetName()) - } - numValidateFunc := numericValidator(schema.GetName()) - for i := 0; i < len(vt); i++ { - if e := numValidateFunc(vt[i]); e != nil { - return e - } - } - return nil - default: + validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { + arr, ok := obj.([]interface{}) + if !ok { return fmt.Errorf("'%v' is not an array for float vector field '%s'", obj, schema.GetName()) } - } - - validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { - arr := obj.([]interface{}) - for i := 0; i < len(arr); i++ { - value, err := strconv.ParseFloat(string(arr[i].(json.Number)), 32) - if err != nil { - return fmt.Errorf("failed to parse value '%v' for binary vector type field '%s', error: %w", - obj, schema.GetName(), err) - } - field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value)) + if len(arr) != dim { + return fmt.Errorf("array size %d doesn't equal to vector dimension %d of field '%s'", len(arr), dim, schema.GetName()) } + + for i := 0; i < len(arr); i++ { + if num, ok := arr[i].(json.Number); ok { + value, err := parseFloat(string(num), 32, schema.GetName()) + if err != nil { + return err + } + field.(*storage.FloatVectorFieldData).Data = append(field.(*storage.FloatVectorFieldData).Data, float32(value)) + } else { + return fmt.Errorf("illegal value '%v' for float vector field '%s'", obj, schema.GetName()) + } + } + field.(*storage.FloatVectorFieldData).NumRows[0]++ return nil } case schemapb.DataType_String, schemapb.DataType_VarChar: validators[schema.GetFieldID()].isString = true - validators[schema.GetFieldID()].validateFunc = func(obj interface{}) error { - switch obj.(type) { - case string: - return nil - default: - return fmt.Errorf("'%v' is not a string for varchar type field '%s'", obj, schema.GetName()) - } - } validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field 
storage.FieldData) error { - value := obj.(string) - field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value) - field.(*storage.StringFieldData).NumRows[0]++ + if value, ok := obj.(string); ok { + field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value) + field.(*storage.StringFieldData).NumRows[0]++ + } else { + return fmt.Errorf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName()) + } return nil } default: diff --git a/internal/util/importutil/import_util_test.go b/internal/util/importutil/import_util_test.go index eb0d44ee89..0e87651960 100644 --- a/internal/util/importutil/import_util_test.go +++ b/internal/util/importutil/import_util_test.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "math" "testing" "github.com/milvus-io/milvus-proto/go-api/commonpb" @@ -27,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" ) +// sampleSchema() returns a schema containing all supported data types, with an int64 primary key func sampleSchema() *schemapb.CollectionSchema { schema := &schemapb.CollectionSchema{ Name: "schema", @@ -35,35 +37,35 @@ func sampleSchema() *schemapb.CollectionSchema { Fields: []*schemapb.FieldSchema{ { FieldID: 102, - Name: "field_bool", + Name: "FieldBool", IsPrimaryKey: false, Description: "bool", DataType: schemapb.DataType_Bool, }, { FieldID: 103, - Name: "field_int8", + Name: "FieldInt8", IsPrimaryKey: false, Description: "int8", DataType: schemapb.DataType_Int8, }, { FieldID: 104, - Name: "field_int16", + Name: "FieldInt16", IsPrimaryKey: false, Description: "int16", DataType: schemapb.DataType_Int16, }, { FieldID: 105, - Name: "field_int32", + Name: "FieldInt32", IsPrimaryKey: false, Description: "int32", DataType: schemapb.DataType_Int32, }, { FieldID: 106, - Name: "field_int64", + Name: "FieldInt64", IsPrimaryKey: true, AutoID: false, Description: "int64", @@ -71,21 +73,21 @@ func sampleSchema() *schemapb.CollectionSchema { }, { FieldID: 107, - Name: "field_float", + Name: "FieldFloat", IsPrimaryKey: false, Description: "float", DataType: schemapb.DataType_Float, }, { FieldID: 108, - Name: "field_double", + Name: "FieldDouble", IsPrimaryKey: false, Description: "double", DataType: schemapb.DataType_Double, }, { FieldID: 109, - Name: "field_string", + Name: "FieldString", IsPrimaryKey: false, Description: "string", DataType: schemapb.DataType_VarChar, @@ -95,7 +97,7 @@ func sampleSchema() *schemapb.CollectionSchema { }, { FieldID: 110, - Name: "field_binary_vector", + Name: "FieldBinaryVector", IsPrimaryKey: false, Description: "binary_vector", DataType: schemapb.DataType_BinaryVector, @@ -105,7 +107,7 @@ func sampleSchema() *schemapb.CollectionSchema { }, { FieldID: 111, - Name: "field_float_vector", + Name: "FieldFloatVector", IsPrimaryKey: false, Description: "float_vector", DataType: schemapb.DataType_FloatVector, @@ -118,6 +120,24 @@ func sampleSchema() *schemapb.CollectionSchema { return schema } +// sampleContent/sampleRow are JSON structs representing sampleSchema() for testing +type sampleRow struct { + FieldBool bool + FieldInt8 int8 + FieldInt16 int16 + FieldInt32 int32 + FieldInt64 int64 + FieldFloat float32 + FieldDouble float64 + FieldString string + FieldBinaryVector []int + FieldFloatVector []float32 +} +type sampleContent struct { + Rows []sampleRow +} + +// strKeySchema() returns a schema with a varchar primary key func strKeySchema() *schemapb.CollectionSchema { schema := &schemapb.CollectionSchema{ Name: "schema", @@ -126,7 +146,7 @@ func
strKeySchema() *schemapb.CollectionSchema { Fields: []*schemapb.FieldSchema{ { FieldID: 101, - Name: "uid", + Name: "UID", IsPrimaryKey: true, AutoID: false, Description: "uid", @@ -137,21 +157,21 @@ func strKeySchema() *schemapb.CollectionSchema { }, { FieldID: 102, - Name: "int_scalar", + Name: "FieldInt32", IsPrimaryKey: false, Description: "int_scalar", DataType: schemapb.DataType_Int32, }, { FieldID: 103, - Name: "float_scalar", + Name: "FieldFloat", IsPrimaryKey: false, Description: "float_scalar", DataType: schemapb.DataType_Float, }, { FieldID: 104, - Name: "string_scalar", + Name: "FieldString", IsPrimaryKey: false, Description: "string_scalar", DataType: schemapb.DataType_VarChar, @@ -161,14 +181,14 @@ func strKeySchema() *schemapb.CollectionSchema { }, { FieldID: 105, - Name: "bool_scalar", + Name: "FieldBool", IsPrimaryKey: false, Description: "bool_scalar", DataType: schemapb.DataType_Bool, }, { FieldID: 106, - Name: "vectors", + Name: "FieldFloatVector", IsPrimaryKey: false, Description: "vectors", DataType: schemapb.DataType_FloatVector, @@ -181,6 +201,19 @@ func strKeySchema() *schemapb.CollectionSchema { return schema } +// strKeyContent/strKeyRow are JSON structs representing strKeySchema() for testing +type strKeyRow struct { + UID string + FieldInt32 int32 + FieldFloat float32 + FieldString string + FieldBool bool + FieldFloatVector []float32 +} +type strKeyContent struct { + Rows []strKeyRow +} + func jsonNumber(value string) json.Number { return json.Number(value) } @@ -232,6 +265,40 @@ func Test_InitSegmentData(t *testing.T) { assert.Nil(t, data) } +func Test_parseFloat(t *testing.T) { + value, err := parseFloat("dummy", 32, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("NaN", 32, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("Inf", 32, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("Infinity", 32, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("3.5e+38", 32, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("1.8e+308", 64, "") + assert.Zero(t, value) + assert.Error(t, err) + + value, err = parseFloat("3.14159", 32, "") + assert.True(t, math.Abs(value-3.14159) < 0.000001) + assert.Nil(t, err) + + value, err = parseFloat("2.718281828459045", 64, "") + assert.True(t, math.Abs(value-2.718281828459045) < 0.0000000000000001) + assert.Nil(t, err) +} + func Test_InitValidators(t *testing.T) { validators := make(map[storage.FieldID]*Validator) err := initValidators(nil, validators) @@ -242,6 +309,18 @@ func Test_InitValidators(t *testing.T) { err = initValidators(schema, validators) assert.Nil(t, err) assert.Equal(t, len(schema.Fields), len(validators)) + for _, field := range schema.Fields { + fieldID := field.GetFieldID() + assert.Equal(t, field.GetName(), validators[fieldID].fieldName) + assert.Equal(t, field.GetIsPrimaryKey(), validators[fieldID].primaryKey) + assert.Equal(t, field.GetAutoID(), validators[fieldID].autoID) + if field.GetDataType() != schemapb.DataType_VarChar && field.GetDataType() != schemapb.DataType_String { + assert.False(t, validators[fieldID].isString) + } else { + assert.True(t, validators[fieldID].isString) + } + } + name2ID := make(map[string]storage.FieldID) for _, field := range schema.Fields { name2ID[field.GetName()] = field.GetFieldID() } @@ -250,16 +329,6 @@ func Test_InitValidators(t *testing.T) { fields := initSegmentData(schema) assert.NotNil(t, fields) - checkValidateFunc :=
func(funcName string, validVal interface{}, invalidVal interface{}) { - id := name2ID[funcName] - v, ok := validators[id] - assert.True(t, ok) - err = v.validateFunc(validVal) - assert.Nil(t, err) - err = v.validateFunc(invalidVal) - assert.NotNil(t, err) - } - checkConvertFunc := func(funcName string, validVal interface{}, invalidVal interface{}) { id := name2ID[funcName] v, ok := validators[id] @@ -272,83 +341,61 @@ func Test_InitValidators(t *testing.T) { postNum := fieldData.RowNum() assert.Equal(t, 1, postNum-preNum) - if invalidVal != nil { - err = v.convertFunc(invalidVal, fieldData) - assert.NotNil(t, err) - } + err = v.convertFunc(invalidVal, fieldData) + assert.NotNil(t, err) } - t.Run("check validate functions", func(t *testing.T) { - var validVal interface{} = true - var invalidVal interface{} = "aa" - checkValidateFunc("field_bool", validVal, invalidVal) - - validVal = jsonNumber("100") - invalidVal = "aa" - checkValidateFunc("field_int8", validVal, invalidVal) - checkValidateFunc("field_int16", validVal, invalidVal) - checkValidateFunc("field_int32", validVal, invalidVal) - checkValidateFunc("field_int64", validVal, invalidVal) - checkValidateFunc("field_float", validVal, invalidVal) - checkValidateFunc("field_double", validVal, invalidVal) - - validVal = "aa" - invalidVal = 100 - checkValidateFunc("field_string", validVal, invalidVal) - - // the binary vector dimension is 16, shoud input 2 uint8 values - validVal = []interface{}{jsonNumber("100"), jsonNumber("101")} - invalidVal = "aa" - checkValidateFunc("field_binary_vector", validVal, invalidVal) - invalidVal = []interface{}{jsonNumber("100")} - checkValidateFunc("field_binary_vector", validVal, invalidVal) - invalidVal = []interface{}{jsonNumber("100"), jsonNumber("101"), jsonNumber("102")} - checkValidateFunc("field_binary_vector", validVal, invalidVal) - invalidVal = []interface{}{100, jsonNumber("100")} - checkValidateFunc("field_binary_vector", validVal, invalidVal) - - // the float vector dimension is 4, shoud input 4 float values - validVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4")} - invalidVal = true - checkValidateFunc("field_float_vector", validVal, invalidVal) - invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3")} - checkValidateFunc("field_float_vector", validVal, invalidVal) - invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4"), jsonNumber("5")} - checkValidateFunc("field_float_vector", validVal, invalidVal) - invalidVal = []interface{}{"a", "b", "c", "d"} - checkValidateFunc("field_float_vector", validVal, invalidVal) - }) - t.Run("check convert functions", func(t *testing.T) { var validVal interface{} = true - var invalidVal interface{} - checkConvertFunc("field_bool", validVal, invalidVal) + var invalidVal interface{} = 5 + checkConvertFunc("FieldBool", validVal, invalidVal) validVal = jsonNumber("100") invalidVal = jsonNumber("128") - checkConvertFunc("field_int8", validVal, invalidVal) + checkConvertFunc("FieldInt8", validVal, invalidVal) invalidVal = jsonNumber("65536") - checkConvertFunc("field_int16", validVal, invalidVal) + checkConvertFunc("FieldInt16", validVal, invalidVal) invalidVal = jsonNumber("2147483648") - checkConvertFunc("field_int32", validVal, invalidVal) + checkConvertFunc("FieldInt32", validVal, invalidVal) invalidVal = jsonNumber("1.2") - checkConvertFunc("field_int64", validVal, invalidVal) + checkConvertFunc("FieldInt64", validVal, invalidVal) invalidVal = jsonNumber("dummy") 
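+		// jsonNumber("dummy") cannot pass the parseFloat() helper, so conversion fails for both float and double fields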
- checkConvertFunc("field_float", validVal, invalidVal) - checkConvertFunc("field_double", validVal, invalidVal) + checkConvertFunc("FieldFloat", validVal, invalidVal) + checkConvertFunc("FieldDouble", validVal, invalidVal) + + invalidVal = "6" + checkConvertFunc("FieldInt8", validVal, invalidVal) + checkConvertFunc("FieldInt16", validVal, invalidVal) + checkConvertFunc("FieldInt32", validVal, invalidVal) + checkConvertFunc("FieldInt64", validVal, invalidVal) + checkConvertFunc("FieldFloat", validVal, invalidVal) + checkConvertFunc("FieldDouble", validVal, invalidVal) validVal = "aa" - checkConvertFunc("field_string", validVal, nil) + checkConvertFunc("FieldString", validVal, nil) // the binary vector dimension is 16, shoud input two uint8 values, each value should between 0~255 validVal = []interface{}{jsonNumber("100"), jsonNumber("101")} - invalidVal = []interface{}{jsonNumber("100"), jsonNumber("256")} - checkConvertFunc("field_binary_vector", validVal, invalidVal) + invalidVal = []interface{}{jsonNumber("100"), jsonNumber("1256")} + checkConvertFunc("FieldBinaryVector", validVal, invalidVal) + + invalidVal = false + checkConvertFunc("FieldBinaryVector", validVal, invalidVal) + invalidVal = []interface{}{jsonNumber("100")} + checkConvertFunc("FieldBinaryVector", validVal, invalidVal) + invalidVal = []interface{}{jsonNumber("100"), 0} + checkConvertFunc("FieldBinaryVector", validVal, invalidVal) // the float vector dimension is 4, each value should be valid float number validVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("4")} invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), jsonNumber("dummy")} - checkConvertFunc("field_float_vector", validVal, invalidVal) + checkConvertFunc("FieldFloatVector", validVal, invalidVal) + invalidVal = false + checkConvertFunc("FieldFloatVector", validVal, invalidVal) + invalidVal = []interface{}{jsonNumber("1")} + checkConvertFunc("FieldFloatVector", validVal, invalidVal) + invalidVal = []interface{}{jsonNumber("1"), jsonNumber("2"), jsonNumber("3"), true} + checkConvertFunc("FieldFloatVector", validVal, invalidVal) }) t.Run("init error cases", func(t *testing.T) { @@ -360,7 +407,7 @@ func Test_InitValidators(t *testing.T) { } schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ FieldID: 111, - Name: "field_float_vector", + Name: "FieldFloatVector", IsPrimaryKey: false, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ @@ -375,7 +422,7 @@ func Test_InitValidators(t *testing.T) { schema.Fields = make([]*schemapb.FieldSchema, 0) schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ FieldID: 110, - Name: "field_binary_vector", + Name: "FieldBinaryVector", IsPrimaryKey: false, DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{ @@ -410,7 +457,7 @@ func Test_GetFileNameAndExt(t *testing.T) { func Test_GetFieldDimension(t *testing.T) { schema := &schemapb.FieldSchema{ FieldID: 111, - Name: "field_float_vector", + Name: "FieldFloatVector", IsPrimaryKey: false, Description: "float_vector", DataType: schemapb.DataType_FloatVector, diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go index 650e58975d..1c3fb60d04 100644 --- a/internal/util/importutil/import_wrapper.go +++ b/internal/util/importutil/import_wrapper.go @@ -513,26 +513,27 @@ func (p *ImportWrapper) parseRowBasedJSON(filePath string, onlyValidate bool) er // parse file reader := bufio.NewReader(file) parser := 
NewJSONParser(p.ctx, p.collectionSchema) - var consumer *JSONRowConsumer - if !onlyValidate { - flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error { + + // if onlyValidate is true, we input an empty flushFunc so that the consumer does nothing but validation. + var flushFunc ImportFlushFunc + if onlyValidate { + flushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error { + return nil + } + } else { + flushFunc = func(fields map[storage.FieldID]storage.FieldData, shardID int) error { var filePaths = []string{filePath} printFieldsDataInfo(fields, "import wrapper: prepare to flush binlogs", filePaths) return p.flushFunc(fields, shardID) } - - consumer, err = NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, flushFunc) - if err != nil { - return err - } } - validator, err := NewJSONRowValidator(p.collectionSchema, consumer) + consumer, err := NewJSONRowConsumer(p.collectionSchema, p.rowIDAllocator, p.shardNum, SingleBlockSize, flushFunc) if err != nil { return err } - err = parser.ParseRows(reader, validator) + err = parser.ParseRows(reader, consumer) if err != nil { return err } diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index 48ec9dc4e6..07078ebb48 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -238,11 +238,11 @@ func Test_ImportWrapperRowBased(t *testing.T) { content := []byte(`{ "rows":[ - {"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}, - {"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]}, - {"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]}, - {"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]}, - {"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]} + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, + {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, +
{"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, + {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} ] }`) @@ -285,7 +285,7 @@ func Test_ImportWrapperRowBased(t *testing.T) { // parse error content = []byte(`{ "rows":[ - {"field_bool": true, "field_int8": false, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, ] }`) @@ -313,70 +313,70 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string { ctx := context.Background() files := make([]string, 0) - filePath := "field_bool.npy" + filePath := "FieldBool.npy" content, err := CreateNumpyData([]bool{true, false, true, true, true}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_int8.npy" + filePath = "FieldInt8.npy" content, err = CreateNumpyData([]int8{10, 11, 12, 13, 14}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_int16.npy" + filePath = "FieldInt16.npy" content, err = CreateNumpyData([]int16{100, 101, 102, 103, 104}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_int32.npy" + filePath = "FieldInt32.npy" content, err = CreateNumpyData([]int32{1000, 1001, 1002, 1003, 1004}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_int64.npy" + filePath = "FieldInt64.npy" content, err = CreateNumpyData([]int64{10000, 10001, 10002, 10003, 10004}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_float.npy" + filePath = "FieldFloat.npy" content, err = CreateNumpyData([]float32{3.14, 3.15, 3.16, 3.17, 3.18}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_double.npy" + filePath = "FieldDouble.npy" content, err = CreateNumpyData([]float64{5.1, 5.2, 5.3, 5.4, 5.5}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_string.npy" + filePath = "FieldString.npy" content, err = CreateNumpyData([]string{"a", "bb", "ccc", "dd", "e"}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_binary_vector.npy" + filePath = "FieldBinaryVector.npy" content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) assert.NoError(t, err) files = append(files, filePath) - filePath = "field_float_vector.npy" + filePath = 
"FieldFloatVector.npy" content, err = CreateNumpyData([][4]float32{{1, 2, 3, 4}, {3, 4, 5, 6}, {5, 6, 7, 8}, {7, 8, 9, 10}, {9, 10, 11, 12}}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) @@ -430,7 +430,7 @@ func Test_ImportWrapperColumnBased_numpy(t *testing.T) { assert.Equal(t, commonpb.ImportState_ImportPersisted, importResult.State) // row count of fields not equal - filePath := "field_int8.npy" + filePath := "FieldInt8.npy" content, err := CreateNumpyData([]int8{10}) assert.Nil(t, err) err = cm.Write(ctx, filePath, content) @@ -779,11 +779,11 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) { content := []byte(`{ "rows":[ - {"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}, - {"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]}, - {"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]}, - {"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]}, - {"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]} + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, + {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, + {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, + {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} ] }`) diff --git a/internal/util/importutil/json_handler.go b/internal/util/importutil/json_handler.go index e9d3feb368..73189809e9 100644 --- a/internal/util/importutil/json_handler.go +++ b/internal/util/importutil/json_handler.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "strconv" "go.uber.org/zap" @@ -40,92 +39,12 @@ type JSONRowHandler interface { // Validator 
is field value validator type Validator struct { - validateFunc func(obj interface{}) error // validate data type function - convertFunc func(obj interface{}, field storage.FieldData) error // convert data function - primaryKey bool // true for primary key - autoID bool // only for primary key field - isString bool // for string field - dimension int // only for vector field - fieldName string // field name -} - -// JSONRowValidator is row-based json format validator class -type JSONRowValidator struct { - downstream JSONRowHandler // downstream processor, typically is a JSONRowComsumer - validators map[storage.FieldID]*Validator // validators for each field - rowCounter int64 // how many rows have been validated -} - -func NewJSONRowValidator(collectionSchema *schemapb.CollectionSchema, downstream JSONRowHandler) (*JSONRowValidator, error) { - v := &JSONRowValidator{ - validators: make(map[storage.FieldID]*Validator), - downstream: downstream, - rowCounter: 0, - } - err := initValidators(collectionSchema, v.validators) - if err != nil { - log.Error("JSON row validator: failed to initialize json row-based validator", zap.Error(err)) - return nil, err - } - return v, nil -} - -func (v *JSONRowValidator) ValidateCount() int64 { - return v.rowCounter -} - -func (v *JSONRowValidator) Handle(rows []map[storage.FieldID]interface{}) error { - if v == nil || v.validators == nil || len(v.validators) == 0 { - log.Error("JSON row validator is not initialized") - return errors.New("JSON row validator is not initialized") - } - - // parse completed - if rows == nil { - log.Info("JSON row validation finished") - if v.downstream != nil && !reflect.ValueOf(v.downstream).IsNil() { - return v.downstream.Handle(rows) - } - return nil - } - - for i := 0; i < len(rows); i++ { - row := rows[i] - - for id, validator := range v.validators { - value, ok := row[id] - if validator.primaryKey && validator.autoID { - // primary key is auto-generated, if user provided it, return error - if ok { - log.Error("JSON row validator: primary key is auto-generated, no need to provide PK value at the row", - zap.String("fieldName", validator.fieldName), zap.Int64("rowNumber", v.rowCounter+int64(i))) - return fmt.Errorf("the primary key '%s' is auto-generated, no need to provide PK value at the row %d", - validator.fieldName, v.rowCounter+int64(i)) - } - continue - } - if !ok { - log.Error("JSON row validator: field missed at the row", - zap.String("fieldName", validator.fieldName), zap.Int64("rowNumber", v.rowCounter+int64(i))) - return fmt.Errorf("the field '%s' missed at the row %d", validator.fieldName, v.rowCounter+int64(i)) - } - - if err := validator.validateFunc(value); err != nil { - log.Error("JSON row validator: invalid value at the row", zap.String("fieldName", validator.fieldName), - zap.Int64("rowNumber", v.rowCounter+int64(i)), zap.Any("value", value), zap.Error(err)) - return fmt.Errorf("the field '%s' value at the row %d is invalid, error: %s", - validator.fieldName, v.rowCounter+int64(i), err.Error()) - } - } - } - - v.rowCounter += int64(len(rows)) - - if v.downstream != nil && !reflect.ValueOf(v.downstream).IsNil() { - return v.downstream.Handle(rows) - } - - return nil + convertFunc func(obj interface{}, field storage.FieldData) error // convert data function + primaryKey bool // true for primary key + autoID bool // only for primary key field + isString bool // for string field + dimension int // only for vector field + fieldName string // field name } // JSONRowConsumer is row-based json format consumer 
class @@ -203,6 +122,10 @@ func (v *JSONRowConsumer) IDRange() []int64 { return v.autoIDRange } +func (v *JSONRowConsumer) RowCount() int64 { + return v.rowCounter +} + func (v *JSONRowConsumer) flush(force bool) error { // force flush all data if force { @@ -277,6 +200,10 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error { var rowIDBegin typeutil.UniqueID var rowIDEnd typeutil.UniqueID if primaryValidator.autoID { + if v.rowIDAllocator == nil { + log.Error("JSON row consumer: primary key is auto-generated but IDAllocator is nil") + return fmt.Errorf("primary key is auto-generated but IDAllocator is nil") + } var err error rowIDBegin, rowIDEnd, err = v.rowIDAllocator.Alloc(uint32(len(rows))) if err != nil { diff --git a/internal/util/importutil/json_handler_test.go b/internal/util/importutil/json_handler_test.go index 7258fb6f24..6993e67ff1 100644 --- a/internal/util/importutil/json_handler_test.go +++ b/internal/util/importutil/json_handler_test.go @@ -19,7 +19,6 @@ package importutil import ( "context" "errors" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -59,106 +58,6 @@ func newIDAllocator(ctx context.Context, t *testing.T, allocErr error) *allocato idAllocator } -func Test_NewJSONRowValidator(t *testing.T) { - validator, err := NewJSONRowValidator(nil, nil) - assert.NotNil(t, err) - assert.Nil(t, validator) - - validator, err = NewJSONRowValidator(sampleSchema(), nil) - assert.NotNil(t, validator) - assert.Nil(t, err) -} - -func Test_JSONRowValidator(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - schema := sampleSchema() - parser := NewJSONParser(ctx, schema) - assert.NotNil(t, parser) - - // 0 row case - reader := strings.NewReader(`{ - "rows":[] - }`) - - validator, err := NewJSONRowValidator(schema, nil) - assert.NotNil(t, validator) - assert.Nil(t, err) - - err = parser.ParseRows(reader, validator) - assert.NotNil(t, err) - assert.Equal(t, int64(0), validator.ValidateCount()) - - // missed some fields - reader = strings.NewReader(`{ - "rows":[ - {"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}, - {"field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]} - ] - }`) - err = parser.ParseRows(reader, validator) - assert.NotNil(t, err) - - // invalid dimension - reader = strings.NewReader(`{ - "rows":[ - {"field_bool": true, "field_int8": true, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0, 1, 66, 128, 0, 1, 66], "field_float_vector": [1.1, 1.2, 1.3, 1.4]} - ] - }`) - err = parser.ParseRows(reader, validator) - assert.NotNil(t, err) - - // invalid value type - reader = strings.NewReader(`{ - "rows":[ - {"field_bool": true, "field_int8": true, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]} - ] - }`) - err = parser.ParseRows(reader, validator) - assert.NotNil(t, err) - - // init failed - validator.validators = nil - err = validator.Handle(nil) - assert.NotNil(t, err) - - // primary key
is auto-generate, but user provide pk value, return error - schema = &schemapb.CollectionSchema{ - Name: "schema", - Description: "schema", - AutoID: true, - Fields: []*schemapb.FieldSchema{ - { - FieldID: 101, - Name: "ID", - IsPrimaryKey: true, - AutoID: true, - DataType: schemapb.DataType_Int64, - }, - { - FieldID: 102, - Name: "Age", - IsPrimaryKey: false, - DataType: schemapb.DataType_Int64, - }, - }, - } - - validator, err = NewJSONRowValidator(schema, nil) - assert.NotNil(t, validator) - assert.Nil(t, err) - - reader = strings.NewReader(`{ - "rows":[ - {"ID": 1, "Age": 2} - ] - }`) - parser = NewJSONParser(ctx, schema) - err = parser.ParseRows(reader, validator) - assert.NotNil(t, err) -} - func Test_NewJSONRowConsumer(t *testing.T) { // nil schema consumer, err := NewJSONRowConsumer(nil, nil, 2, 16, nil) @@ -203,67 +102,6 @@ func Test_NewJSONRowConsumer(t *testing.T) { assert.Nil(t, err) } -func Test_JSONRowConsumer(t *testing.T) { - ctx := context.Background() - idAllocator := newIDAllocator(ctx, t, nil) - - schema := sampleSchema() - parser := NewJSONParser(ctx, schema) - assert.NotNil(t, parser) - - reader := strings.NewReader(`{ - "rows":[ - {"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]}, - {"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]}, - {"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]}, - {"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]}, - {"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]} - ] - }`) - - var shardNum int32 = 2 - var callTime int32 - var totalCount int - consumeFunc := func(fields map[storage.FieldID]storage.FieldData, shard int) error { - assert.Equal(t, int(callTime), shard) - callTime++ - rowCount := 0 - for _, data := range fields { - if rowCount == 0 { - rowCount = data.RowNum() - } else { - assert.Equal(t, rowCount, data.RowNum()) - } - } - totalCount += rowCount - return nil - } - - consumer, err := NewJSONRowConsumer(schema, idAllocator, shardNum, 1, consumeFunc) - assert.NotNil(t, consumer) - assert.Nil(t, err) - - validator, err := NewJSONRowValidator(schema, consumer) - assert.NotNil(t, validator) - assert.Nil(t, err) - - err = parser.ParseRows(reader, validator) - assert.Nil(t, err) - assert.Equal(t, int64(5), validator.ValidateCount()) - - assert.Equal(t, shardNum, callTime) - assert.Equal(t, 5, totalCount) - - // parse primary key error - reader = strings.NewReader(`{ - "rows":[ - {"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 0.5, "field_float": 3.14, "field_double": 1.56, "field_string": "hello 
world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]} - ] - }`) - err = parser.ParseRows(reader, validator) - assert.Error(t, err) -} - func Test_JSONRowConsumerFlush(t *testing.T) { var callTime int32 var totalCount int @@ -408,6 +246,11 @@ func Test_JSONRowConsumerHandle(t *testing.T) { assert.Equal(t, 2, len(consumer.autoIDRange)) assert.Equal(t, int64(1), consumer.autoIDRange[0]) assert.Equal(t, int64(1+rowCount), consumer.autoIDRange[1]) + + // pk is auto-generated byt IDAllocator is nil + consumer.rowIDAllocator = nil + err = consumer.Handle(input) + assert.Error(t, err) }) t.Run("handle varchar pk", func(t *testing.T) { @@ -452,133 +295,7 @@ func Test_JSONRowConsumerHandle(t *testing.T) { err = consumer.Handle(input) assert.Nil(t, err) - assert.Equal(t, int64(rowCount), consumer.rowCounter) + assert.Equal(t, int64(rowCount), consumer.RowCount()) assert.Equal(t, 0, len(consumer.autoIDRange)) }) } - -func Test_JSONRowConsumerStringKey(t *testing.T) { - ctx := context.Background() - idAllocator := newIDAllocator(ctx, t, nil) - - schema := strKeySchema() - parser := NewJSONParser(ctx, schema) - assert.NotNil(t, parser) - - reader := strings.NewReader(`{ - "rows": [{ - "uid": "Dm4aWrbNzhmjwCTEnCJ9LDPO2N09sqysxgVfbH9Zmn3nBzmwsmk0eZN6x7wSAoPQ", - "int_scalar": 9070353, - "float_scalar": 0.9798043638085004, - "string_scalar": "ShQ44OX0z8kGpRPhaXmfSsdH7JHq5DsZzu0e2umS1hrWG0uONH2RIIAdOECaaXir", - "bool_scalar": true, - "vectors": [0.5040062902126952, 0.8297619818664708, 0.20248342801564806, 0.12834786423659314] - }, - { - "uid": "RP50U0d2napRjXu94a8oGikWgklvVsXFurp8RR4tHGw7N0gk1b7opm59k3FCpyPb", - "int_scalar": 8505288, - "float_scalar": 0.937913432198687, - "string_scalar": "Ld4b0avxathBdNvCrtm3QsWO1pYktUVR7WgAtrtozIwrA8vpeactNhJ85CFGQnK5", - "bool_scalar": false, - "vectors": [0.528232122836893, 0.6916116750653186, 0.41443762522548705, 0.26624344144792056] - }, - { - "uid": "oxhFkQitWPPw0Bjmj7UQcn4iwvS0CU7RLAC81uQFFQjWtOdiB329CPyWkfGSeYfE", - "int_scalar": 4392660, - "float_scalar": 0.32381232630490264, - "string_scalar": "EmAlB0xdQcxeBtwlZJQnLgKodiuRinynoQtg0eXrjkq24dQohzSm7Bx3zquHd3kO", - "bool_scalar": false, - "vectors": [0.7978693027281338, 0.12394906726785092, 0.42431962903815285, 0.4098707807351914] - }, - { - "uid": "sxoEL4Mpk1LdsyXhbNm059UWJ3CvxURLCQczaVI5xtBD4QcVWTDFUW7dBdye6nbn", - "int_scalar": 7927425, - "float_scalar": 0.31074026464844895, - "string_scalar": "fdY2beCvs1wSws0Gb9ySD92xwfEfJpX5DQgsWoISylBAoYOcXpRaqIJoXYS4g269", - "bool_scalar": true, - "vectors": [0.3716157812069954, 0.006981281113265229, 0.9007003458552365, 0.22492634316191004] - }, - { - "uid": "g33Rqq2UQSHPRHw5FvuXxf5uGEhIAetxE6UuXXCJj0hafG8WuJr1ueZftsySCqAd", - "int_scalar": 9288807, - "float_scalar": 0.4953578200336135, - "string_scalar": "6f8Iv1zQAGksj5XxMbbI5evTrYrB8fSFQ58jl0oU7Z4BpA81VsD2tlWqkhfoBNa7", - "bool_scalar": false, - "vectors": [0.5921374209648096, 0.04234832587925662, 0.7803878096531548, 0.1964045837884633] - }, - { - "uid": "ACIJd7lTXkRgUNmlQk6AbnWIKEEV8Z6OS3vDcm0w9psmt9sH3z1JLg1fNVCqiX3d", - "int_scalar": 1173595, - "float_scalar": 0.9000745450802002, - "string_scalar": "gpj9YctF2ig1l1APkvRzHbVE8PZVKRbk7nvW73qS2uQbY5l7MeIeTPwRBjasbY8z", - "bool_scalar": true, - "vectors": [0.4655121736168688, 0.6195496905333787, 0.5316616196326639, 0.3417492053890768] - }, - { - "uid": "f0wRVZZ9u1bEKrAjLeZj3oliEnUjBiUl6TiermeczceBmGe6M2RHONgz3qEogrd5", - "int_scalar": 3722368, - "float_scalar": 0.7212299175768438, - "string_scalar": 
"xydiejGUlvS49BfBuy1EuYRKt3v2oKwC6pqy7Ga4dGWn3BnQigV4XAGawixDAGHN", - "bool_scalar": false, - "vectors": [0.6173164237304075, 0.374107748459483, 0.3686321416317251, 0.585725336391797] - }, - { - "uid": "uXq9q96vUqnDebcUISFkRFT27OjD89DWhok6urXIjTuLzaSWnCVTJkrJXxFctSg0", - "int_scalar": 1940731, - "float_scalar": 0.9524404085944204, - "string_scalar": "ZXSNzR5V3t62fjop7b7DHK56ByAF0INYwycKsu6OxGP4p2j0Obs6l0NUqukypGXd", - "bool_scalar": false, - "vectors": [0.07178869784465443, 0.4208459174227864, 0.5882811425075762, 0.6867753592116734] - }, - { - "uid": "EXDDklLvQIfeCJN8cES3b9mdCYDQVhq2iLj8WWA3TPtZ1SZ4Jpidj7OXJidSD7Wn", - "int_scalar": 2158426, - "float_scalar": 0.23770219927963454, - "string_scalar": "9TNeKVSMqTP8Zxs90kaAcB7n6JbIcvFWInzi9JxZQgmYxD5xLYwaCoeUzRiNAxAg", - "bool_scalar": false, - "vectors": [0.5659468293534021, 0.6275816433340369, 0.3978846871291008, 0.3571179679645908] - }, - { - "uid": "mlaXOgYvB88WWRpXNyWv6UqpmvIHrC6pRo03AtaPLMpVymu0L9ioO8GWa1XgGyj0", - "int_scalar": 198279, - "float_scalar": 0.020343767010139513, - "string_scalar": "AblYGRZJiMAwDbMEkungG0yKTeuya4FgyliakWWqSOJ5TvQWB9Ki2WXbnvSsYIDF", - "bool_scalar": true, - "vectors": [0.5374636140212398, 0.7655373567912009, 0.05491796821609715, 0.349384366747262] - } - ] - }`) - - var shardNum int32 = 2 - var callTime int32 - var totalCount int - consumeFunc := func(fields map[storage.FieldID]storage.FieldData, shard int) error { - assert.Equal(t, int(callTime), shard) - callTime++ - rowCount := 0 - for _, data := range fields { - if rowCount == 0 { - rowCount = data.RowNum() - } else { - assert.Equal(t, rowCount, data.RowNum()) - } - } - totalCount += rowCount - return nil - } - - consumer, err := NewJSONRowConsumer(schema, idAllocator, shardNum, 1, consumeFunc) - assert.NotNil(t, consumer) - assert.Nil(t, err) - - validator, err := NewJSONRowValidator(schema, consumer) - assert.NotNil(t, validator) - assert.Nil(t, err) - - err = parser.ParseRows(reader, validator) - assert.Nil(t, err) - assert.Equal(t, int64(10), validator.ValidateCount()) - - assert.Equal(t, shardNum, callTime) - assert.Equal(t, 10, totalCount) -} diff --git a/internal/util/importutil/json_parser.go b/internal/util/importutil/json_parser.go index 7082da2746..ffe48cc04d 100644 --- a/internal/util/importutil/json_parser.go +++ b/internal/util/importutil/json_parser.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -53,6 +54,15 @@ func NewJSONParser(ctx context.Context, collectionSchema *schemapb.CollectionSch name2FieldID := make(map[string]storage.FieldID) for i := 0; i < len(collectionSchema.Fields); i++ { schema := collectionSchema.Fields[i] + // RowIDField and TimeStampField is internal field, no need to parse + if schema.GetFieldID() == common.RowIDField || schema.GetFieldID() == common.TimeStampField { + continue + } + // if primary key field is auto-gernerated, no need to parse + if schema.GetAutoID() { + continue + } + fields[schema.GetName()] = 0 name2FieldID[schema.GetName()] = schema.GetFieldID() } @@ -89,6 +99,38 @@ func adjustBufSize(parser *JSONParser, collectionSchema *schemapb.CollectionSche parser.bufSize = int64(bufSize) } +func (p *JSONParser) verifyRow(raw interface{}) (map[storage.FieldID]interface{}, error) { + stringMap, ok := raw.(map[string]interface{}) + if !ok { + log.Error("JSON parser: invalid 
JSON format, each row should be a key-value map") + return nil, errors.New("invalid JSON format, each row should be a key-value map") + } + + row := make(map[storage.FieldID]interface{}) + for k, v := range stringMap { + // if user provided redundant field, return error + fieldID, ok := p.name2FieldID[k] + if !ok { + log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k)) + return nil, fmt.Errorf("the field '%s' is not defined in collection schema", k) + } + row[fieldID] = v + } + + // check if any field is missing + if len(row) != len(p.name2FieldID) { + for k, v := range p.name2FieldID { + _, ok := row[v] + if !ok { + log.Error("JSON parser: a field value is missing", zap.String("fieldName", k)) + return nil, fmt.Errorf("value of field '%s' is missing", k) + } + } + } + + return row, nil +} + func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error { if handler == nil { log.Error("JSON parse handler is nil") @@ -151,24 +193,9 @@ func (p *JSONParser) ParseRows(r io.Reader, handler JSONRowHandler) error { return fmt.Errorf("failed to parse row value, error: %w", err) } - switch value.(type) { - case map[string]interface{}: - break - default: - log.Error("JSON parser: invalid JSON format, each row should be a key-value map") - return errors.New("invalid JSON format, each row should be a key-value map") - } - - row := make(map[storage.FieldID]interface{}) - stringMap := value.(map[string]interface{}) - for k, v := range stringMap { - // if user provided redundant field, return error - fieldID, ok := p.name2FieldID[k] - if !ok { - log.Error("JSON parser: the field is not defined in collection schema", zap.String("fieldName", k)) - return fmt.Errorf("the field '%s' is not defined in collection schema", k) - } - row[fieldID] = v + row, err := p.verifyRow(value) + if err != nil { + return err } buf = append(buf, row) diff --git a/internal/util/importutil/json_parser_test.go b/internal/util/importutil/json_parser_test.go index 21b91fc7a3..045b4f86fc 100644 --- a/internal/util/importutil/json_parser_test.go +++ b/internal/util/importutil/json_parser_test.go @@ -18,15 +18,38 @@ package importutil import ( "context" + "encoding/json" + "errors" + "math" + "strconv" "strings" "testing" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" ) +// mockJSONRowConsumer is a mock of JSONRowConsumer for testing +type mockJSONRowConsumer struct { + handleErr error + rows []map[storage.FieldID]interface{} + handleCount int +} + +func (v *mockJSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error { + if v.handleErr != nil { + return v.handleErr + } + if rows != nil { + v.rows = append(v.rows, rows...)
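+		// the accumulated rows are inspected by the test cases after ParseRows() returns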
+	}
+	v.handleCount++
+	return nil
+}
+
 func Test_AdjustBufSize(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -64,237 +87,302 @@ func Test_AdjustBufSize(t *testing.T) {
 	assert.Equal(t, int64(MinBufferSize), parser.bufSize)
 }
 
-func Test_JSONParserParserRows(t *testing.T) {
+func Test_JSONParserParseRows_IntPK(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	schema := sampleSchema()
 	parser := NewJSONParser(ctx, schema)
 	assert.NotNil(t, parser)
-	parser.bufSize = 1
 
-	reader := strings.NewReader(`{
-		"rows":[
-			{"field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
-			{"field_bool": false, "field_int8": 11, "field_int16": 102, "field_int32": 1002, "field_int64": 10002, "field_float": 3.15, "field_double": 2.56, "field_string": "hello world", "field_binary_vector": [253, 0], "field_float_vector": [2.1, 2.2, 2.3, 2.4]},
-			{"field_bool": true, "field_int8": 12, "field_int16": 103, "field_int32": 1003, "field_int64": 10003, "field_float": 3.16, "field_double": 3.56, "field_string": "hello world", "field_binary_vector": [252, 0], "field_float_vector": [3.1, 3.2, 3.3, 3.4]},
-			{"field_bool": false, "field_int8": 13, "field_int16": 104, "field_int32": 1004, "field_int64": 10004, "field_float": 3.17, "field_double": 4.56, "field_string": "hello world", "field_binary_vector": [251, 0], "field_float_vector": [4.1, 4.2, 4.3, 4.4]},
-			{"field_bool": true, "field_int8": 14, "field_int16": 105, "field_int32": 1005, "field_int64": 10005, "field_float": 3.18, "field_double": 5.56, "field_string": "hello world", "field_binary_vector": [250, 0], "field_float_vector": [5.1, 5.2, 5.3, 5.4]}
-		]
-	}`)
+	// prepare test data
+	content := &sampleContent{
+		Rows: make([]sampleRow, 0),
+	}
+	for i := 0; i < 10; i++ {
+		row := sampleRow{
+			FieldBool:         i%2 == 0,
+			FieldInt8:         int8(i % math.MaxInt8),
+			FieldInt16:        int16(100 + i),
+			FieldInt32:        int32(1000 + i),
+			FieldInt64:        int64(99999999999999999 + i),
+			FieldFloat:        3 + float32(i)/11,
+			FieldDouble:       1 + float64(i)/7,
+			FieldString:       "No." + strconv.FormatInt(int64(i), 10),
+			FieldBinaryVector: []int{(200 + i) % math.MaxUint8, 0},
+			FieldFloatVector:  []float32{float32(i) + 0.1, float32(i) + 0.2, float32(i) + 0.3, float32(i) + 0.4},
+		}
+		content.Rows = append(content.Rows, row)
+	}
 
-	// handler is nil
-	err := parser.ParseRows(reader, nil)
-	assert.NotNil(t, err)
-
-	validator, err := NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
+	binContent, err := json.Marshal(content)
 	assert.Nil(t, err)
+	strContent := string(binContent)
+	reader := strings.NewReader(strContent)
 
-	// success
-	err = parser.ParseRows(reader, validator)
-	assert.Nil(t, err)
-	assert.Equal(t, int64(5), validator.ValidateCount())
+	consumer := &mockJSONRowConsumer{
+		handleErr:   nil,
+		rows:        make([]map[int64]interface{}, 0),
+		handleCount: 0,
+	}
 
-	// not a row-based format
-	reader = strings.NewReader(`{
-		"dummy":[]
-	}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+	t.Run("parse success", func(t *testing.T) {
+		// set bufSize = 4, so handle() is called after every 4 rows are read
+		parser.bufSize = 4
+		err = parser.ParseRows(reader, consumer)
+		assert.Nil(t, err)
+		assert.Equal(t, len(content.Rows), len(consumer.rows))
+		for i := 0; i < len(consumer.rows); i++ {
+			contentRow := content.Rows[i]
+			parsedRow := consumer.rows[i]
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+			v1, ok := parsedRow[102].(bool)
+			assert.True(t, ok)
+			assert.Equal(t, contentRow.FieldBool, v1)
 
-	// rows is not a list
-	reader = strings.NewReader(`{
-		"rows":
-	}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+			v2, ok := parsedRow[103].(json.Number)
+			assert.True(t, ok)
+			assert.Equal(t, strconv.FormatInt(int64(contentRow.FieldInt8), 10), string(v2))
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+			v3, ok := parsedRow[104].(json.Number)
+			assert.True(t, ok)
+			assert.Equal(t, strconv.FormatInt(int64(contentRow.FieldInt16), 10), string(v3))
 
-	// typo
-	reader = strings.NewReader(`{
-		"rows": [}
-	}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+			v4, ok := parsedRow[105].(json.Number)
+			assert.True(t, ok)
+			assert.Equal(t, strconv.FormatInt(int64(contentRow.FieldInt32), 10), string(v4))
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+			v5, ok := parsedRow[106].(json.Number)
+			assert.True(t, ok)
+			assert.Equal(t, strconv.FormatInt(contentRow.FieldInt64, 10), string(v5))
 
-	// rows is not a list
-	reader = strings.NewReader(`{
-		"rows": {}
-	}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+			v6, ok := parsedRow[107].(json.Number)
+			assert.True(t, ok)
+			f32, err := parseFloat(string(v6), 32, "")
+			assert.Nil(t, err)
+			assert.InDelta(t, contentRow.FieldFloat, float32(f32), 10e-6)
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+			v7, ok := parsedRow[108].(json.Number)
+			assert.True(t, ok)
+			f64, err := parseFloat(string(v7), 64, "")
+			assert.Nil(t, err)
+			assert.InDelta(t, contentRow.FieldDouble, f64, 10e-14)
 
-	// rows is not a list of list
-	reader = strings.NewReader(`{
-		"rows": [[]]
-	}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+			v8, ok := parsedRow[109].(string)
+			assert.True(t, ok)
+			assert.Equal(t, contentRow.FieldString, v8)
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+
+			v9, ok := parsedRow[110].([]interface{})
+			assert.True(t, ok)
+			assert.Equal(t, len(contentRow.FieldBinaryVector), len(v9))
+			for k := 0; k < len(v9); k++ {
+				val, ok := v9[k].(json.Number)
+				assert.True(t, ok)
+				assert.Equal(t, strconv.FormatInt(int64(contentRow.FieldBinaryVector[k]), 10), string(val))
+			}
 
-	// not valid json format
-	reader = strings.NewReader(`[]`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+			v10, ok := parsedRow[111].([]interface{})
+			assert.True(t, ok)
+			assert.Equal(t, len(contentRow.FieldFloatVector), len(v10))
+			for k := 0; k < len(v10); k++ {
+				val, ok := v10[k].(json.Number)
+				assert.True(t, ok)
+				fval, err := parseFloat(string(val), 64, "")
+				assert.Nil(t, err)
+				assert.InDelta(t, contentRow.FieldFloatVector[k], float32(fval), 10e-6)
+			}
+		}
+	})
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+	t.Run("error cases", func(t *testing.T) {
+		// handler is nil
+		err = parser.ParseRows(reader, nil)
+		assert.NotNil(t, err)
 
-	// empty content
-	reader = strings.NewReader(`{}`)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+		// not a row-based format
+		reader = strings.NewReader(`{
+			"dummy":[]
+		}`)
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
 
-	// empty content
-	reader = strings.NewReader(``)
-	validator, err = NewJSONRowValidator(schema, nil)
-	assert.NotNil(t, validator)
-	assert.Nil(t, err)
+		// rows is not a list
+		reader = strings.NewReader(`{
+			"rows":
+		}`)
 
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
 
-	// redundant field
-	reader = strings.NewReader(`{
-		"rows":[
-			{"dummy": 1, "field_bool": true, "field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
-		]
-	}`)
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+		// typo
+		reader = strings.NewReader(`{
+			"rows": [}
+		}`)
 
-	// field missed
-	reader = strings.NewReader(`{
-		"rows":[
-			{"field_int8": 10, "field_int16": 101, "field_int32": 1001, "field_int64": 10001, "field_float": 3.14, "field_double": 1.56, "field_string": "hello world", "field_binary_vector": [254, 0], "field_float_vector": [1.1, 1.2, 1.3, 1.4]},
-		]
-	}`)
-	err = parser.ParseRows(reader, validator)
-	assert.NotNil(t, err)
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// rows is not a list
+		reader = strings.NewReader(`{
+			"rows": {}
+		}`)
+
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// rows is not a list of list
+		reader = strings.NewReader(`{
+			"rows": [[]]
+		}`)
+
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// not valid json format
+		reader = strings.NewReader(`[]`)
+
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// empty content
+		reader = strings.NewReader(`{}`)
+
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// empty content
+		reader = strings.NewReader(``)
+
+		err = parser.ParseRows(reader, consumer)
+		assert.NotNil(t, err)
+
+		// redundant field
+		reader = strings.NewReader(`{
+			"rows":[
"FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]} + ] + }`) + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + + // field missed + reader = strings.NewReader(`{ + "rows":[ + {"FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]} + ] + }`) + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + + // handle() error + content := `{ + "rows":[ + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]} + ] + }` + consumer.handleErr = errors.New("error") + reader = strings.NewReader(content) + parser.bufSize = 2 + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + + reader = strings.NewReader(content) + parser.bufSize = 5 + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + + // row count is 0 + reader = strings.NewReader(`{ + "rows":[] + }`) + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + + // canceled + consumer.handleErr = nil + cancel() + reader = strings.NewReader(content) + err = parser.ParseRows(reader, consumer) + assert.NotNil(t, err) + }) } -func Test_JSONParserParserRowsStringKey(t *testing.T) { +func Test_JSONParserParseRows_StrPK(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() schema := strKeySchema() parser := NewJSONParser(ctx, schema) assert.NotNil(t, parser) - parser.bufSize = 1 - reader := strings.NewReader(`{ - "rows": [{ - "uid": "Dm4aWrbNzhmjwCTEnCJ9LDPO2N09sqysxgVfbH9Zmn3nBzmwsmk0eZN6x7wSAoPQ", - "int_scalar": 9070353, - "float_scalar": 0.9798043638085004, - "string_scalar": "ShQ44OX0z8kGpRPhaXmfSsdH7JHq5DsZzu0e2umS1hrWG0uONH2RIIAdOECaaXir", - "bool_scalar": true, - "vectors": [0.5040062902126952, 0.8297619818664708, 0.20248342801564806, 0.12834786423659314] - }, - { - "uid": "RP50U0d2napRjXu94a8oGikWgklvVsXFurp8RR4tHGw7N0gk1b7opm59k3FCpyPb", - "int_scalar": 8505288, - "float_scalar": 0.937913432198687, - "string_scalar": "Ld4b0avxathBdNvCrtm3QsWO1pYktUVR7WgAtrtozIwrA8vpeactNhJ85CFGQnK5", - "bool_scalar": false, - "vectors": [0.528232122836893, 0.6916116750653186, 0.41443762522548705, 0.26624344144792056] - }, - { - "uid": "oxhFkQitWPPw0Bjmj7UQcn4iwvS0CU7RLAC81uQFFQjWtOdiB329CPyWkfGSeYfE", - "int_scalar": 4392660, - "float_scalar": 0.32381232630490264, - "string_scalar": "EmAlB0xdQcxeBtwlZJQnLgKodiuRinynoQtg0eXrjkq24dQohzSm7Bx3zquHd3kO", - "bool_scalar": false, - "vectors": [0.7978693027281338, 0.12394906726785092, 0.42431962903815285, 0.4098707807351914] - }, - { - "uid": "sxoEL4Mpk1LdsyXhbNm059UWJ3CvxURLCQczaVI5xtBD4QcVWTDFUW7dBdye6nbn", - "int_scalar": 7927425, - "float_scalar": 0.31074026464844895, - "string_scalar": 
"fdY2beCvs1wSws0Gb9ySD92xwfEfJpX5DQgsWoISylBAoYOcXpRaqIJoXYS4g269", - "bool_scalar": true, - "vectors": [0.3716157812069954, 0.006981281113265229, 0.9007003458552365, 0.22492634316191004] - }, - { - "uid": "g33Rqq2UQSHPRHw5FvuXxf5uGEhIAetxE6UuXXCJj0hafG8WuJr1ueZftsySCqAd", - "int_scalar": 9288807, - "float_scalar": 0.4953578200336135, - "string_scalar": "6f8Iv1zQAGksj5XxMbbI5evTrYrB8fSFQ58jl0oU7Z4BpA81VsD2tlWqkhfoBNa7", - "bool_scalar": false, - "vectors": [0.5921374209648096, 0.04234832587925662, 0.7803878096531548, 0.1964045837884633] - }, - { - "uid": "ACIJd7lTXkRgUNmlQk6AbnWIKEEV8Z6OS3vDcm0w9psmt9sH3z1JLg1fNVCqiX3d", - "int_scalar": 1173595, - "float_scalar": 0.9000745450802002, - "string_scalar": "gpj9YctF2ig1l1APkvRzHbVE8PZVKRbk7nvW73qS2uQbY5l7MeIeTPwRBjasbY8z", - "bool_scalar": true, - "vectors": [0.4655121736168688, 0.6195496905333787, 0.5316616196326639, 0.3417492053890768] - }, - { - "uid": "f0wRVZZ9u1bEKrAjLeZj3oliEnUjBiUl6TiermeczceBmGe6M2RHONgz3qEogrd5", - "int_scalar": 3722368, - "float_scalar": 0.7212299175768438, - "string_scalar": "xydiejGUlvS49BfBuy1EuYRKt3v2oKwC6pqy7Ga4dGWn3BnQigV4XAGawixDAGHN", - "bool_scalar": false, - "vectors": [0.6173164237304075, 0.374107748459483, 0.3686321416317251, 0.585725336391797] - }, - { - "uid": "uXq9q96vUqnDebcUISFkRFT27OjD89DWhok6urXIjTuLzaSWnCVTJkrJXxFctSg0", - "int_scalar": 1940731, - "float_scalar": 0.9524404085944204, - "string_scalar": "ZXSNzR5V3t62fjop7b7DHK56ByAF0INYwycKsu6OxGP4p2j0Obs6l0NUqukypGXd", - "bool_scalar": false, - "vectors": [0.07178869784465443, 0.4208459174227864, 0.5882811425075762, 0.6867753592116734] - }, - { - "uid": "EXDDklLvQIfeCJN8cES3b9mdCYDQVhq2iLj8WWA3TPtZ1SZ4Jpidj7OXJidSD7Wn", - "int_scalar": 2158426, - "float_scalar": 0.23770219927963454, - "string_scalar": "9TNeKVSMqTP8Zxs90kaAcB7n6JbIcvFWInzi9JxZQgmYxD5xLYwaCoeUzRiNAxAg", - "bool_scalar": false, - "vectors": [0.5659468293534021, 0.6275816433340369, 0.3978846871291008, 0.3571179679645908] - }, - { - "uid": "mlaXOgYvB88WWRpXNyWv6UqpmvIHrC6pRo03AtaPLMpVymu0L9ioO8GWa1XgGyj0", - "int_scalar": 198279, - "float_scalar": 0.020343767010139513, - "string_scalar": "AblYGRZJiMAwDbMEkungG0yKTeuya4FgyliakWWqSOJ5TvQWB9Ki2WXbnvSsYIDF", - "bool_scalar": true, - "vectors": [0.5374636140212398, 0.7655373567912009, 0.05491796821609715, 0.349384366747262] - } - ] - }`) + // prepare test data + content := &strKeyContent{ + Rows: make([]strKeyRow, 0), + } + for i := 0; i < 10; i++ { + row := strKeyRow{ + UID: "strID_" + strconv.FormatInt(int64(i), 10), + FieldInt32: int32(10000 + i), + FieldFloat: 1 + float32(i)/13, + FieldString: strconv.FormatInt(int64(i+1), 10) + " this string contains unicode character: 🎵", + FieldBool: i%3 == 0, + FieldFloatVector: []float32{float32(i) / 2, float32(i) / 3, float32(i) / 6, float32(i) / 9}, + } + content.Rows = append(content.Rows, row) + } - validator, err := NewJSONRowValidator(schema, nil) - assert.NotNil(t, validator) + binContent, err := json.Marshal(content) assert.Nil(t, err) + strContent := string(binContent) + reader := strings.NewReader(strContent) - err = parser.ParseRows(reader, validator) + consumer := &mockJSONRowConsumer{ + handleErr: nil, + rows: make([]map[int64]interface{}, 0), + handleCount: 0, + } + + err = parser.ParseRows(reader, consumer) assert.Nil(t, err) - assert.Equal(t, int64(10), validator.ValidateCount()) + assert.Equal(t, len(content.Rows), len(consumer.rows)) + for i := 0; i < len(consumer.rows); i++ { + contenctRow := content.Rows[i] + parsedRow := consumer.rows[i] + + v1, ok := parsedRow[101].(string) + 
+		assert.True(t, ok)
+		assert.Equal(t, contentRow.UID, v1)
+
+		v2, ok := parsedRow[102].(json.Number)
+		assert.True(t, ok)
+		assert.Equal(t, strconv.FormatInt(int64(contentRow.FieldInt32), 10), string(v2))
+
+		v3, ok := parsedRow[103].(json.Number)
+		assert.True(t, ok)
+		f32, err := parseFloat(string(v3), 32, "")
+		assert.Nil(t, err)
+		assert.InDelta(t, contentRow.FieldFloat, float32(f32), 10e-6)
+
+		v4, ok := parsedRow[104].(string)
+		assert.True(t, ok)
+		assert.Equal(t, contentRow.FieldString, v4)
+
+		v5, ok := parsedRow[105].(bool)
+		assert.True(t, ok)
+		assert.Equal(t, contentRow.FieldBool, v5)
+
+		v6, ok := parsedRow[106].([]interface{})
+		assert.True(t, ok)
+		assert.Equal(t, len(contentRow.FieldFloatVector), len(v6))
+		for k := 0; k < len(v6); k++ {
+			val, ok := v6[k].(json.Number)
+			assert.True(t, ok)
+			fval, err := parseFloat(string(val), 64, "")
+			assert.Nil(t, err)
+			assert.InDelta(t, contentRow.FieldFloatVector[k], float32(fval), 10e-6)
+		}
+	}
 }
diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go
index 9e7d3e75a2..258c27f694 100644
--- a/internal/util/importutil/numpy_parser_test.go
+++ b/internal/util/importutil/numpy_parser_test.go
@@ -55,7 +55,7 @@ func Test_NumpyParserValidate(t *testing.T) {
 			Fields: []*schemapb.FieldSchema{
 				{
 					FieldID:      109,
-					Name:         "field_string",
+					Name:         "FieldString",
 					IsPrimaryKey: false,
 					Description:  "string",
 					DataType:     schemapb.DataType_String,
@@ -64,7 +64,7 @@
 		},
 			flushFunc)
 		err = p.validate(adapter, "dummy")
 		assert.NotNil(t, err)
-		err = p.validate(adapter, "field_string")
+		err = p.validate(adapter, "FieldString")
 		assert.NotNil(t, err)
 	})
@@ -87,7 +87,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_double")
+		err = parser.validate(adapter, "FieldDouble")
 		assert.Nil(t, err)
 		assert.Equal(t, len(data1), parser.columnDesc.elementCount)
@@ -108,7 +108,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_double")
+		err = parser.validate(adapter, "FieldDouble")
 		assert.NotNil(t, err)
 
 		// shape mismatch
@@ -125,7 +125,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_double")
+		err = parser.validate(adapter, "FieldDouble")
 		assert.NotNil(t, err)
 	})
@@ -143,7 +143,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_binary_vector")
+		err = parser.validate(adapter, "FieldBinaryVector")
 		assert.Nil(t, err)
 		assert.Equal(t, len(data1)*len(data1[0]), parser.columnDesc.elementCount)
@@ -175,7 +175,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_binary_vector")
+		err = parser.validate(adapter, "FieldBinaryVector")
 		assert.NotNil(t, err)
 
 		// shape[1] mismatch
@@ -192,7 +192,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_binary_vector")
+		err = parser.validate(adapter, "FieldBinaryVector")
 		assert.NotNil(t, err)
 
 		// dimension mismatch
@@ -200,18 +200,18 @@
 			Fields: []*schemapb.FieldSchema{
 				{
 					FieldID:  109,
-					Name:     "field_binary_vector",
+					Name:     "FieldBinaryVector",
 					DataType: schemapb.DataType_BinaryVector,
 				},
 			},
 		},
 			flushFunc)
-		err = p.validate(adapter, "field_binary_vector")
+		err = p.validate(adapter, "FieldBinaryVector")
 		assert.NotNil(t, err)
 	})
 
 	t.Run("validate float vector", func(t *testing.T) {
-		filePath := TempFilesPath + "float_vector.npy"
+		filePath := TempFilesPath + "Float_vector.npy"
 		data1 := [][4]float32{{0, 0, 0, 0}, {1, 1, 1, 1}, {2, 2, 2, 2}, {3, 3, 3, 3}}
 		err := CreateNumpyFile(filePath, data1)
 		assert.Nil(t, err)
@@ -224,7 +224,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_float_vector")
+		err = parser.validate(adapter, "FieldFloatVector")
 		assert.Nil(t, err)
 		assert.Equal(t, len(data1)*len(data1[0]), parser.columnDesc.elementCount)
@@ -242,7 +242,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_float_vector")
+		err = parser.validate(adapter, "FieldFloatVector")
 		assert.NotNil(t, err)
 
 		// shape mismatch
@@ -259,7 +259,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_float_vector")
+		err = parser.validate(adapter, "FieldFloatVector")
 		assert.NotNil(t, err)
 
 		// shape[1] mismatch
@@ -276,7 +276,7 @@
 		assert.Nil(t, err)
 		assert.NotNil(t, adapter)
 
-		err = parser.validate(adapter, "field_float_vector")
+		err = parser.validate(adapter, "FieldFloatVector")
 		assert.NotNil(t, err)
 
 		// dimension mismatch
@@ -284,13 +284,13 @@
 			Fields: []*schemapb.FieldSchema{
 				{
 					FieldID:  109,
-					Name:     "field_float_vector",
+					Name:     "FieldFloatVector",
 					DataType: schemapb.DataType_FloatVector,
 				},
 			},
 		},
 			flushFunc)
-		err = p.validate(adapter, "field_float_vector")
+		err = p.validate(adapter, "FieldFloatVector")
 		assert.NotNil(t, err)
 	})
 }
@@ -350,7 +350,7 @@ func Test_NumpyParserParse(t *testing.T) {
 			return nil
 		}
 
-		checkFunc(data, "field_bool", flushFunc)
+		checkFunc(data, "FieldBool", flushFunc)
 	})
 
 	t.Run("parse scalar int8", func(t *testing.T) {
@@ -365,7 +365,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_int8", flushFunc)
+		checkFunc(data, "FieldInt8", flushFunc)
 	})
 
 	t.Run("parse scalar int16", func(t *testing.T) {
@@ -380,7 +380,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_int16", flushFunc)
+		checkFunc(data, "FieldInt16", flushFunc)
 	})
 
 	t.Run("parse scalar int32", func(t *testing.T) {
@@ -395,7 +395,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_int32", flushFunc)
+		checkFunc(data, "FieldInt32", flushFunc)
 	})
 
 	t.Run("parse scalar int64", func(t *testing.T) {
@@ -410,7 +410,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_int64", flushFunc)
+		checkFunc(data, "FieldInt64", flushFunc)
 	})
 
 	t.Run("parse scalar float", func(t *testing.T) {
@@ -425,7 +425,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_float", flushFunc)
+		checkFunc(data, "FieldFloat", flushFunc)
 	})
 
 	t.Run("parse scalar double", func(t *testing.T) {
@@ -440,7 +440,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_double", flushFunc)
+		checkFunc(data, "FieldDouble", flushFunc)
 	})
 
 	t.Run("parse scalar varchar", func(t *testing.T) {
@@ -455,7 +455,7 @@
 			return nil
 		}
 
-		checkFunc(data, "field_string", flushFunc)
+		checkFunc(data, "FieldString", flushFunc)
 	})
 
t.Run("parse binary vector", func(t *testing.T) { @@ -473,7 +473,7 @@ func Test_NumpyParserParse(t *testing.T) { return nil } - checkFunc(data, "field_binary_vector", flushFunc) + checkFunc(data, "FieldBinaryVector", flushFunc) }) t.Run("parse binary vector with float32", func(t *testing.T) { @@ -491,7 +491,7 @@ func Test_NumpyParserParse(t *testing.T) { return nil } - checkFunc(data, "field_float_vector", flushFunc) + checkFunc(data, "FieldFloatVector", flushFunc) }) t.Run("parse binary vector with float64", func(t *testing.T) { @@ -509,7 +509,7 @@ func Test_NumpyParserParse(t *testing.T) { return nil } - checkFunc(data, "field_float_vector", flushFunc) + checkFunc(data, "FieldFloatVector", flushFunc) }) }