diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 3722965279..23e10e5290 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -84,7 +84,9 @@ func (i *InsertData) GetRowNum() int { var rowNum int for _, data := range i.Data { rowNum = data.RowNum() - break + if rowNum > 0 { + break + } } return rowNum } diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index b30de4e5b5..38dc64d86e 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -24,7 +24,6 @@ import ( "strings" "testing" - "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" @@ -98,55 +97,16 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data }, }, } + insertData, err := testutil.CreateInsertData(schema, suite.numRows) suite.NoError(err) - rows := make([]map[string]any, 0, suite.numRows) - fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { - return field.GetFieldID() - }) - for i := 0; i < insertData.GetRowNum(); i++ { - data := make(map[int64]interface{}) - for fieldID, v := range insertData.Data { - field := fieldIDToField[fieldID] - dataType := field.GetDataType() - elemType := field.GetElementType() - switch dataType { - case schemapb.DataType_Array: - switch elemType { - case schemapb.DataType_Bool: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData() - case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() - case schemapb.DataType_Int64: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData() - case schemapb.DataType_Float: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData() - case schemapb.DataType_Double: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData() - case schemapb.DataType_String: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData() - } - case schemapb.DataType_JSON: - data[fieldID] = string(v.GetRow(i).([]byte)) - case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: - bytes := v.GetRow(i).([]byte) - ints := make([]int, 0, len(bytes)) - for _, b := range bytes { - ints = append(ints, int(b)) - } - data[fieldID] = ints - default: - data[fieldID] = v.GetRow(i) - } - } - row := lo.MapKeys(data, func(_ any, fieldID int64) string { - return fieldIDToField[fieldID].GetName() - }) - rows = append(rows, row) - } + + rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData) + suite.NoError(err) jsonBytes, err := json.Marshal(rows) suite.NoError(err) + type mockReader struct { io.Reader io.Closer diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index ec61751ad9..c3f4bcb1c4 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -305,46 +305,58 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { vec[i] = float32(num) } return vec, nil - case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_Float16Vector: + // parse float string to Float16 bytes arr, ok := obj.([]interface{}) if !ok { return nil, 
r.wrapTypeError(obj, fieldID) } - if len(arr) != r.dim*2 { - return nil, r.wrapDimError(len(arr)/2, fieldID) + if len(arr) != r.dim { + return nil, r.wrapDimError(len(arr), fieldID) } - vec := make([]byte, len(arr)) + vec := make([]byte, len(arr)*2) for i := 0; i < len(arr); i++ { value, ok := arr[i].(json.Number) if !ok { return nil, r.wrapTypeError(arr[i], fieldID) } - num, err := strconv.ParseUint(value.String(), 0, 8) + num, err := strconv.ParseFloat(value.String(), 32) if err != nil { return nil, err } - vec[i] = byte(num) + copy(vec[i*2:], typeutil.Float32ToFloat16Bytes(float32(num))) + } + return vec, nil + case schemapb.DataType_BFloat16Vector: + // parse float string to BFloat16 bytes + arr, ok := obj.([]interface{}) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + if len(arr) != r.dim { + return nil, r.wrapDimError(len(arr), fieldID) + } + vec := make([]byte, len(arr)*2) + for i := 0; i < len(arr); i++ { + value, ok := arr[i].(json.Number) + if !ok { + return nil, r.wrapTypeError(arr[i], fieldID) + } + num, err := strconv.ParseFloat(value.String(), 32) + if err != nil { + return nil, err + } + copy(vec[i*2:], typeutil.Float32ToBFloat16Bytes(float32(num))) } return vec, nil case schemapb.DataType_SparseFloatVector: - arr, ok := obj.([]interface{}) + arr, ok := obj.(map[string]interface{}) if !ok { return nil, r.wrapTypeError(obj, fieldID) } - if len(arr)%8 != 0 { - return nil, r.wrapDimError(len(arr), fieldID) - } - vec := make([]byte, len(arr)) - for i := 0; i < len(arr); i++ { - value, ok := arr[i].(json.Number) - if !ok { - return nil, r.wrapTypeError(arr[i], fieldID) - } - num, err := strconv.ParseUint(value.String(), 0, 8) - if err != nil { - return nil, err - } - vec[i] = byte(num) + vec, err := typeutil.CreateSparseFloatRowFromMap(arr) + if err != nil { + return nil, err } return vec, nil case schemapb.DataType_String, schemapb.DataType_VarChar: diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index f94abb6b1a..3b96e6553d 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -128,60 +128,54 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { io.ReaderAt io.Seeker } + + var data interface{} for fieldID, fieldData := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType() + rowNum := fieldData.RowNum() switch dataType { case schemapb.DataType_JSON: - jsonStrs := make([]string, 0, fieldData.RowNum()) - for i := 0; i < fieldData.RowNum(); i++ { + jsonStrs := make([]string, 0, rowNum) + for i := 0; i < rowNum; i++ { row := fieldData.GetRow(i) jsonStrs = append(jsonStrs, string(row.([]byte))) } - reader, err := CreateReader(jsonStrs) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = jsonStrs case schemapb.DataType_BinaryVector: - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8) - chunkedRows := make([][dim / 8]byte, len(chunked)) + rows := fieldData.GetRows().([]byte) + const rowBytes = dim / 8 + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) for i, innerSlice := range chunked { copy(chunkedRows[i][:], innerSlice[:]) } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = chunkedRows case schemapb.DataType_FloatVector: - chunked := 
lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim) + rows := fieldData.GetRows().([]float32) + chunked := lo.Chunk(rows, dim) chunkedRows := make([][dim]float32, len(chunked)) for i, innerSlice := range chunked { copy(chunkedRows[i][:], innerSlice[:]) } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = chunkedRows case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2) - chunkedRows := make([][dim * 2]byte, len(chunked)) + rows := fieldData.GetRows().([]byte) + const rowBytes = dim * 2 + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) for i, innerSlice := range chunked { copy(chunkedRows[i][:], innerSlice[:]) } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = chunkedRows default: - reader, err := CreateReader(insertData.Data[fieldID].GetRows()) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = fieldData.GetRows() } + + reader, err := CreateReader(data) + suite.NoError(err) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ + Reader: reader, + }, nil) } reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt) @@ -268,59 +262,54 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { io.ReaderAt io.Seeker } + + var data interface{} for fieldID, fieldData := range insertData.Data { dataType := fieldIDToField[fieldID].GetDataType() - if dataType == schemapb.DataType_JSON { - jsonStrs := make([]string, 0, fieldData.RowNum()) - for i := 0; i < fieldData.RowNum(); i++ { + rowNum := fieldData.RowNum() + switch dataType { + case schemapb.DataType_JSON: + jsonStrs := make([]string, 0, rowNum) + for i := 0; i < rowNum; i++ { row := fieldData.GetRow(i) jsonStrs = append(jsonStrs, string(row.([]byte))) } - reader, err := CreateReader(jsonStrs) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else if dataType == schemapb.DataType_FloatVector { - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim) + data = jsonStrs + case schemapb.DataType_BinaryVector: + rows := fieldData.GetRows().([]byte) + const rowBytes = dim / 8 + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) + } + data = chunkedRows + case schemapb.DataType_FloatVector: + rows := fieldData.GetRows().([]float32) + chunked := lo.Chunk(rows, dim) chunkedRows := make([][dim]float32, len(chunked)) for i, innerSlice := range chunked { copy(chunkedRows[i][:], innerSlice[:]) } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector { - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2) - chunkedRows := make([][dim * 2]byte, len(chunked)) + data = chunkedRows + case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + rows := fieldData.GetRows().([]byte) + 
const rowBytes = dim * 2 + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) for i, innerSlice := range chunked { copy(chunkedRows[i][:], innerSlice[:]) } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else if dataType == schemapb.DataType_BinaryVector { - chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8) - chunkedRows := make([][dim / 8]byte, len(chunked)) - for i, innerSlice := range chunked { - copy(chunkedRows[i][:], innerSlice[:]) - } - reader, err := CreateReader(chunkedRows) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) - } else { - reader, err := CreateReader(insertData.Data[fieldID].GetRows()) - suite.NoError(err) - cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ - Reader: reader, - }, nil) + data = chunkedRows + default: + data = fieldData.GetRows() } + + reader, err := CreateReader(data) + suite.NoError(err) + cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{ + Reader: reader, + }, nil) } reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt) diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index 3359bbfa06..707bdade50 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -97,31 +97,7 @@ func (c *FieldReader) Next(count int64) (any, error) { case schemapb.DataType_VarChar, schemapb.DataType_String: return ReadStringData(c, count) case schemapb.DataType_JSON: - // JSON field read data from string array Parquet - data, err := ReadStringData(c, count) - if err != nil { - return nil, err - } - if data == nil { - return nil, nil - } - byteArr := make([][]byte, 0) - for _, str := range data.([]string) { - var dummy interface{} - err = json.Unmarshal([]byte(str), &dummy) - if err != nil { - return nil, err - } - if c.field.GetIsDynamic() { - var dummy2 map[string]interface{} - err = json.Unmarshal([]byte(str), &dummy2) - if err != nil { - return nil, err - } - } - byteArr = append(byteArr, []byte(str)) - } - return byteArr, nil + return ReadJSONData(c, count) case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: return ReadBinaryData(c, count) case schemapb.DataType_FloatVector: @@ -135,152 +111,9 @@ func (c *FieldReader) Next(count int64) (any, error) { vectors := lo.Flatten(arrayData.([][]float32)) return vectors, nil case schemapb.DataType_SparseFloatVector: - return ReadBinaryDataForSparseFloatVector(c, count) + return ReadSparseFloatVectorData(c, count) case schemapb.DataType_Array: - data := make([]*schemapb.ScalarField, 0, count) - elementType := c.field.GetElementType() - switch elementType { - case schemapb.DataType_Bool: - boolArray, err := ReadBoolArrayData(c, count) - if err != nil { - return nil, err - } - if boolArray == nil { - return nil, nil - } - for _, elementArray := range boolArray.([][]bool) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_BoolData{ - BoolData: &schemapb.BoolArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Int8: - int8Array, err := ReadIntegerOrFloatArrayData[int32](c, count) - if err != nil { - return nil, err - } - if int8Array == nil { - return nil, nil - } - for _, elementArray 
:= range int8Array.([][]int32) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Int16: - int16Array, err := ReadIntegerOrFloatArrayData[int32](c, count) - if err != nil { - return nil, err - } - if int16Array == nil { - return nil, nil - } - for _, elementArray := range int16Array.([][]int32) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Int32: - int32Array, err := ReadIntegerOrFloatArrayData[int32](c, count) - if err != nil { - return nil, err - } - if int32Array == nil { - return nil, nil - } - for _, elementArray := range int32Array.([][]int32) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Int64: - int64Array, err := ReadIntegerOrFloatArrayData[int64](c, count) - if err != nil { - return nil, err - } - if int64Array == nil { - return nil, nil - } - for _, elementArray := range int64Array.([][]int64) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_LongData{ - LongData: &schemapb.LongArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Float: - float32Array, err := ReadIntegerOrFloatArrayData[float32](c, count) - if err != nil { - return nil, err - } - if float32Array == nil { - return nil, nil - } - for _, elementArray := range float32Array.([][]float32) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_FloatData{ - FloatData: &schemapb.FloatArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_Double: - float64Array, err := ReadIntegerOrFloatArrayData[float64](c, count) - if err != nil { - return nil, err - } - if float64Array == nil { - return nil, nil - } - for _, elementArray := range float64Array.([][]float64) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_DoubleData{ - DoubleData: &schemapb.DoubleArray{ - Data: elementArray, - }, - }, - }) - } - case schemapb.DataType_VarChar, schemapb.DataType_String: - stringArray, err := ReadStringArrayData(c, count) - if err != nil { - return nil, err - } - if stringArray == nil { - return nil, nil - } - for _, elementArray := range stringArray.([][]string) { - data = append(data, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_StringData{ - StringData: &schemapb.StringArray{ - Data: elementArray, - }, - }, - }) - } - default: - return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for array field '%s'", - elementType.String(), c.field.GetName())) - } - return data, nil + return ReadArrayData(c, count) default: return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'", c.field.GetDataType().String(), c.field.GetName())) @@ -382,6 +215,34 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) { return data, nil } +func ReadJSONData(pcr *FieldReader, count int64) (any, error) { + // JSON field read data from string array Parquet + data, err := ReadStringData(pcr, count) + if err != nil { + return nil, err + } + if data == nil { + return nil, nil + } + byteArr := make([][]byte, 0) + for _, str := range data.([]string) { + var dummy interface{} + err = json.Unmarshal([]byte(str), &dummy) + if err != nil { + return nil, err + } + if 
pcr.field.GetIsDynamic() { + var dummy2 map[string]interface{} + err = json.Unmarshal([]byte(str), &dummy2) + if err != nil { + return nil, err + } + } + byteArr = append(byteArr, []byte(str)) + } + return byteArr, nil +} + func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { dataType := pcr.field.GetDataType() chunked, err := pcr.columnReader.NextBatch(count) @@ -417,38 +278,32 @@ func ReadBinaryData(pcr *FieldReader, count int64) (any, error) { return data, nil } -func ReadBinaryDataForSparseFloatVector(pcr *FieldReader, count int64) (any, error) { - chunked, err := pcr.columnReader.NextBatch(count) +func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) { + data, err := ReadStringData(pcr, count) if err != nil { return nil, err } - data := make([][]byte, 0, count) + if data == nil { + return nil, nil + } + byteArr := make([][]byte, 0, count) maxDim := uint32(0) - for _, chunk := range chunked.Chunks() { - listReader := chunk.(*array.List) - offsets := listReader.Offsets() - if !isVectorAligned(offsets, pcr.dim, schemapb.DataType_SparseFloatVector) { - return nil, merr.WrapErrImportFailed("%s not aligned", schemapb.DataType_SparseFloatVector.String()) + for _, str := range data.([]string) { + rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str)) + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s', err = %v", str, err)) } - uint8Reader, ok := listReader.ListValues().(*array.Uint8) - if !ok { - return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field) - } - vecData := uint8Reader.Uint8Values() - for i := 1; i < len(offsets); i++ { - elemCount := int((offsets[i] - offsets[i-1]) / 8) - rowVec := vecData[offsets[i-1]:offsets[i]] - data = append(data, rowVec) - maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1) - if maxIdx+1 > maxDim { - maxDim = maxIdx + 1 - } + byteArr = append(byteArr, rowVec) + elemCount := len(rowVec) / 8 + maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1) + if maxIdx+1 > maxDim { + maxDim = maxIdx + 1 } } return &storage.SparseFloatVectorFieldData{ SparseFloatArray: schemapb.SparseFloatArray{ Dim: int64(maxDim), - Contents: data, + Contents: byteArr, }, }, nil } @@ -462,16 +317,6 @@ func checkVectorAlignWithDim(offsets []int32, dim int32) bool { return true } -func checkSparseFloatVectorAlign(offsets []int32) bool { - // index: 4 bytes, value: 4 bytes - for i := 1; i < len(offsets); i++ { - if (offsets[i]-offsets[i-1])%8 != 0 { - return false - } - } - return true -} - func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool { if len(offsets) < 1 { return false @@ -484,7 +329,8 @@ func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: return checkVectorAlignWithDim(offsets, int32(dim*2)) case schemapb.DataType_SparseFloatVector: - return checkSparseFloatVectorAlign(offsets) + // JSON format, skip alignment check + return true default: return false } @@ -620,3 +466,150 @@ func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) { } return data, nil } + +func ReadArrayData(pcr *FieldReader, count int64) (any, error) { + data := make([]*schemapb.ScalarField, 0, count) + elementType := pcr.field.GetElementType() + switch elementType { + case schemapb.DataType_Bool: + boolArray, err := ReadBoolArrayData(pcr, count) + if err != nil { + return nil, err + } + if boolArray == nil { + 
return nil, nil + } + for _, elementArray := range boolArray.([][]bool) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Int8: + int8Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, err + } + if int8Array == nil { + return nil, nil + } + for _, elementArray := range int8Array.([][]int32) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Int16: + int16Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, err + } + if int16Array == nil { + return nil, nil + } + for _, elementArray := range int16Array.([][]int32) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Int32: + int32Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count) + if err != nil { + return nil, err + } + if int32Array == nil { + return nil, nil + } + for _, elementArray := range int32Array.([][]int32) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Int64: + int64Array, err := ReadIntegerOrFloatArrayData[int64](pcr, count) + if err != nil { + return nil, err + } + if int64Array == nil { + return nil, nil + } + for _, elementArray := range int64Array.([][]int64) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Float: + float32Array, err := ReadIntegerOrFloatArrayData[float32](pcr, count) + if err != nil { + return nil, err + } + if float32Array == nil { + return nil, nil + } + for _, elementArray := range float32Array.([][]float32) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_Double: + float64Array, err := ReadIntegerOrFloatArrayData[float64](pcr, count) + if err != nil { + return nil, err + } + if float64Array == nil { + return nil, nil + } + for _, elementArray := range float64Array.([][]float64) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: elementArray, + }, + }, + }) + } + case schemapb.DataType_VarChar, schemapb.DataType_String: + stringArray, err := ReadStringArrayData(pcr, count) + if err != nil { + return nil, err + } + if stringArray == nil { + return nil, nil + } + for _, elementArray := range stringArray.([][]string) { + data = append(data, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: elementArray, + }, + }, + }) + } + default: + return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for array field '%s'", + elementType.String(), pcr.field.GetName())) + } + return data, nil +} diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index 4164ff4f6e..d74b293474 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -183,7 +183,7 @@ func 
convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da Nullable: true, Metadata: arrow.Metadata{}, }), nil - case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: + case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: return arrow.ListOfField(arrow.Field{ Name: "item", Type: &arrow.Uint8Type{}, @@ -197,6 +197,8 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da Nullable: true, Metadata: arrow.Metadata{}, }), nil + case schemapb.DataType_SparseFloatVector: + return &arrow.StringType{}, nil default: return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String()) } diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index a9da8ca3b8..4548f0e77f 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -1,6 +1,7 @@ package testutil import ( + "encoding/json" "fmt" "math/rand" "strconv" @@ -333,23 +334,24 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser builder.AppendValues(offsets, valid) columns = append(columns, builder.NewListArray()) case schemapb.DataType_SparseFloatVector: - sparseFloatVecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + builder := array.NewStringBuilder(mem) contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() rows := len(contents) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - currOffset := int32(0) + jsonBytesData := make([][]byte, 0) for i := 0; i < rows; i++ { rowVecData := contents[i] - sparseFloatVecData = append(sparseFloatVecData, rowVecData...) 
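+			// re-encode each sparse row as an index -> value JSON dict for the Parquet string column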
- offsets = append(offsets, currOffset) - currOffset = currOffset + int32(len(rowVecData)) - valid = append(valid, true) + mapData := typeutil.SparseFloatBytesToMap(rowVecData) + // convert to JSON format + jsonBytes, err := json.Marshal(mapData) + if err != nil { + return nil, err + } + jsonBytesData = append(jsonBytesData, jsonBytes) } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparseFloatVecData, nil) - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) + builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string { + return string(bs) + }), nil) + columns = append(columns, builder.NewStringArray()) case schemapb.DataType_JSON: builder := array.NewStringBuilder(mem) jsonData := insertData.Data[fieldID].(*storage.JSONFieldData).Data @@ -482,3 +484,66 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser } return columns, nil } + +func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]map[string]any, error) { + fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { + return field.GetFieldID() + }) + + rowNum := insertData.GetRowNum() + rows := make([]map[string]any, 0, rowNum) + for i := 0; i < rowNum; i++ { + data := make(map[int64]interface{}) + for fieldID, v := range insertData.Data { + field := fieldIDToField[fieldID] + dataType := field.GetDataType() + elemType := field.GetElementType() + if field.GetAutoID() { + continue + } + switch dataType { + case schemapb.DataType_Array: + switch elemType { + case schemapb.DataType_Bool: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData() + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() + case schemapb.DataType_Int64: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData() + case schemapb.DataType_Float: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData() + case schemapb.DataType_Double: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData() + case schemapb.DataType_String: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData() + } + case schemapb.DataType_JSON: + data[fieldID] = string(v.GetRow(i).([]byte)) + case schemapb.DataType_BinaryVector: + bytes := v.GetRow(i).([]byte) + ints := make([]int, 0, len(bytes)) + for _, b := range bytes { + ints = append(ints, int(b)) + } + data[fieldID] = ints + case schemapb.DataType_Float16Vector: + bytes := v.GetRow(i).([]byte) + data[fieldID] = typeutil.Float16BytesToFloat32Vector(bytes) + case schemapb.DataType_BFloat16Vector: + bytes := v.GetRow(i).([]byte) + data[fieldID] = typeutil.BFloat16BytesToFloat32Vector(bytes) + case schemapb.DataType_SparseFloatVector: + bytes := v.GetRow(i).([]byte) + data[fieldID] = typeutil.SparseFloatBytesToMap(bytes) + default: + data[fieldID] = v.GetRow(i) + } + } + row := lo.MapKeys(data, func(_ any, fieldID int64) string { + return fieldIDToField[fieldID].GetName() + }) + rows = append(rows, row) + } + + return rows, nil +} diff --git a/pkg/util/testutils/gen_data.go b/pkg/util/testutils/gen_data.go index ce48aad783..0eb692a224 100644 --- a/pkg/util/testutils/gen_data.go +++ b/pkg/util/testutils/gen_data.go @@ -248,27 +248,20 @@ func GenerateFloatVectors(numRows, dim int) []float32 { func GenerateFloat16Vectors(numRows, dim int) []byte { total := numRows * dim 
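+	// each element is encoded as two bytes by Float32ToFloat16Bytes, hence the total*2 capacity below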
- ret := make([]byte, total*2) + ret := make([]byte, 0, total*2) for i := 0; i < total; i++ { - v := float16.Fromfloat32(rand.Float32()).Bits() - binary.LittleEndian.PutUint16(ret[i*2:], v) + f := (rand.Float32() - 0.5) * 100 + ret = append(ret, typeutil.Float32ToFloat16Bytes(f)...) } return ret } func GenerateBFloat16Vectors(numRows, dim int) []byte { total := numRows * dim - ret16 := make([]uint16, 0, total) + ret := make([]byte, 0, total*2) for i := 0; i < total; i++ { - f := rand.Float32() - bits := math.Float32bits(f) - bits >>= 16 - bits &= 0x7FFF - ret16 = append(ret16, uint16(bits)) - } - ret := make([]byte, len(ret16)*2) - for i, value := range ret16 { - binary.LittleEndian.PutUint16(ret[i*2:], value) + f := (rand.Float32() - 0.5) * 100 + ret = append(ret, typeutil.Float32ToBFloat16Bytes(f)...) } return ret } diff --git a/pkg/util/typeutil/convension.go b/pkg/util/typeutil/convension.go index d5e2e96e34..95e138b5c5 100644 --- a/pkg/util/typeutil/convension.go +++ b/pkg/util/typeutil/convension.go @@ -23,6 +23,7 @@ import ( "reflect" "github.com/golang/protobuf/proto" + "github.com/x448/float16" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/common" @@ -115,3 +116,52 @@ func SliceRemoveDuplicate(a interface{}) (ret []interface{}) { return ret } + +func Float32ToFloat16Bytes(f float32) []byte { + ret := make([]byte, 2) + common.Endian.PutUint16(ret[:], float16.Fromfloat32(f).Bits()) + return ret +} + +func Float16BytesToFloat32(b []byte) float32 { + return float16.Frombits(common.Endian.Uint16(b)).Float32() +} + +func Float16BytesToFloat32Vector(b []byte) []float32 { + dim := len(b) / 2 + vec := make([]float32, 0, dim) + for j := 0; j < dim; j++ { + vec = append(vec, Float16BytesToFloat32(b[j*2:])) + } + return vec +} + +func Float32ToBFloat16Bytes(f float32) []byte { + ret := make([]byte, 2) + common.Endian.PutUint16(ret[:], uint16(math.Float32bits(f)>>16)) + return ret +} + +func BFloat16BytesToFloat32(b []byte) float32 { + return math.Float32frombits(uint32(common.Endian.Uint16(b)) << 16) +} + +func BFloat16BytesToFloat32Vector(b []byte) []float32 { + dim := len(b) / 2 + vec := make([]float32, 0, dim) + for j := 0; j < dim; j++ { + vec = append(vec, BFloat16BytesToFloat32(b[j*2:])) + } + return vec +} + +func SparseFloatBytesToMap(b []byte) map[uint32]float32 { + elemCount := len(b) / 8 + values := make(map[uint32]float32) + for j := 0; j < elemCount; j++ { + idx := common.Endian.Uint32(b[j*8:]) + f := BytesToFloat32(b[j*8+4:]) + values[idx] = f + } + return values +} diff --git a/pkg/util/typeutil/conversion_test.go b/pkg/util/typeutil/conversion_test.go index da5a9623fb..56bd88b54a 100644 --- a/pkg/util/typeutil/conversion_test.go +++ b/pkg/util/typeutil/conversion_test.go @@ -18,9 +18,13 @@ package typeutil import ( "math" + "math/rand" "testing" "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" ) func TestConversion(t *testing.T) { @@ -94,4 +98,24 @@ func TestConversion(t *testing.T) { ret1 := SliceRemoveDuplicate(arr) assert.Equal(t, 3, len(ret1)) }) + + t.Run("TestFloat16", func(t *testing.T) { + for i := 0; i < 100; i++ { + v := (rand.Float32() - 0.5) * 100 + b := Float32ToFloat16Bytes(v) + v2 := Float16BytesToFloat32(b) + log.Info("float16", zap.Float32("v", v), zap.Float32("v2", v2)) + assert.Less(t, math.Abs(float64(v2/v-1)), 0.001) + } + }) + + t.Run("TestBFloat16", func(t *testing.T) { + for i := 0; i < 100; i++ { + v := (rand.Float32() - 0.5) * 100 + b := Float32ToBFloat16Bytes(v) + v2 := BFloat16BytesToFloat32(b) 
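+			// bfloat16 keeps an 8-bit significand, so the round trip may be off by up to ~2^-8; allow 1% relative error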
+ log.Info("bfloat16", zap.Float32("v", v), zap.Float32("v2", v2)) + assert.Less(t, math.Abs(float64(v2/v-1)), 0.01) + } + }) } diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 14ccbb3fcc..5a696be743 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "math" + "reflect" "sort" "strconv" "unsafe" @@ -1505,58 +1506,10 @@ func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) { binary.LittleEndian.PutUint32(row[pos*8+4:], math.Float32bits(value)) } -func CreateSparseFloatRow(indices []uint32, values []float32) []byte { - row := make([]byte, len(indices)*8) - for i := 0; i < len(indices); i++ { - SparseFloatRowSetAt(row, i, indices[i], values[i]) - } - return row -} +func SortSparseFloatRow(indices []uint32, values []float32) ([]uint32, []float32) { + elemCount := len(indices) -type sparseFloatVectorJSONRepresentation struct { - Indices []uint32 `json:"indices"` - Values []float32 `json:"values"` -} - -// accepted format: -// - {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]} -// - {"1": 0.1, "2": 0.2, "3": 0.3} -// -// we don't require the indices to be sorted from user input, but the returned -// byte representation must have indices sorted -func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) { - var indices []uint32 - var values []float32 - - var vec sparseFloatVectorJSONRepresentation - decoder := json.NewDecoder(bytes.NewReader(input)) - decoder.DisallowUnknownFields() - err := decoder.Decode(&vec) - if err == nil { - if len(vec.Indices) != len(vec.Values) { - return nil, fmt.Errorf("indices and values length mismatch") - } - if len(vec.Indices) == 0 { - return nil, fmt.Errorf("empty indices/values in JSON input") - } - indices = vec.Indices - values = vec.Values - } else { - var vec2 map[uint32]float32 - decoder = json.NewDecoder(bytes.NewReader(input)) - decoder.DisallowUnknownFields() - err = decoder.Decode(&vec2) - if err != nil { - return nil, fmt.Errorf("failed to parse JSON input: %v", err) - } - - for idx, val := range vec2 { - indices = append(indices, idx) - values = append(values, val) - } - } - - indexOrder := make([]int, len(indices)) + indexOrder := make([]int, elemCount) for i := range indexOrder { indexOrder[i] = i } @@ -1565,13 +1518,109 @@ func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) { return indices[indexOrder[i]] < indices[indexOrder[j]] }) - sortedIndices := make([]uint32, len(indices)) - sortedValues := make([]float32, len(values)) + sortedIndices := make([]uint32, elemCount) + sortedValues := make([]float32, elemCount) for i, index := range indexOrder { sortedIndices[i] = indices[index] sortedValues[i] = values[index] } + return sortedIndices, sortedValues +} + +func CreateSparseFloatRow(indices []uint32, values []float32) []byte { + row := make([]byte, len(indices)*8) + for i := 0; i < len(indices); i++ { + SparseFloatRowSetAt(row, i, indices[i], values[i]) + } + return row +} + +// accepted format: +// - {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]} # format1 +// - {"1": 0.1, "2": 0.2, "3": 0.3} # format2 +// +// we don't require the indices to be sorted from user input, but the returned +// byte representation must have indices sorted +func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) { + var indices []uint32 + var values []float32 + + if len(input) == 0 { + return nil, fmt.Errorf("empty JSON input") + } + + jsonIndices, ok1 := input["indices"].([]interface{}) + jsonValues, ok2 := 
input["values"].([]interface{}) + + if ok1 && ok2 { + // try format1 + for _, idx := range jsonIndices { + if i1, s1 := idx.(int); s1 { + indices = append(indices, uint32(i1)) + } else if i2, s2 := idx.(float64); s2 && i2 == float64(int(i2)) { + indices = append(indices, uint32(i2)) + } else if i3, s3 := idx.(json.Number); s3 { + if num, err := strconv.ParseUint(i3.String(), 0, 32); err == nil { + indices = append(indices, uint32(num)) + } else { + return nil, err + } + } else { + return nil, fmt.Errorf("invalid indicies type: %v(%s)", idx, reflect.TypeOf(idx)) + } + } + for _, val := range jsonValues { + if v1, s1 := val.(int); s1 { + values = append(values, float32(v1)) + } else if v2, s2 := val.(float64); s2 { + values = append(values, float32(v2)) + } else if v3, s3 := val.(json.Number); s3 { + if num, err := strconv.ParseFloat(v3.String(), 32); err == nil { + values = append(values, float32(num)) + } else { + return nil, err + } + } else { + return nil, fmt.Errorf("invalid values type: %v(%s)", val, reflect.TypeOf(val)) + } + } + } else if !ok1 && !ok2 { + // try format2 + for k, v := range input { + idx, err := strconv.ParseUint(k, 0, 32) + if err != nil { + return nil, err + } + + var val float64 + val, ok := v.(float64) + if !ok { + num, ok := v.(json.Number) + if !ok { + return nil, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v)) + } + val, err = strconv.ParseFloat(num.String(), 32) + if err != nil { + return nil, err + } + } + + indices = append(indices, uint32(idx)) + values = append(values, float32(val)) + } + } else { + return nil, fmt.Errorf("invalid JSON input") + } + + if len(indices) != len(values) { + return nil, fmt.Errorf("indices and values length mismatch") + } + if len(indices) == 0 { + return nil, fmt.Errorf("empty indices/values in JSON input") + } + + sortedIndices, sortedValues := SortSparseFloatRow(indices, values) row := CreateSparseFloatRow(sortedIndices, sortedValues) if err := ValidateSparseFloatRows(row); err != nil { return nil, err @@ -1579,6 +1628,17 @@ func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) { return row, nil } +func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) { + var vec map[string]interface{} + decoder := json.NewDecoder(bytes.NewReader(input)) + decoder.DisallowUnknownFields() + err := decoder.Decode(&vec) + if err != nil { + return nil, err + } + return CreateSparseFloatRowFromMap(vec) +} + // dim of a sparse float vector is the maximum/last index + 1 func SparseFloatRowDim(row []byte) int64 { if len(row) == 0 { diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index c3cc583f53..f487336b94 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -2119,6 +2119,122 @@ func TestValidateSparseFloatRows(t *testing.T) { } func TestParseJsonSparseFloatRow(t *testing.T) { + t.Run("valid row 1", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0, 3.0}} + res, err := CreateSparseFloatRowFromMap(row) + assert.NoError(t, err) + assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res) + }) + + t.Run("valid row 2", func(t *testing.T) { + row := map[string]interface{}{"indices": []interface{}{3, 1, 5}, "values": []interface{}{1.0, 2.0, 3.0}} + res, err := CreateSparseFloatRowFromMap(row) + assert.NoError(t, err) + assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res) + }) + + t.Run("valid row 3", func(t *testing.T) { 
+	t.Run("valid row 3", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1, 2, 3}}
+		res, err := CreateSparseFloatRowFromMap(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
+	})
+
+	t.Run("valid row 4", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{math.MaxInt32 + 1}, "values": []interface{}{1.0}}
+		res, err := CreateSparseFloatRowFromMap(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
+	})
+
+	t.Run("invalid row 1", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 2", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{1}, "values": []interface{}{1.0, 2.0}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 3", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 4", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid row 5", func(t *testing.T) {
+		row := map[string]interface{}{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("valid dict row 1", func(t *testing.T) {
+		row := map[string]interface{}{"1": 1.0, "3": 2.0, "5": 3.0}
+		res, err := CreateSparseFloatRowFromMap(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
+	})
+
+	t.Run("valid dict row 2", func(t *testing.T) {
+		row := map[string]interface{}{"3": 1.0, "1": 2.0, "5": 3.0}
+		res, err := CreateSparseFloatRowFromMap(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
+	})
+
+	t.Run("invalid dict row 1", func(t *testing.T) {
+		row := map[string]interface{}{"a": 1.0, "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 2", func(t *testing.T) {
+		row := map[string]interface{}{"1": "a", "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 3", func(t *testing.T) {
+		row := map[string]interface{}{"1": "1.0", "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 4", func(t *testing.T) {
+		row := map[string]interface{}{"-1": 1.0, "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 5", func(t *testing.T) {
+		row := map[string]interface{}{"1": -1.0, "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 6", func(t *testing.T) {
+		row := map[string]interface{}{}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+
+	t.Run("invalid dict row 7", func(t *testing.T) {
+		row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0}
+		_, err := CreateSparseFloatRowFromMap(row)
+		assert.Error(t, err)
+	})
+}
+
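+// TestParseJsonSparseFloatRowBytes mirrors the map-based cases above via the raw JSON ([]byte) entry point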
+func TestParseJsonSparseFloatRowBytes(t *testing.T) {
 	t.Run("valid row 1", func(t *testing.T) {
 		row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0]}`)
 		res, err := CreateSparseFloatRowFromJSON(row)
@@ -2133,6 +2249,20 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
 		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
 	})
 
+	t.Run("valid row 3", func(t *testing.T) {
+		row := []byte(`{"indices":[1, 3, 5], "values":[1, 2, 3]}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
+	})
+
+	t.Run("valid row 4", func(t *testing.T) {
+		row := []byte(`{"indices":[2147483648], "values":[1.0]}`)
+		res, err := CreateSparseFloatRowFromJSON(row)
+		assert.NoError(t, err)
+		assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
+	})
+
 	t.Run("invalid row 1", func(t *testing.T) {
 		row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`)
 		_, err := CreateSparseFloatRowFromJSON(row)
@@ -2169,6 +2299,12 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
 		assert.Error(t, err)
 	})
 
+	t.Run("invalid row 7", func(t *testing.T) {
+		row := []byte(`{"indices": [3.1], "values": [0.2]}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
+
 	t.Run("valid dict row 1", func(t *testing.T) {
 		row := []byte(`{"1": 1.0, "3": 2.0, "5": 3.0}`)
 		res, err := CreateSparseFloatRowFromJSON(row)
@@ -2224,4 +2360,10 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
 		_, err := CreateSparseFloatRowFromJSON(row)
 		assert.Error(t, err)
 	})
+
+	t.Run("invalid dict row 8", func(t *testing.T) {
+		row := []byte(`{"1.1": 1.0, "3": 2.0, "5": 3.0}`)
+		_, err := CreateSparseFloatRowFromJSON(row)
+		assert.Error(t, err)
+	})
 }
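For reference, a minimal usage sketch (not part of the patch) of the two JSON encodings accepted by CreateSparseFloatRowFromJSON, using only helpers introduced above in pkg/util/typeutil. Both forms decode to the same byte layout: elements sorted by index, each stored as a little-endian {uint32 index, float32 value} pair:

	rowA, _ := typeutil.CreateSparseFloatRowFromJSON([]byte(`{"indices": [3, 1], "values": [0.5, 0.25]}`)) // format1
	rowB, _ := typeutil.CreateSparseFloatRowFromJSON([]byte(`{"1": 0.25, "3": 0.5}`))                      // format2
	fmt.Println(bytes.Equal(rowA, rowB))              // true: two elements, 16 bytes, indices sorted ascending
	fmt.Println(typeutil.SparseFloatBytesToMap(rowA)) // map[1:0.25 3:0.5]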
diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go
index 7d103cfde2..ae592ebc39 100644
--- a/tests/integration/import/import_test.go
+++ b/tests/integration/import/import_test.go
@@ -81,11 +81,17 @@ func (s *BulkInsertSuite) run() {
 
 	collectionName := "TestBulkInsert" + funcutil.GenRandomStr()
 
-	schema := integration.ConstructSchema(collectionName, dim, s.autoID,
-		&schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: s.autoID},
-		&schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}},
-		&schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}},
-	)
+	var schema *schemapb.CollectionSchema
+	fieldSchema1 := &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: s.autoID}
+	fieldSchema2 := &schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}}
+	fieldSchema3 := &schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}}
+	fieldSchema4 := &schemapb.FieldSchema{FieldID: 103, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{}}
+	if s.vecType != schemapb.DataType_SparseFloatVector {
+		schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema3)
+	} else {
+		schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema4)
+	}
+
 	marshaledSchema, err := proto.Marshal(schema)
 	s.NoError(err)
 
@@ -214,10 +220,13 @@ func (s *BulkInsertSuite) TestMultiFileTypes() {
 		s.metricType = metric.L2
 		s.run()
 
-		// s.vecType = schemapb.DataType_SparseFloatVector
-		// s.indexType = indexparamcheck.IndexSparseWand
-		// s.metricType = metric.IP
-		// s.run()
+		// TODO: numpy import does not support SparseFloatVector yet
+		if fileType != importutilv2.Numpy {
+			s.vecType = schemapb.DataType_SparseFloatVector
+			s.indexType = indexparamcheck.IndexSparseWand
+			s.metricType = metric.IP
+			s.run()
+		}
 	}
 }
 
diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go
index 1b4e3ac944..6987ffc355 100644
--- a/tests/integration/import/util_test.go
+++ b/tests/integration/import/util_test.go
@@ -109,87 +109,60 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche
 		path := fmt.Sprintf("%s/%s.npy", cm.RootPath(), field.GetName())
 		fieldID := field.GetFieldID()
+		fieldData := insertData.Data[fieldID]
 		dType := field.GetDataType()
 		switch dType {
-		case schemapb.DataType_Bool:
-			data = insertData.Data[fieldID].(*storage.BoolFieldData).Data
-		case schemapb.DataType_Int8:
-			data = insertData.Data[fieldID].(*storage.Int8FieldData).Data
-		case schemapb.DataType_Int16:
-			data = insertData.Data[fieldID].(*storage.Int16FieldData).Data
-		case schemapb.DataType_Int32:
-			data = insertData.Data[fieldID].(*storage.Int32FieldData).Data
-		case schemapb.DataType_Int64:
-			data = insertData.Data[fieldID].(*storage.Int64FieldData).Data
-		case schemapb.DataType_Float:
-			data = insertData.Data[fieldID].(*storage.FloatFieldData).Data
-		case schemapb.DataType_Double:
-			data = insertData.Data[fieldID].(*storage.DoubleFieldData).Data
-		case schemapb.DataType_String, schemapb.DataType_VarChar:
-			data = insertData.Data[fieldID].(*storage.StringFieldData).Data
 		case schemapb.DataType_BinaryVector:
-			vecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim))
+			rows := fieldData.GetRows().([]byte)
+			if dim != fieldData.(*storage.BinaryVectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BinaryVectorFieldData).Dim))
 			}
 			const rowBytes = dim / 8
-			rows := len(vecData) / rowBytes
-			binVecData := make([][rowBytes]byte, 0, rows)
-			for i := 0; i < rows; i++ {
-				rowVec := [rowBytes]byte{}
-				copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
-				binVecData = append(binVecData, rowVec)
+			chunked := lo.Chunk(rows, rowBytes)
+			chunkedRows := make([][rowBytes]byte, len(chunked))
+			for i, innerSlice := range chunked {
+				copy(chunkedRows[i][:], innerSlice[:])
 			}
-			data = binVecData
+			data = chunkedRows
 		case schemapb.DataType_FloatVector:
-			vecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data
-			if dim != insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim {
-				panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim))
+			rows := fieldData.GetRows().([]float32)
+			if dim != fieldData.(*storage.FloatVectorFieldData).Dim {
+				panic(fmt.Sprintf("dim mis-match: %d, %d", dim,
fieldData.(*storage.FloatVectorFieldData).Dim)) } - rows := len(vecData) / dim - floatVecData := make([][dim]float32, 0, rows) - for i := 0; i < rows; i++ { - rowVec := [dim]float32{} - copy(rowVec[:], vecData[i*dim:(i+1)*dim]) - floatVecData = append(floatVecData, rowVec) + chunked := lo.Chunk(rows, dim) + chunkedRows := make([][dim]float32, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) } - data = floatVecData + data = chunkedRows case schemapb.DataType_Float16Vector: - vecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data - if dim != insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim { - panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim)) + rows := insertData.Data[fieldID].GetRows().([]byte) + if dim != fieldData.(*storage.Float16VectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.Float16VectorFieldData).Dim)) } const rowBytes = dim * 2 - rows := len(vecData) / rowBytes - float16VecData := make([][rowBytes]byte, 0, rows) - for i := 0; i < rows; i++ { - rowVec := [rowBytes]byte{} - copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes]) - float16VecData = append(float16VecData, rowVec) + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) } - data = float16VecData + data = chunkedRows case schemapb.DataType_BFloat16Vector: - vecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data - if dim != insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim { - panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim)) + rows := insertData.Data[fieldID].GetRows().([]byte) + if dim != fieldData.(*storage.BFloat16VectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BFloat16VectorFieldData).Dim)) } const rowBytes = dim * 2 - rows := len(vecData) / rowBytes - bfloat16VecData := make([][rowBytes]byte, 0, rows) - for i := 0; i < rows; i++ { - rowVec := [rowBytes]byte{} - copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes]) - bfloat16VecData = append(bfloat16VecData, rowVec) + chunked := lo.Chunk(rows, rowBytes) + chunkedRows := make([][rowBytes]byte, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice[:]) } - data = bfloat16VecData + data = chunkedRows case schemapb.DataType_SparseFloatVector: data = insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() - case schemapb.DataType_JSON: - data = insertData.Data[fieldID].(*storage.JSONFieldData).Data - case schemapb.DataType_Array: - data = insertData.Data[fieldID].(*storage.ArrayFieldData).Data default: - panic(fmt.Sprintf("unsupported data type: %s", dType.String())) + data = insertData.Data[fieldID].GetRows() } err := writeFn(path, data) @@ -206,38 +179,9 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.CollectionSchema, count int) { insertData, err := testutil.CreateInsertData(schema, count) assert.NoError(t, err) - rows := make([]map[string]any, 0, count) - fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { - return field.GetFieldID() - }) - for i := 0; i < count; i++ { - data := make(map[int64]interface{}) - for fieldID, v := range 
insertData.Data { - dataType := fieldIDToField[fieldID].GetDataType() - if fieldIDToField[fieldID].GetAutoID() { - continue - } - switch dataType { - case schemapb.DataType_Array: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() - case schemapb.DataType_JSON: - data[fieldID] = string(v.GetRow(i).([]byte)) - case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: - bytes := v.GetRow(i).([]byte) - ints := make([]int, 0, len(bytes)) - for _, b := range bytes { - ints = append(ints, int(b)) - } - data[fieldID] = ints - default: - data[fieldID] = v.GetRow(i) - } - } - row := lo.MapKeys(data, func(_ any, fieldID int64) string { - return fieldIDToField[fieldID].GetName() - }) - rows = append(rows, row) - } + + rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData) + assert.NoError(t, err) jsonBytes, err := json.Marshal(rows) assert.NoError(t, err) diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py index f5d666be27..7a98a6c9f8 100644 --- a/tests/python_client/common/bulk_insert_data.py +++ b/tests/python_client/common/bulk_insert_data.py @@ -93,7 +93,7 @@ def gen_binary_vectors(nb, dim): return vectors -def gen_fp16_vectors(num, dim): +def gen_fp16_vectors(num, dim, for_json=False): """ generate float16 vector data raw_vectors : the vectors @@ -105,13 +105,16 @@ def gen_fp16_vectors(num, dim): for _ in range(num): raw_vector = [random.random() for _ in range(dim)] raw_vectors.append(raw_vector) - fp16_vector = np.array(raw_vector, dtype=np.float16).view(np.uint8).tolist() + if for_json: + fp16_vector = np.array(raw_vector, dtype=np.float16).tolist() + else: + fp16_vector = np.array(raw_vector, dtype=np.float16).view(np.uint8).tolist() fp16_vectors.append(fp16_vector) return raw_vectors, fp16_vectors -def gen_bf16_vectors(num, dim): +def gen_bf16_vectors(num, dim, for_json=False): """ generate brain float16 vector data raw_vectors : the vectors @@ -123,7 +126,10 @@ def gen_bf16_vectors(num, dim): for _ in range(num): raw_vector = [random.random() for _ in range(dim)] raw_vectors.append(raw_vector) - bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).view(np.uint8).tolist() + if for_json: + bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).tolist() + else: + bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).view(np.uint8).tolist() bf16_vectors.append(bf16_vector) return raw_vectors, bf16_vectors @@ -603,9 +609,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d float_vector = False d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0] if "bf16" in data_field: - d[data_field] = gen_bf16_vectors(1, dim)[1][0] + d[data_field] = gen_bf16_vectors(1, dim, True)[1][0] if "fp16" in data_field: - d[data_field] = gen_fp16_vectors(1, dim)[1][0] + d[data_field] = gen_fp16_vectors(1, dim, True)[1][0] elif data_field == DataField.float_field: d[data_field] = random.random() elif data_field == DataField.double_field: