From e5b456ff1c0bd7aee47728ab805effdcd3032b78 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 18 May 2023 19:25:25 +0800 Subject: [PATCH] Support JSON for bulkinsert (#24227) Signed-off-by: yhmo --- internal/util/importutil/binlog_adapter.go | 33 ++++++++++ .../util/importutil/binlog_adapter_test.go | 32 ++++++++- internal/util/importutil/binlog_file.go | 43 ++++++++++++ internal/util/importutil/binlog_file_test.go | 66 +++++++++++++++++++ internal/util/importutil/import_util.go | 20 ++++++ internal/util/importutil/import_util_test.go | 48 ++++++++++++++ .../util/importutil/import_wrapper_test.go | 29 ++++---- internal/util/importutil/json_parser_test.go | 2 + internal/util/importutil/numpy_adapter.go | 1 + internal/util/importutil/numpy_parser.go | 45 +++++++++++-- internal/util/importutil/numpy_parser_test.go | 5 ++ 11 files changed, 306 insertions(+), 18 deletions(-) diff --git a/internal/util/importutil/binlog_adapter.go b/internal/util/importutil/binlog_adapter.go index dc80bb12b8..b5a1133ef1 100644 --- a/internal/util/importutil/binlog_adapter.go +++ b/internal/util/importutil/binlog_adapter.go @@ -725,6 +725,16 @@ func (p *BinlogAdapter) readInsertlog(fieldID storage.FieldID, logPath string, if err != nil { return err } + case schemapb.DataType_JSON: + data, err := binlogFile.ReadJSON() + if err != nil { + return err + } + + err = p.dispatchBytesToShards(data, memoryData, shardList, fieldID) + if err != nil { + return err + } case schemapb.DataType_BinaryVector: data, dim, err := binlogFile.ReadBinaryVector() if err != nil { @@ -937,6 +947,29 @@ func (p *BinlogAdapter) dispatchVarcharToShards(data []string, memoryData []map[ return nil } +func (p *BinlogAdapter) dispatchBytesToShards(data [][]byte, memoryData []map[storage.FieldID]storage.FieldData, + shardList []int32, fieldID storage.FieldID) error { + // verify row count + if len(data) != len(shardList) { + log.Error("Binlog adapter: JSON field row count is not equal to shard list row count", zap.Int("dataLen", len(data)), zap.Int("shardLen", len(shardList))) + return fmt.Errorf("JSON field row count %d is not equal to shard list row count %d", len(data), len(shardList)) + } + + // dispatch entities according to shard list + for i, val := range data { + shardID := shardList[i] + if shardID < 0 { + continue // this entity has been deleted or excluded by timestamp + } + + fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here + field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here + field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, val) + } + + return nil +} + func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryData []map[storage.FieldID]storage.FieldData, shardList []int32, fieldID storage.FieldID) error { // verify row count diff --git a/internal/util/importutil/binlog_adapter_test.go b/internal/util/importutil/binlog_adapter_test.go index c60ac0732a..cb2c537a8f 100644 --- a/internal/util/importutil/binlog_adapter_test.go +++ b/internal/util/importutil/binlog_adapter_test.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math" "strconv" "testing" @@ -82,6 +83,7 @@ func createFieldsData(rowCount int) map[storage.FieldID]interface{} { varcharData := make([]string, 0) binVecData := make([][]byte, 0) floatVecData := make([][]float32, 0) + jsonData := make([][]byte, 0) boolFunc := func(i int) bool { return i%3 != 0 } @@ -100,6 +102,7 @@ func 
createFieldsData(rowCount int) map[storage.FieldID]interface{} { varcharData = append(varcharData, "no."+strconv.Itoa(i)) binVecData = append(binVecData, []byte{byte(i % 256), byte(i % 256)}) // dim = 16 floatVecData = append(floatVecData, []float32{float32(i / 2), float32(i / 4), float32(i / 5), float32(i / 8)}) // dim = 4 + jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) } fieldsData[0] = rowIDData @@ -114,6 +117,7 @@ func createFieldsData(rowCount int) map[storage.FieldID]interface{} { fieldsData[109] = varcharData fieldsData[110] = binVecData fieldsData[111] = floatVecData + fieldsData[112] = jsonData return fieldsData } @@ -150,6 +154,9 @@ func createSegmentsData(fieldsData map[storage.FieldID]interface{}, shardNum int for _, vec := range floatVectors { segData[fieldID].(*storage.FloatVectorFieldData).Data = append(segData[fieldID].(*storage.FloatVectorFieldData).Data, vec...) } + fieldID = int64(112) + segData[fieldID].(*storage.JSONFieldData).Data = append(segData[fieldID].(*storage.JSONFieldData).Data, fieldsData[fieldID].([][]byte)...) + segmentsData = append(segmentsData, segData) } return segmentsData @@ -227,7 +234,7 @@ func Test_BinlogAdapterVerify(t *testing.T) { // row id field missed holder.fieldFiles = make(map[int64][]string) - for i := int64(102); i <= 111; i++ { + for i := int64(102); i <= 112; i++ { holder.fieldFiles[i] = make([]string, 0) } err = adapter.verify(holder) @@ -249,7 +256,7 @@ func Test_BinlogAdapterVerify(t *testing.T) { assert.NotNil(t, err) // succeed - for i := int64(102); i <= 111; i++ { + for i := int64(102); i <= 112; i++ { holder.fieldFiles[i] = []string{ "a", } @@ -729,6 +736,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) { int64(109): {"109_insertlog"}, int64(110): {"110_insertlog"}, int64(111): {"111_insertlog"}, + int64(112): {"112_insertlog"}, } holder.deltaFiles = []string{"deltalog"} err = adapter.Read(holder) @@ -750,6 +758,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) { "109_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, fieldsData[109].([]string)), "110_insertlog": createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte)), "111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)), + "112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)), "deltalog": createDeltalogBuf(t, deletedItems, false), } @@ -982,6 +991,20 @@ func Test_BinlogAdapterDispatch(t *testing.T) { assert.Equal(t, 1, segmentsData[1][fieldID].RowNum()) assert.Equal(t, 0, segmentsData[2][fieldID].RowNum()) + // dispatch JSON data + fieldID = int64(112) + data := [][]byte{[]byte("{\"x\": 3, \"y\": 10.5}"), []byte("{\"y\": true}"), []byte("{\"z\": \"hello\"}"), []byte("{}")} + err = adapter.dispatchBytesToShards(data, segmentsData, shardList, fieldID) // row count mismatch + assert.NotNil(t, err) + for _, segment := range segmentsData { + assert.Equal(t, 0, segment[fieldID].RowNum()) + } + err = adapter.dispatchBytesToShards([][]byte{[]byte("{}"), []byte("{}"), []byte("{}")}, segmentsData, shardList, fieldID) // succeed + assert.Nil(t, err) + assert.Equal(t, 1, segmentsData[0][fieldID].RowNum()) + assert.Equal(t, 1, segmentsData[1][fieldID].RowNum()) + assert.Equal(t, 0, segmentsData[2][fieldID].RowNum()) + // dispatch binary vector data fieldID = int64(110) err = adapter.dispatchBinaryVecToShards([]byte{1, 2, 3, 4}, 16, segmentsData, shardList, fieldID) // row count mismatch @@ -1077,6 +1100,11 @@ func Test_BinlogAdapterReadInsertlog(t 
*testing.T) { err = adapter.readInsertlog(1, "varchar", segmentsData, []int32{1}) assert.NotNil(t, err) + // failed to dispatch JSON data + chunkManager.readBuf["JSON"] = createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)) + err = adapter.readInsertlog(1, "JSON", segmentsData, []int32{1}) + assert.NotNil(t, err) + // failed to dispatch binvector data chunkManager.readBuf["binvector"] = createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte)) err = adapter.readInsertlog(1, "binvector", segmentsData, []int32{1}) diff --git a/internal/util/importutil/binlog_file.go b/internal/util/importutil/binlog_file.go index 61325e352b..05ef74eac0 100644 --- a/internal/util/importutil/binlog_file.go +++ b/internal/util/importutil/binlog_file.go @@ -434,6 +434,49 @@ func (p *BinlogFile) ReadVarchar() ([]string, error) { return result, nil } +// ReadJSON method reads all the blocks of a binlog by a data type. +// A binlog is designed to support multiple blocks, but so far each binlog always contains only one block. +func (p *BinlogFile) ReadJSON() ([][]byte, error) { + if p.reader == nil { + log.Error("Binlog file: binlog reader not yet initialized") + return nil, errors.New("binlog reader not yet initialized") + } + + result := make([][]byte, 0) + for { + event, err := p.reader.NextEventReader() + if err != nil { + log.Error("Binlog file: failed to iterate events reader", zap.Error(err)) + return nil, fmt.Errorf("failed to iterate events reader, error: %w", err) + } + + // end of the file + if event == nil { + break + } + + if event.TypeCode != storage.InsertEventType { + log.Error("Binlog file: binlog file is not insert log") + return nil, errors.New("binlog file is not insert log") + } + + if p.DataType() != schemapb.DataType_JSON { + log.Error("Binlog file: binlog data type is not JSON") + return nil, errors.New("binlog data type is not JSON") + } + + data, err := event.PayloadReaderInterface.GetJSONFromPayload() + if err != nil { + log.Error("Binlog file: failed to read JSON data", zap.Error(err)) + return nil, fmt.Errorf("failed to read JSON data, error: %w", err) + } + + result = append(result, data...) + } + + return result, nil +} + // ReadBinaryVector method reads all the blocks of a binlog by a data type. // A binlog is designed to support multiple blocks, but so far each binlog always contains only one block. // return vectors data and the dimension diff --git a/internal/util/importutil/binlog_file_test.go b/internal/util/importutil/binlog_file_test.go index 254351e9c8..c19c028f3d 100644 --- a/internal/util/importutil/binlog_file_test.go +++ b/internal/util/importutil/binlog_file_test.go @@ -112,6 +112,17 @@ func createBinlogBuf(t *testing.T, dataType schemapb.DataType, data interface{}) // without the two lines, the case will crash at here. // the "original_size" is come from storage.originalSizeKey w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal)) + case schemapb.DataType_JSON: + rows := data.([][]byte) + sizeTotal := 0 + for i := 0; i < len(rows); i++ { + err = evt.AddOneJSONToPayload(rows[i]) + assert.Nil(t, err) + sizeTotal += binary.Size(rows[i]) + } + // without the two lines, the case will crash at here. 
+ // the "original_size" is come from storage.originalSizeKey + w.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal)) case schemapb.DataType_BinaryVector: vectors := data.([][]byte) for i := 0; i < len(vectors); i++ { @@ -231,6 +242,10 @@ func Test_BinlogFileOpen(t *testing.T) { assert.Nil(t, dataVarchar) assert.NotNil(t, err) + dataJSON, err := binlogFile.ReadJSON() + assert.Nil(t, dataJSON) + assert.NotNil(t, err) + dataBinaryVector, dim, err := binlogFile.ReadBinaryVector() assert.Nil(t, dataBinaryVector) assert.Equal(t, 0, dim) @@ -631,12 +646,63 @@ func Test_BinlogFileVarchar(t *testing.T) { err = binlogFile.Open("dummy") assert.Nil(t, err) + d, err := binlogFile.ReadJSON() + assert.Zero(t, len(d)) + assert.NotNil(t, err) + + binlogFile.Close() +} + +func Test_BinlogFileJSON(t *testing.T) { + source := [][]byte{[]byte("{\"x\": 3, \"y\": 10.5}"), []byte("{\"y\": true}"), []byte("{\"z\": \"hello\"}"), []byte("{}")} + chunkManager := &MockChunkManager{ + readBuf: map[string][]byte{ + "dummy": createBinlogBuf(t, schemapb.DataType_JSON, source), + }, + } + + binlogFile, err := NewBinlogFile(chunkManager) + assert.Nil(t, err) + assert.NotNil(t, binlogFile) + + // correct reading + err = binlogFile.Open("dummy") + assert.Nil(t, err) + assert.Equal(t, schemapb.DataType_JSON, binlogFile.DataType()) + + data, err := binlogFile.ReadJSON() + assert.Nil(t, err) + assert.NotNil(t, data) + assert.Equal(t, len(source), len(data)) + for i := 0; i < len(source); i++ { + assert.Equal(t, string(source[i]), string(data[i])) + } + + binlogFile.Close() + + // wrong data type reading + binlogFile, err = NewBinlogFile(chunkManager) + assert.Nil(t, err) + err = binlogFile.Open("dummy") + assert.Nil(t, err) + d, dim, err := binlogFile.ReadBinaryVector() assert.Zero(t, len(d)) assert.Zero(t, dim) assert.NotNil(t, err) binlogFile.Close() + + // wrong log type + chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false) + err = binlogFile.Open("dummy") + assert.Nil(t, err) + + data, err = binlogFile.ReadJSON() + assert.Zero(t, len(data)) + assert.NotNil(t, err) + + binlogFile.Close() } func Test_BinlogFileBinaryVector(t *testing.T) { diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go index d295209ee4..58014bf93d 100644 --- a/internal/util/importutil/import_util.go +++ b/internal/util/importutil/import_util.go @@ -103,6 +103,10 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi segmentData[schema.GetFieldID()] = &storage.StringFieldData{ Data: make([]string, 0), } + case schemapb.DataType_JSON: + segmentData[schema.GetFieldID()] = &storage.JSONFieldData{ + Data: make([][]byte, 0), + } default: log.Error("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType))) return nil @@ -303,6 +307,20 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[ } return nil } + case schemapb.DataType_JSON: + validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error { + if value, ok := obj.(string); ok { + var dummy interface{} + err := json.Unmarshal([]byte(value), &dummy) + if err != nil { + return fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", value, schema.GetName(), err) + } + field.(*storage.JSONFieldData).Data = append(field.(*storage.JSONFieldData).Data, []byte(value)) + } else { + return fmt.Errorf("illegal value '%v' for JSON type field '%s'", obj, schema.GetName()) + } + return nil + } default: return 
fmt.Errorf("unsupport data type: %s", getTypeName(collectionSchema.Fields[i].DataType)) } @@ -495,6 +513,8 @@ func getTypeName(dt schemapb.DataType) string { return "BinaryVector" case schemapb.DataType_FloatVector: return "FloatVector" + case schemapb.DataType_JSON: + return "JSON" default: return "InvalidType" } diff --git a/internal/util/importutil/import_util_test.go b/internal/util/importutil/import_util_test.go index fb31ae343c..dc9f16870f 100644 --- a/internal/util/importutil/import_util_test.go +++ b/internal/util/importutil/import_util_test.go @@ -116,6 +116,13 @@ func sampleSchema() *schemapb.CollectionSchema { {Key: "dim", Value: "4"}, }, }, + { + FieldID: 112, + Name: "FieldJSON", + IsPrimaryKey: false, + Description: "json", + DataType: schemapb.DataType_JSON, + }, }, } return schema @@ -131,6 +138,7 @@ type sampleRow struct { FieldFloat float32 FieldDouble float64 FieldString string + FieldJSON string FieldBinaryVector []int FieldFloatVector []float32 } @@ -454,6 +462,44 @@ func Test_InitValidators(t *testing.T) { err = initValidators(schema, validators) assert.NotNil(t, err) }) + + t.Run("json field", func(t *testing.T) { + schema = &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + AutoID: true, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 102, + Name: "FieldJSON", + DataType: schemapb.DataType_JSON, + }, + }, + } + + validators = make(map[storage.FieldID]*Validator) + err = initValidators(schema, validators) + assert.Nil(t, err) + + v, ok := validators[102] + assert.True(t, ok) + + fields := initSegmentData(schema) + assert.NotNil(t, fields) + fieldData := fields[102] + + err = v.convertFunc("{\"x\": 1, \"y\": 5}", fieldData) + assert.Nil(t, err) + assert.Equal(t, 1, fieldData.RowNum()) + + err = v.convertFunc("{}", fieldData) + assert.Nil(t, err) + assert.Equal(t, 2, fieldData.RowNum()) + + err = v.convertFunc("", fieldData) + assert.Error(t, err) + assert.Equal(t, 2, fieldData.RowNum()) + }) } func Test_GetFileNameAndExt(t *testing.T) { @@ -617,6 +663,8 @@ func Test_GetTypeName(t *testing.T) { assert.NotEmpty(t, str) str = getTypeName(schemapb.DataType_FloatVector) assert.NotEmpty(t, str) + str = getTypeName(schemapb.DataType_JSON) + assert.NotEmpty(t, str) str = getTypeName(schemapb.DataType_None) assert.Equal(t, "InvalidType", str) } diff --git a/internal/util/importutil/import_wrapper_test.go b/internal/util/importutil/import_wrapper_test.go index db423b931e..da5a0b1d5f 100644 --- a/internal/util/importutil/import_wrapper_test.go +++ b/internal/util/importutil/import_wrapper_test.go @@ -241,11 +241,11 @@ func Test_ImportWrapperRowBased(t *testing.T) { content := []byte(`{ "rows":[ - {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, - {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, - {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, - {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, 
"FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, - {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{\"k\": 2.5}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, + {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"y\": \"hello\"}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, + {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, + {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} ] }`) @@ -288,7 +288,7 @@ func Test_ImportWrapperRowBased(t *testing.T) { // parse error content = []byte(`{ "rows":[ - {"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": true, "FieldInt8": false, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, ] }`) @@ -372,6 +372,13 @@ func createSampleNumpyFiles(t *testing.T, cm storage.ChunkManager) []string { assert.NoError(t, err) files = append(files, filePath) + filePath = path.Join(cm.RootPath(), "FieldJSON.npy") + content, err = CreateNumpyData([]string{"{\"x\": 10, \"y\": 5}", "{\"z\": 5}", "{}", "{}", "{\"x\": 3}"}) + assert.Nil(t, err) + err = cm.Write(ctx, filePath, content) + assert.NoError(t, err) + files = append(files, filePath) + filePath = path.Join(cm.RootPath(), "FieldBinaryVector.npy") content, err = CreateNumpyData([][2]uint8{{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}}) assert.Nil(t, err) @@ -705,11 +712,11 @@ func Test_ImportWrapperReportFailRowBased(t *testing.T) { content := []byte(`{ "rows":[ - {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, - {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 
2.4]}, - {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, - {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, - {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} + {"FieldBool": true, "FieldInt8": 10, "FieldInt16": 101, "FieldInt32": 1001, "FieldInt64": 10001, "FieldFloat": 3.14, "FieldDouble": 1.56, "FieldString": "hello world", "FieldJSON": "{\"x\": \"aaa\"}", "FieldBinaryVector": [254, 0], "FieldFloatVector": [1.1, 1.2, 1.3, 1.4]}, + {"FieldBool": false, "FieldInt8": 11, "FieldInt16": 102, "FieldInt32": 1002, "FieldInt64": 10002, "FieldFloat": 3.15, "FieldDouble": 2.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [253, 0], "FieldFloatVector": [2.1, 2.2, 2.3, 2.4]}, + {"FieldBool": true, "FieldInt8": 12, "FieldInt16": 103, "FieldInt32": 1003, "FieldInt64": 10003, "FieldFloat": 3.16, "FieldDouble": 3.56, "FieldString": "hello world", "FieldJSON": "{\"x\": 2, \"y\": 5}", "FieldBinaryVector": [252, 0], "FieldFloatVector": [3.1, 3.2, 3.3, 3.4]}, + {"FieldBool": false, "FieldInt8": 13, "FieldInt16": 104, "FieldInt32": 1004, "FieldInt64": 10004, "FieldFloat": 3.17, "FieldDouble": 4.56, "FieldString": "hello world", "FieldJSON": "{\"x\": true}", "FieldBinaryVector": [251, 0], "FieldFloatVector": [4.1, 4.2, 4.3, 4.4]}, + {"FieldBool": true, "FieldInt8": 14, "FieldInt16": 105, "FieldInt32": 1005, "FieldInt64": 10005, "FieldFloat": 3.18, "FieldDouble": 5.56, "FieldString": "hello world", "FieldJSON": "{}", "FieldBinaryVector": [250, 0], "FieldFloatVector": [5.1, 5.2, 5.3, 5.4]} ] }`) diff --git a/internal/util/importutil/json_parser_test.go b/internal/util/importutil/json_parser_test.go index 54e3609fd1..1f393f8963 100644 --- a/internal/util/importutil/json_parser_test.go +++ b/internal/util/importutil/json_parser_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math" "strconv" "strings" @@ -101,6 +102,7 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) { FieldFloat: 3 + float32(i)/11, FieldDouble: 1 + float64(i)/7, FieldString: "No." 
+ strconv.FormatInt(int64(i), 10), + FieldJSON: fmt.Sprintf("{\"x\": %d}", i), FieldBinaryVector: []int{(200 + i) % math.MaxUint8, 0}, FieldFloatVector: []float32{float32(i) + 0.1, float32(i) + 0.2, float32(i) + 0.3, float32(i) + 0.4}, } diff --git a/internal/util/importutil/numpy_adapter.go b/internal/util/importutil/numpy_adapter.go index 19ca7374a3..210e0517ac 100644 --- a/internal/util/importutil/numpy_adapter.go +++ b/internal/util/importutil/numpy_adapter.go @@ -135,6 +135,7 @@ func convertNumpyType(typeStr string) (schemapb.DataType, error) { return schemapb.DataType_Double, nil default: if isStringType(typeStr) { + // Note: JSON field and VARCHAR field both use numpy string type return schemapb.DataType_VarChar, nil } log.Error("Numpy adapter: the numpy file data type is not supported", zap.String("dtype", typeStr)) diff --git a/internal/util/importutil/numpy_parser.go b/internal/util/importutil/numpy_parser.go index dcd7a3e172..f82d57616d 100644 --- a/internal/util/importutil/numpy_parser.go +++ b/internal/util/importutil/numpy_parser.go @@ -18,6 +18,7 @@ package importutil import ( "context" + "encoding/json" "errors" "fmt" @@ -313,11 +314,15 @@ func (p *NumpyParser) validateHeader(columnReader *NumpyColumnReader) error { shape[1]*8, columnReader.fieldName, columnReader.dimension) } } else { - if elementType != columnReader.dataType { - log.Error("Numpy parser: illegal data type of numpy file for scalar field", zap.Any("numpyDataType", elementType), - zap.String("fieldName", columnReader.fieldName), zap.Any("fieldDataType", columnReader.dataType)) - return fmt.Errorf("illegal data type %s of numpy file for scalar field '%s' with type %s", - getTypeName(elementType), columnReader.fieldName, getTypeName(columnReader.dataType)) + // JSON field and VARCHAR field both use numpy string type + // legal input if columnReader.dataType is JSON and elementType is VARCHAR + if !(elementType == schemapb.DataType_VarChar && columnReader.dataType == schemapb.DataType_JSON) { + if elementType != columnReader.dataType { + log.Error("Numpy parser: illegal data type of numpy file for scalar field", zap.Any("numpyDataType", elementType), + zap.String("fieldName", columnReader.fieldName), zap.Any("fieldDataType", columnReader.dataType)) + return fmt.Errorf("illegal data type %s of numpy file for scalar field '%s' with type %s", + getTypeName(elementType), columnReader.fieldName, getTypeName(columnReader.dataType)) + } } // scalar field, the shape should be 1 @@ -528,6 +533,30 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s return &storage.StringFieldData{ Data: data, }, nil + case schemapb.DataType_JSON: + // JSON field reads its data from a numpy string array + data, err := columnReader.reader.ReadString(rowCount) + if err != nil { + log.Error("Numpy parser: failed to read JSON string array", zap.Error(err)) + return nil, fmt.Errorf("failed to read JSON string array: %s", err.Error()) + } + + byteArr := make([][]byte, 0) + for _, str := range data { + var dummy interface{} + err := json.Unmarshal([]byte(str), &dummy) + if err != nil { + log.Error("Numpy parser: illegal string value for JSON field", + zap.String("value", str), zap.String("fieldName", columnReader.fieldName), zap.Error(err)) + return nil, fmt.Errorf("failed to parse value '%v' for JSON field '%s', error: %w", + str, columnReader.fieldName, err) + } + byteArr = append(byteArr, []byte(str)) + } + + return &storage.JSONFieldData{ + Data: byteArr, + }, nil + case schemapb.DataType_BinaryVector: data, err := 
columnReader.reader.ReadUint8(rowCount * (columnReader.dimension / 8)) if err != nil { @@ -654,6 +683,12 @@ func (p *NumpyParser) appendFunc(schema *schemapb.FieldSchema) func(src storage. arr.Data = append(arr.Data, src.GetRow(n).(string)) return nil } + case schemapb.DataType_JSON: + return func(src storage.FieldData, n int, target storage.FieldData) error { + arr := target.(*storage.JSONFieldData) + arr.Data = append(arr.Data, src.GetRow(n).([]byte)) + return nil + } default: return nil } diff --git a/internal/util/importutil/numpy_parser_test.go b/internal/util/importutil/numpy_parser_test.go index d477801818..1744f487b4 100644 --- a/internal/util/importutil/numpy_parser_test.go +++ b/internal/util/importutil/numpy_parser_test.go @@ -117,6 +117,7 @@ func Test_NumpyParserValidateFileNames(t *testing.T) { "FieldFloat.npy", "FieldDouble.npy", "FieldString.npy", + "FieldJSON.npy", "FieldBinaryVector.npy", } err = parser.validateFileNames(fileNames) @@ -495,6 +496,10 @@ func Test_NumpyParserReadData(t *testing.T) { specialReadEmptyFunc("FieldString", []string{"aaa"}) }) + t.Run("read JSON", func(t *testing.T) { + specialReadEmptyFunc("FieldJSON", []string{"{\"x\": 1}"}) + }) + t.Run("read binary vector", func(t *testing.T) { specialReadEmptyFunc("FieldBinaryVector", [][2]uint8{{1, 2}, {3, 4}}) })
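
Note on the pattern this patch uses: both user-facing import paths (row-based JSON files via the convertFunc added in initValidators, and numpy files via the JSON case in NumpyParser.readData) validate each value the same way, by unmarshalling the candidate string into an empty interface{} and, on success, appending the original raw bytes to storage.JSONFieldData. The binlog path (ReadJSON/dispatchBytesToShards) skips that check because it re-reads data Milvus itself wrote. Below is a minimal standalone sketch of this validate-then-store-raw pattern; jsonFieldData and appendRow are illustrative names for this sketch only, not symbols from the patch.

package main

import (
	"encoding/json"
	"fmt"
)

// jsonFieldData mimics storage.JSONFieldData: one raw []byte per row.
type jsonFieldData struct {
	data [][]byte
}

// appendRow validates value the way the patch's convertFunc and readData do:
// unmarshal into an empty interface{} to prove the string is well-formed
// JSON, then keep the original bytes unchanged.
func (f *jsonFieldData) appendRow(value string) error {
	var dummy interface{}
	if err := json.Unmarshal([]byte(value), &dummy); err != nil {
		return fmt.Errorf("failed to parse value '%v' for JSON field, error: %w", value, err)
	}
	f.data = append(f.data, []byte(value))
	return nil
}

func main() {
	field := &jsonFieldData{}
	for _, row := range []string{`{"x": 3, "y": 10.5}`, `{}`, `not-json`} {
		if err := field.appendRow(row); err != nil {
			fmt.Println("rejected:", err) // only "not-json" is rejected
			continue
		}
		fmt.Println("accepted:", row)
	}
	fmt.Println("stored rows:", len(field.data)) // prints: stored rows: 2
}

Storing the original bytes rather than a decoded structure keeps the imported data lossless: the json.Unmarshal result is discarded, so parsing serves purely as validation and the payload written to the segment is exactly what the user supplied.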