enhance: Cherry-pick the following SparseFloatVector bulk insert PRs to Milvus 2.4 (#33391)

Cherry-picked from master
pr: #33064 #33101 #33187 #33259 #33224
  #33064 Support readable JSON file import for Float16/BFloat16/SparseFloat
  #33101 Store SparseFloatVector into parquet as JSON string
  #33187 Fix SparseFloatVector data parse error for parquet
  #33259 Fix SparseFloatVector data parse error for JSON
  #33224 Optimize bulk insert unittest

Signed-off-by: Cai Yudong <yudong.cai@zilliz.com>
Cai Yudong 2024-05-30 10:31:45 +08:00 committed by GitHub
parent 23b0731a6a
commit 68e2d532d8
15 changed files with 780 additions and 529 deletions
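After this change, a row-based JSON import file can carry Float16/BFloat16 vectors as plain float arrays and SparseFloatVector values as JSON objects in either of the two accepted forms. A minimal sketch of such a file (field names are illustrative, not taken from this PR):

[
{"id": 1, "fp16_vec": [0.1, 0.2, 0.3, 0.4], "sparse_vec": {"indices": [1, 3], "values": [0.5, 0.25]}},
{"id": 2, "fp16_vec": [0.5, 0.6, 0.7, 0.8], "sparse_vec": {"1": 0.5, "3": 0.25}}
]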


@@ -84,7 +84,9 @@ func (i *InsertData) GetRowNum() int {
var rowNum int
for _, data := range i.Data {
rowNum = data.RowNum()
- break
+ if rowNum > 0 {
+ break
+ }
}
return rowNum
}
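Presumably this guards against fields that hold no rows (for example an auto-ID primary key that is only populated later): with the unconditional break, whichever field the map iteration visited first could report 0. A hypothetical illustration:

// hypothetical per-field row counts; Go map iteration order is randomized
counts := map[int64]int{100: 0, 101: 3}
rowNum := 0
for _, c := range counts {
    rowNum = c
    if rowNum > 0 {
        break // skip past empty fields instead of stopping at the first entry
    }
}
// rowNum == 3 regardless of iteration order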


@@ -24,7 +24,6 @@ import (
"strings"
"testing"

- "github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
@@ -98,55 +97,16 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
},
},
}
insertData, err := testutil.CreateInsertData(schema, suite.numRows)
suite.NoError(err)
- rows := make([]map[string]any, 0, suite.numRows)
- fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
- return field.GetFieldID()
- })
- for i := 0; i < insertData.GetRowNum(); i++ {
- data := make(map[int64]interface{})
- for fieldID, v := range insertData.Data {
- field := fieldIDToField[fieldID]
- dataType := field.GetDataType()
- elemType := field.GetElementType()
- switch dataType {
- case schemapb.DataType_Array:
- switch elemType {
- case schemapb.DataType_Bool:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData()
- case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
- case schemapb.DataType_Int64:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData()
- case schemapb.DataType_Float:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData()
- case schemapb.DataType_Double:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData()
- case schemapb.DataType_String:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData()
- }
- case schemapb.DataType_JSON:
- data[fieldID] = string(v.GetRow(i).([]byte))
- case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
- bytes := v.GetRow(i).([]byte)
- ints := make([]int, 0, len(bytes))
- for _, b := range bytes {
- ints = append(ints, int(b))
- }
- data[fieldID] = ints
- default:
- data[fieldID] = v.GetRow(i)
- }
- }
- row := lo.MapKeys(data, func(_ any, fieldID int64) string {
- return fieldIDToField[fieldID].GetName()
- })
- rows = append(rows, row)
- }
+ rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
+ suite.NoError(err)
jsonBytes, err := json.Marshal(rows)
suite.NoError(err)
type mockReader struct {
io.Reader
io.Closer


@@ -305,46 +305,58 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
vec[i] = float32(num)
}
return vec, nil
- case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
+ case schemapb.DataType_Float16Vector:
+ // parse float string to Float16 bytes
arr, ok := obj.([]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
- if len(arr) != r.dim*2 {
- return nil, r.wrapDimError(len(arr)/2, fieldID)
+ if len(arr) != r.dim {
+ return nil, r.wrapDimError(len(arr), fieldID)
}
- vec := make([]byte, len(arr))
+ vec := make([]byte, len(arr)*2)
for i := 0; i < len(arr); i++ {
value, ok := arr[i].(json.Number)
if !ok {
return nil, r.wrapTypeError(arr[i], fieldID)
}
- num, err := strconv.ParseUint(value.String(), 0, 8)
+ num, err := strconv.ParseFloat(value.String(), 32)
if err != nil {
return nil, err
}
- vec[i] = byte(num)
+ copy(vec[i*2:], typeutil.Float32ToFloat16Bytes(float32(num)))
}
return vec, nil
+ case schemapb.DataType_BFloat16Vector:
+ // parse float string to BFloat16 bytes
+ arr, ok := obj.([]interface{})
+ if !ok {
+ return nil, r.wrapTypeError(obj, fieldID)
+ }
+ if len(arr) != r.dim {
+ return nil, r.wrapDimError(len(arr), fieldID)
+ }
+ vec := make([]byte, len(arr)*2)
+ for i := 0; i < len(arr); i++ {
+ value, ok := arr[i].(json.Number)
+ if !ok {
+ return nil, r.wrapTypeError(arr[i], fieldID)
+ }
+ num, err := strconv.ParseFloat(value.String(), 32)
+ if err != nil {
+ return nil, err
+ }
+ copy(vec[i*2:], typeutil.Float32ToBFloat16Bytes(float32(num)))
+ }
+ return vec, nil
case schemapb.DataType_SparseFloatVector:
- arr, ok := obj.([]interface{})
+ arr, ok := obj.(map[string]interface{})
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
- if len(arr)%8 != 0 {
- return nil, r.wrapDimError(len(arr), fieldID)
- }
- vec := make([]byte, len(arr))
- for i := 0; i < len(arr); i++ {
- value, ok := arr[i].(json.Number)
- if !ok {
- return nil, r.wrapTypeError(arr[i], fieldID)
- }
- num, err := strconv.ParseUint(value.String(), 0, 8)
- if err != nil {
- return nil, err
- }
- vec[i] = byte(num)
- }
+ vec, err := typeutil.CreateSparseFloatRowFromMap(arr)
+ if err != nil {
+ return nil, err
+ }
return vec, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
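Each JSON number is parsed as a float32 and re-encoded as two bytes, so a dim-d Float16/BFloat16 vector is written as dim JSON values and stored as 2*dim bytes. A small sketch using the typeutil helpers this PR adds (values illustrative):

vals := []float32{0.1, 0.2, 0.3}
buf := make([]byte, 0, len(vals)*2)
for _, f := range vals {
    buf = append(buf, typeutil.Float32ToFloat16Bytes(f)...) // 2 bytes per element
}
// len(buf) == 6; typeutil.Float16BytesToFloat32Vector(buf) recovers the values approximately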


@@ -128,60 +128,54 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
io.ReaderAt
io.Seeker
}
+ var data interface{}
for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
+ rowNum := fieldData.RowNum()
switch dataType {
case schemapb.DataType_JSON:
- jsonStrs := make([]string, 0, fieldData.RowNum())
- for i := 0; i < fieldData.RowNum(); i++ {
+ jsonStrs := make([]string, 0, rowNum)
+ for i := 0; i < rowNum; i++ {
row := fieldData.GetRow(i)
jsonStrs = append(jsonStrs, string(row.([]byte)))
}
- reader, err := CreateReader(jsonStrs)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = jsonStrs
case schemapb.DataType_BinaryVector:
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
- chunkedRows := make([][dim / 8]byte, len(chunked))
+ rows := fieldData.GetRows().([]byte)
+ const rowBytes = dim / 8
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = chunkedRows
case schemapb.DataType_FloatVector:
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
+ rows := fieldData.GetRows().([]float32)
+ chunked := lo.Chunk(rows, dim)
chunkedRows := make([][dim]float32, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = chunkedRows
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
- chunkedRows := make([][dim * 2]byte, len(chunked))
+ rows := fieldData.GetRows().([]byte)
+ const rowBytes = dim * 2
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = chunkedRows
default:
- reader, err := CreateReader(insertData.Data[fieldID].GetRows())
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = fieldData.GetRows()
}
+ reader, err := CreateReader(data)
+ suite.NoError(err)
+ cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
+ Reader: reader,
+ }, nil)
}
reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)
@@ -268,59 +262,54 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
io.ReaderAt
io.Seeker
}
+ var data interface{}
for fieldID, fieldData := range insertData.Data {
dataType := fieldIDToField[fieldID].GetDataType()
+ rowNum := fieldData.RowNum()
- if dataType == schemapb.DataType_JSON {
- jsonStrs := make([]string, 0, fieldData.RowNum())
- for i := 0; i < fieldData.RowNum(); i++ {
+ switch dataType {
+ case schemapb.DataType_JSON:
+ jsonStrs := make([]string, 0, rowNum)
+ for i := 0; i < rowNum; i++ {
row := fieldData.GetRow(i)
jsonStrs = append(jsonStrs, string(row.([]byte)))
}
- reader, err := CreateReader(jsonStrs)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
+ data = jsonStrs
+ case schemapb.DataType_BinaryVector:
+ rows := fieldData.GetRows().([]byte)
+ const rowBytes = dim / 8
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
+ for i, innerSlice := range chunked {
+ copy(chunkedRows[i][:], innerSlice[:])
+ }
+ data = chunkedRows
- } else if dataType == schemapb.DataType_FloatVector {
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]float32), dim)
+ case schemapb.DataType_FloatVector:
+ rows := fieldData.GetRows().([]float32)
+ chunked := lo.Chunk(rows, dim)
chunkedRows := make([][dim]float32, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
- } else if dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector {
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim*2)
- chunkedRows := make([][dim * 2]byte, len(chunked))
+ data = chunkedRows
+ case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
+ rows := fieldData.GetRows().([]byte)
+ const rowBytes = dim * 2
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
for i, innerSlice := range chunked {
copy(chunkedRows[i][:], innerSlice[:])
}
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
- } else if dataType == schemapb.DataType_BinaryVector {
- chunked := lo.Chunk(insertData.Data[fieldID].GetRows().([]byte), dim/8)
- chunkedRows := make([][dim / 8]byte, len(chunked))
- for i, innerSlice := range chunked {
- copy(chunkedRows[i][:], innerSlice[:])
- }
- reader, err := CreateReader(chunkedRows)
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
- } else {
- reader, err := CreateReader(insertData.Data[fieldID].GetRows())
- suite.NoError(err)
- cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
- Reader: reader,
- }, nil)
- }
+ data = chunkedRows
+ default:
+ data = fieldData.GetRows()
+ }
+ reader, err := CreateReader(data)
+ suite.NoError(err)
+ cm.EXPECT().Reader(mock.Anything, files[fieldID]).Return(&mockReader{
+ Reader: reader,
+ }, nil)
}
reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)


@@ -97,31 +97,7 @@ func (c *FieldReader) Next(count int64) (any, error) {
case schemapb.DataType_VarChar, schemapb.DataType_String:
return ReadStringData(c, count)
case schemapb.DataType_JSON:
- // JSON field read data from string array Parquet
- data, err := ReadStringData(c, count)
- if err != nil {
- return nil, err
- }
- if data == nil {
- return nil, nil
- }
- byteArr := make([][]byte, 0)
- for _, str := range data.([]string) {
- var dummy interface{}
- err = json.Unmarshal([]byte(str), &dummy)
- if err != nil {
- return nil, err
- }
- if c.field.GetIsDynamic() {
- var dummy2 map[string]interface{}
- err = json.Unmarshal([]byte(str), &dummy2)
- if err != nil {
- return nil, err
- }
- }
- byteArr = append(byteArr, []byte(str))
- }
- return byteArr, nil
+ return ReadJSONData(c, count)
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return ReadBinaryData(c, count)
case schemapb.DataType_FloatVector:
@@ -135,152 +111,9 @@ func (c *FieldReader) Next(count int64) (any, error) {
vectors := lo.Flatten(arrayData.([][]float32))
return vectors, nil
case schemapb.DataType_SparseFloatVector:
- return ReadBinaryDataForSparseFloatVector(c, count)
+ return ReadSparseFloatVectorData(c, count)
case schemapb.DataType_Array:
- data := make([]*schemapb.ScalarField, 0, count)
- elementType := c.field.GetElementType()
- switch elementType {
- case schemapb.DataType_Bool:
- boolArray, err := ReadBoolArrayData(c, count)
- if err != nil {
- return nil, err
- }
- if boolArray == nil {
- return nil, nil
- }
- for _, elementArray := range boolArray.([][]bool) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_BoolData{
- BoolData: &schemapb.BoolArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Int8:
- int8Array, err := ReadIntegerOrFloatArrayData[int32](c, count)
- if err != nil {
- return nil, err
- }
- if int8Array == nil {
- return nil, nil
- }
- for _, elementArray := range int8Array.([][]int32) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_IntData{
- IntData: &schemapb.IntArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Int16:
- int16Array, err := ReadIntegerOrFloatArrayData[int32](c, count)
- if err != nil {
- return nil, err
- }
- if int16Array == nil {
- return nil, nil
- }
- for _, elementArray := range int16Array.([][]int32) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_IntData{
- IntData: &schemapb.IntArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Int32:
- int32Array, err := ReadIntegerOrFloatArrayData[int32](c, count)
- if err != nil {
- return nil, err
- }
- if int32Array == nil {
- return nil, nil
- }
- for _, elementArray := range int32Array.([][]int32) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_IntData{
- IntData: &schemapb.IntArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Int64:
- int64Array, err := ReadIntegerOrFloatArrayData[int64](c, count)
- if err != nil {
- return nil, err
- }
- if int64Array == nil {
- return nil, nil
- }
- for _, elementArray := range int64Array.([][]int64) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_LongData{
- LongData: &schemapb.LongArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Float:
- float32Array, err := ReadIntegerOrFloatArrayData[float32](c, count)
- if err != nil {
- return nil, err
- }
- if float32Array == nil {
- return nil, nil
- }
- for _, elementArray := range float32Array.([][]float32) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_FloatData{
- FloatData: &schemapb.FloatArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_Double:
- float64Array, err := ReadIntegerOrFloatArrayData[float64](c, count)
- if err != nil {
- return nil, err
- }
- if float64Array == nil {
- return nil, nil
- }
- for _, elementArray := range float64Array.([][]float64) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_DoubleData{
- DoubleData: &schemapb.DoubleArray{
- Data: elementArray,
- },
- },
- })
- }
- case schemapb.DataType_VarChar, schemapb.DataType_String:
- stringArray, err := ReadStringArrayData(c, count)
- if err != nil {
- return nil, err
- }
- if stringArray == nil {
- return nil, nil
- }
- for _, elementArray := range stringArray.([][]string) {
- data = append(data, &schemapb.ScalarField{
- Data: &schemapb.ScalarField_StringData{
- StringData: &schemapb.StringArray{
- Data: elementArray,
- },
- },
- })
- }
- default:
- return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for array field '%s'",
- elementType.String(), c.field.GetName()))
- }
- return data, nil
+ return ReadArrayData(c, count)
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for field '%s'",
c.field.GetDataType().String(), c.field.GetName()))
@@ -382,6 +215,34 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) {
return data, nil
}
func ReadJSONData(pcr *FieldReader, count int64) (any, error) {
// JSON field read data from string array Parquet
data, err := ReadStringData(pcr, count)
if err != nil {
return nil, err
}
if data == nil {
return nil, nil
}
byteArr := make([][]byte, 0)
for _, str := range data.([]string) {
var dummy interface{}
err = json.Unmarshal([]byte(str), &dummy)
if err != nil {
return nil, err
}
if pcr.field.GetIsDynamic() {
var dummy2 map[string]interface{}
err = json.Unmarshal([]byte(str), &dummy2)
if err != nil {
return nil, err
}
}
byteArr = append(byteArr, []byte(str))
}
return byteArr, nil
}
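Note the two-step validation above: every cell must parse as JSON, and a dynamic field must additionally decode into a JSON object. A sketch of the second check (input illustrative):

var obj map[string]interface{}
err := json.Unmarshal([]byte(`[1, 2]`), &obj)
// err != nil: a JSON array is valid JSON but is rejected for a dynamic field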
func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
dataType := pcr.field.GetDataType()
chunked, err := pcr.columnReader.NextBatch(count)
@@ -417,38 +278,32 @@ func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
return data, nil
}
- func ReadBinaryDataForSparseFloatVector(pcr *FieldReader, count int64) (any, error) {
- chunked, err := pcr.columnReader.NextBatch(count)
+ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) {
+ data, err := ReadStringData(pcr, count)
if err != nil {
return nil, err
}
- data := make([][]byte, 0, count)
+ if data == nil {
+ return nil, nil
+ }
+ byteArr := make([][]byte, 0, count)
maxDim := uint32(0)
- for _, chunk := range chunked.Chunks() {
- listReader := chunk.(*array.List)
- offsets := listReader.Offsets()
- if !isVectorAligned(offsets, pcr.dim, schemapb.DataType_SparseFloatVector) {
- return nil, merr.WrapErrImportFailed("%s not aligned", schemapb.DataType_SparseFloatVector.String())
- }
- uint8Reader, ok := listReader.ListValues().(*array.Uint8)
- if !ok {
- return nil, WrapTypeErr("binary", listReader.ListValues().DataType().Name(), pcr.field)
- }
- vecData := uint8Reader.Uint8Values()
- for i := 1; i < len(offsets); i++ {
- elemCount := int((offsets[i] - offsets[i-1]) / 8)
- rowVec := vecData[offsets[i-1]:offsets[i]]
- data = append(data, rowVec)
- maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1)
- if maxIdx+1 > maxDim {
- maxDim = maxIdx + 1
- }
- }
- }
+ for _, str := range data.([]string) {
+ rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str))
+ if err != nil {
+ return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s', err = %v", str, err))
+ }
+ byteArr = append(byteArr, rowVec)
+ elemCount := len(rowVec) / 8
+ maxIdx := typeutil.SparseFloatRowIndexAt(rowVec, elemCount-1)
+ if maxIdx+1 > maxDim {
+ maxDim = maxIdx + 1
+ }
+ }
return &storage.SparseFloatVectorFieldData{
SparseFloatArray: schemapb.SparseFloatArray{
Dim: int64(maxDim),
- Contents: data,
+ Contents: byteArr,
},
}, nil
}
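For orientation, the byte form produced here packs one little-endian (uint32 index, float32 value) pair per element, 8 bytes each, with indices sorted. A sketch (values illustrative):

rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(`{"indices":[3,1],"values":[0.2,0.1]}`))
// err == nil and len(rowVec) == 16; indices come back sorted:
// typeutil.SparseFloatRowIndexAt(rowVec, 0) == 1, typeutil.SparseFloatRowIndexAt(rowVec, 1) == 3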
@@ -462,16 +317,6 @@ func checkVectorAlignWithDim(offsets []int32, dim int32) bool {
return true
}
- func checkSparseFloatVectorAlign(offsets []int32) bool {
- // index: 4 bytes, value: 4 bytes
- for i := 1; i < len(offsets); i++ {
- if (offsets[i]-offsets[i-1])%8 != 0 {
- return false
- }
- }
- return true
- }
func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool {
if len(offsets) < 1 {
return false
@@ -484,7 +329,8 @@ func isVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) bool
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return checkVectorAlignWithDim(offsets, int32(dim*2))
case schemapb.DataType_SparseFloatVector:
- return checkSparseFloatVectorAlign(offsets)
+ // JSON format, skip alignment check
+ return true
default:
return false
}
@@ -620,3 +466,150 @@ func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) {
}
return data, nil
}
func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
data := make([]*schemapb.ScalarField, 0, count)
elementType := pcr.field.GetElementType()
switch elementType {
case schemapb.DataType_Bool:
boolArray, err := ReadBoolArrayData(pcr, count)
if err != nil {
return nil, err
}
if boolArray == nil {
return nil, nil
}
for _, elementArray := range boolArray.([][]bool) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Int8:
int8Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count)
if err != nil {
return nil, err
}
if int8Array == nil {
return nil, nil
}
for _, elementArray := range int8Array.([][]int32) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Int16:
int16Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count)
if err != nil {
return nil, err
}
if int16Array == nil {
return nil, nil
}
for _, elementArray := range int16Array.([][]int32) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Int32:
int32Array, err := ReadIntegerOrFloatArrayData[int32](pcr, count)
if err != nil {
return nil, err
}
if int32Array == nil {
return nil, nil
}
for _, elementArray := range int32Array.([][]int32) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Int64:
int64Array, err := ReadIntegerOrFloatArrayData[int64](pcr, count)
if err != nil {
return nil, err
}
if int64Array == nil {
return nil, nil
}
for _, elementArray := range int64Array.([][]int64) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Float:
float32Array, err := ReadIntegerOrFloatArrayData[float32](pcr, count)
if err != nil {
return nil, err
}
if float32Array == nil {
return nil, nil
}
for _, elementArray := range float32Array.([][]float32) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_Double:
float64Array, err := ReadIntegerOrFloatArrayData[float64](pcr, count)
if err != nil {
return nil, err
}
if float64Array == nil {
return nil, nil
}
for _, elementArray := range float64Array.([][]float64) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: elementArray,
},
},
})
}
case schemapb.DataType_VarChar, schemapb.DataType_String:
stringArray, err := ReadStringArrayData(pcr, count)
if err != nil {
return nil, err
}
if stringArray == nil {
return nil, nil
}
for _, elementArray := range stringArray.([][]string) {
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: elementArray,
},
},
})
}
default:
return nil, merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type '%s' for array field '%s'",
elementType.String(), pcr.field.GetName()))
}
return data, nil
}


@@ -183,7 +183,7 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
Nullable: true,
Metadata: arrow.Metadata{},
}), nil
- case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
+ case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return arrow.ListOfField(arrow.Field{
Name: "item",
Type: &arrow.Uint8Type{},
@@ -197,6 +197,8 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da
Nullable: true,
Metadata: arrow.Metadata{},
}), nil
+ case schemapb.DataType_SparseFloatVector:
+ return &arrow.StringType{}, nil
default:
return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String())
}
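With this mapping, a SparseFloatVector cell in the Parquet file is just a UTF-8 JSON string; the BuildArrayData change later in this diff marshals each stored row through SparseFloatBytesToMap, so a written cell looks roughly like (values illustrative):

{"1":0.5,"3":0.25}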


@@ -1,6 +1,7 @@
package testutil
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
@@ -333,23 +334,24 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
builder.AppendValues(offsets, valid)
columns = append(columns, builder.NewListArray())
case schemapb.DataType_SparseFloatVector:
- sparseFloatVecData := make([]byte, 0)
- builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
+ builder := array.NewStringBuilder(mem)
contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents()
rows := len(contents)
- offsets := make([]int32, 0, rows)
- valid := make([]bool, 0, rows)
- currOffset := int32(0)
+ jsonBytesData := make([][]byte, 0)
for i := 0; i < rows; i++ {
rowVecData := contents[i]
- sparseFloatVecData = append(sparseFloatVecData, rowVecData...)
- offsets = append(offsets, currOffset)
- currOffset = currOffset + int32(len(rowVecData))
- valid = append(valid, true)
+ mapData := typeutil.SparseFloatBytesToMap(rowVecData)
+ // convert to JSON format
+ jsonBytes, err := json.Marshal(mapData)
+ if err != nil {
+ return nil, err
+ }
+ jsonBytesData = append(jsonBytesData, jsonBytes)
}
- builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparseFloatVecData, nil)
- builder.AppendValues(offsets, valid)
- columns = append(columns, builder.NewListArray())
+ builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string {
+ return string(bs)
+ }), nil)
+ columns = append(columns, builder.NewStringArray())
case schemapb.DataType_JSON:
builder := array.NewStringBuilder(mem)
jsonData := insertData.Data[fieldID].(*storage.JSONFieldData).Data
@@ -482,3 +484,66 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
}
return columns, nil
}
func CreateInsertDataRowsForJSON(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]map[string]any, error) {
fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
rowNum := insertData.GetRowNum()
rows := make([]map[string]any, 0, rowNum)
for i := 0; i < rowNum; i++ {
data := make(map[int64]interface{})
for fieldID, v := range insertData.Data {
field := fieldIDToField[fieldID]
dataType := field.GetDataType()
elemType := field.GetElementType()
if field.GetAutoID() {
continue
}
switch dataType {
case schemapb.DataType_Array:
switch elemType {
case schemapb.DataType_Bool:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData()
case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
case schemapb.DataType_Int64:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData()
case schemapb.DataType_Float:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData()
case schemapb.DataType_Double:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData()
case schemapb.DataType_String:
data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData()
}
case schemapb.DataType_JSON:
data[fieldID] = string(v.GetRow(i).([]byte))
case schemapb.DataType_BinaryVector:
bytes := v.GetRow(i).([]byte)
ints := make([]int, 0, len(bytes))
for _, b := range bytes {
ints = append(ints, int(b))
}
data[fieldID] = ints
case schemapb.DataType_Float16Vector:
bytes := v.GetRow(i).([]byte)
data[fieldID] = typeutil.Float16BytesToFloat32Vector(bytes)
case schemapb.DataType_BFloat16Vector:
bytes := v.GetRow(i).([]byte)
data[fieldID] = typeutil.BFloat16BytesToFloat32Vector(bytes)
case schemapb.DataType_SparseFloatVector:
bytes := v.GetRow(i).([]byte)
data[fieldID] = typeutil.SparseFloatBytesToMap(bytes)
default:
data[fieldID] = v.GetRow(i)
}
}
row := lo.MapKeys(data, func(_ any, fieldID int64) string {
return fieldIDToField[fieldID].GetName()
})
rows = append(rows, row)
}
return rows, nil
}


@@ -248,27 +248,20 @@ func GenerateFloatVectors(numRows, dim int) []float32 {
func GenerateFloat16Vectors(numRows, dim int) []byte {
total := numRows * dim
- ret := make([]byte, total*2)
+ ret := make([]byte, 0, total*2)
for i := 0; i < total; i++ {
- v := float16.Fromfloat32(rand.Float32()).Bits()
- binary.LittleEndian.PutUint16(ret[i*2:], v)
+ f := (rand.Float32() - 0.5) * 100
+ ret = append(ret, typeutil.Float32ToFloat16Bytes(f)...)
}
return ret
}
func GenerateBFloat16Vectors(numRows, dim int) []byte {
total := numRows * dim
- ret16 := make([]uint16, 0, total)
+ ret := make([]byte, 0, total*2)
for i := 0; i < total; i++ {
- f := rand.Float32()
- bits := math.Float32bits(f)
- bits >>= 16
- bits &= 0x7FFF
- ret16 = append(ret16, uint16(bits))
- }
- ret := make([]byte, len(ret16)*2)
- for i, value := range ret16 {
- binary.LittleEndian.PutUint16(ret[i*2:], value)
+ f := (rand.Float32() - 0.5) * 100
+ ret = append(ret, typeutil.Float32ToBFloat16Bytes(f)...)
}
return ret
}


@@ -23,6 +23,7 @@ import (
"reflect"
"github.com/golang/protobuf/proto"
"github.com/x448/float16"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/common"
@@ -115,3 +116,52 @@ func SliceRemoveDuplicate(a interface{}) (ret []interface{}) {
return ret
}
func Float32ToFloat16Bytes(f float32) []byte {
ret := make([]byte, 2)
common.Endian.PutUint16(ret[:], float16.Fromfloat32(f).Bits())
return ret
}
func Float16BytesToFloat32(b []byte) float32 {
return float16.Frombits(common.Endian.Uint16(b)).Float32()
}
func Float16BytesToFloat32Vector(b []byte) []float32 {
dim := len(b) / 2
vec := make([]float32, 0, dim)
for j := 0; j < dim; j++ {
vec = append(vec, Float16BytesToFloat32(b[j*2:]))
}
return vec
}
func Float32ToBFloat16Bytes(f float32) []byte {
ret := make([]byte, 2)
common.Endian.PutUint16(ret[:], uint16(math.Float32bits(f)>>16))
return ret
}
func BFloat16BytesToFloat32(b []byte) float32 {
return math.Float32frombits(uint32(common.Endian.Uint16(b)) << 16)
}
func BFloat16BytesToFloat32Vector(b []byte) []float32 {
dim := len(b) / 2
vec := make([]float32, 0, dim)
for j := 0; j < dim; j++ {
vec = append(vec, BFloat16BytesToFloat32(b[j*2:]))
}
return vec
}
func SparseFloatBytesToMap(b []byte) map[uint32]float32 {
elemCount := len(b) / 8
values := make(map[uint32]float32)
for j := 0; j < elemCount; j++ {
idx := common.Endian.Uint32(b[j*8:])
f := BytesToFloat32(b[j*8+4:])
values[idx] = f
}
return values
}
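A quick round-trip through these helpers makes the 8-byte element layout concrete (a sketch, not part of the PR):

row := CreateSparseFloatRow([]uint32{1, 3}, []float32{0.5, 0.25})
// 16 bytes total: a little-endian uint32 index followed by a float32 value per element
m := SparseFloatBytesToMap(row)
// m == map[uint32]float32{1: 0.5, 3: 0.25}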


@@ -18,9 +18,13 @@ package typeutil
import (
"math"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
)
func TestConversion(t *testing.T) {
@@ -94,4 +98,24 @@ func TestConversion(t *testing.T) {
ret1 := SliceRemoveDuplicate(arr)
assert.Equal(t, 3, len(ret1))
})
t.Run("TestFloat16", func(t *testing.T) {
for i := 0; i < 100; i++ {
v := (rand.Float32() - 0.5) * 100
b := Float32ToFloat16Bytes(v)
v2 := Float16BytesToFloat32(b)
log.Info("float16", zap.Float32("v", v), zap.Float32("v2", v2))
assert.Less(t, math.Abs(float64(v2/v-1)), 0.001)
}
})
t.Run("TestBFloat16", func(t *testing.T) {
for i := 0; i < 100; i++ {
v := (rand.Float32() - 0.5) * 100
b := Float32ToBFloat16Bytes(v)
v2 := BFloat16BytesToFloat32(b)
log.Info("bfloat16", zap.Float32("v", v), zap.Float32("v2", v2))
assert.Less(t, math.Abs(float64(v2/v-1)), 0.01)
}
})
}


@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"strconv"
"unsafe"
@@ -1505,58 +1506,10 @@ func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) {
binary.LittleEndian.PutUint32(row[pos*8+4:], math.Float32bits(value))
}
- func CreateSparseFloatRow(indices []uint32, values []float32) []byte {
- row := make([]byte, len(indices)*8)
- for i := 0; i < len(indices); i++ {
- SparseFloatRowSetAt(row, i, indices[i], values[i])
- }
- return row
- }
+ func SortSparseFloatRow(indices []uint32, values []float32) ([]uint32, []float32) {
+ elemCount := len(indices)
- type sparseFloatVectorJSONRepresentation struct {
- Indices []uint32 `json:"indices"`
- Values []float32 `json:"values"`
- }
- // accepted format:
- // - {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]}
- // - {"1": 0.1, "2": 0.2, "3": 0.3}
- //
- // we don't require the indices to be sorted from user input, but the returned
- // byte representation must have indices sorted
- func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) {
- var indices []uint32
- var values []float32
- var vec sparseFloatVectorJSONRepresentation
- decoder := json.NewDecoder(bytes.NewReader(input))
- decoder.DisallowUnknownFields()
- err := decoder.Decode(&vec)
- if err == nil {
- if len(vec.Indices) != len(vec.Values) {
- return nil, fmt.Errorf("indices and values length mismatch")
- }
- if len(vec.Indices) == 0 {
- return nil, fmt.Errorf("empty indices/values in JSON input")
- }
- indices = vec.Indices
- values = vec.Values
- } else {
- var vec2 map[uint32]float32
- decoder = json.NewDecoder(bytes.NewReader(input))
- decoder.DisallowUnknownFields()
- err = decoder.Decode(&vec2)
- if err != nil {
- return nil, fmt.Errorf("failed to parse JSON input: %v", err)
- }
- for idx, val := range vec2 {
- indices = append(indices, idx)
- values = append(values, val)
- }
- }
- indexOrder := make([]int, len(indices))
+ indexOrder := make([]int, elemCount)
for i := range indexOrder {
indexOrder[i] = i
}
@@ -1565,13 +1518,109 @@
return indices[indexOrder[i]] < indices[indexOrder[j]]
})
- sortedIndices := make([]uint32, len(indices))
- sortedValues := make([]float32, len(values))
+ sortedIndices := make([]uint32, elemCount)
+ sortedValues := make([]float32, elemCount)
for i, index := range indexOrder {
sortedIndices[i] = indices[index]
sortedValues[i] = values[index]
}
+ return sortedIndices, sortedValues
+ }
func CreateSparseFloatRow(indices []uint32, values []float32) []byte {
row := make([]byte, len(indices)*8)
for i := 0; i < len(indices); i++ {
SparseFloatRowSetAt(row, i, indices[i], values[i])
}
return row
}
// accepted format:
// - {"indices": [1, 2, 3], "values": [0.1, 0.2, 0.3]} # format1
// - {"1": 0.1, "2": 0.2, "3": 0.3} # format2
//
// we don't require the indices to be sorted from user input, but the returned
// byte representation must have indices sorted
func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
var indices []uint32
var values []float32
if len(input) == 0 {
return nil, fmt.Errorf("empty JSON input")
}
jsonIndices, ok1 := input["indices"].([]interface{})
jsonValues, ok2 := input["values"].([]interface{})
if ok1 && ok2 {
// try format1
for _, idx := range jsonIndices {
if i1, s1 := idx.(int); s1 {
indices = append(indices, uint32(i1))
} else if i2, s2 := idx.(float64); s2 && i2 == float64(int(i2)) {
indices = append(indices, uint32(i2))
} else if i3, s3 := idx.(json.Number); s3 {
if num, err := strconv.ParseUint(i3.String(), 0, 32); err == nil {
indices = append(indices, uint32(num))
} else {
return nil, err
}
} else {
return nil, fmt.Errorf("invalid indicies type: %v(%s)", idx, reflect.TypeOf(idx))
}
}
for _, val := range jsonValues {
if v1, s1 := val.(int); s1 {
values = append(values, float32(v1))
} else if v2, s2 := val.(float64); s2 {
values = append(values, float32(v2))
} else if v3, s3 := val.(json.Number); s3 {
if num, err := strconv.ParseFloat(v3.String(), 32); err == nil {
values = append(values, float32(num))
} else {
return nil, err
}
} else {
return nil, fmt.Errorf("invalid values type: %v(%s)", val, reflect.TypeOf(val))
}
}
} else if !ok1 && !ok2 {
// try format2
for k, v := range input {
idx, err := strconv.ParseUint(k, 0, 32)
if err != nil {
return nil, err
}
var val float64
val, ok := v.(float64)
if !ok {
num, ok := v.(json.Number)
if !ok {
return nil, fmt.Errorf("invalid value type in JSON: %s", reflect.TypeOf(v))
}
val, err = strconv.ParseFloat(num.String(), 32)
if err != nil {
return nil, err
}
}
indices = append(indices, uint32(idx))
values = append(values, float32(val))
}
} else {
return nil, fmt.Errorf("invalid JSON input")
}
if len(indices) != len(values) {
return nil, fmt.Errorf("indices and values length mismatch")
}
if len(indices) == 0 {
return nil, fmt.Errorf("empty indices/values in JSON input")
}
sortedIndices, sortedValues := SortSparseFloatRow(indices, values)
row := CreateSparseFloatRow(sortedIndices, sortedValues)
if err := ValidateSparseFloatRows(row); err != nil {
return nil, err
@@ -1579,6 +1628,17 @@
return row, nil
}
func CreateSparseFloatRowFromJSON(input []byte) ([]byte, error) {
var vec map[string]interface{}
decoder := json.NewDecoder(bytes.NewReader(input))
decoder.DisallowUnknownFields()
err := decoder.Decode(&vec)
if err != nil {
return nil, err
}
return CreateSparseFloatRowFromMap(vec)
}
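Both accepted input forms normalize to the same sorted byte representation; a sketch:

b1, _ := CreateSparseFloatRowFromJSON([]byte(`{"indices":[3,1],"values":[0.2,0.1]}`))
b2, _ := CreateSparseFloatRowFromJSON([]byte(`{"1":0.1,"3":0.2}`))
// bytes.Equal(b1, b2) == true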
// dim of a sparse float vector is the maximum/last index + 1
func SparseFloatRowDim(row []byte) int64 {
if len(row) == 0 {


@@ -2119,6 +2119,122 @@
}
func TestParseJsonSparseFloatRow(t *testing.T) {
t.Run("valid row 1", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid row 2", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{3, 1, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("valid row 3", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1, 2, 3}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid row 3", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{math.MaxInt32 + 1}, "values": []interface{}{1.0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
})
t.Run("invalid row 1", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 2", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{1}, "values": []interface{}{1.0, 2.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 3", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 4", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 5", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("valid dict row 1", func(t *testing.T) {
row := map[string]interface{}{"1": 1.0, "3": 2.0, "5": 3.0}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid dict row 2", func(t *testing.T) {
row := map[string]interface{}{"3": 1.0, "1": 2.0, "5": 3.0}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("invalid dict row 1", func(t *testing.T) {
row := map[string]interface{}{"a": 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 2", func(t *testing.T) {
row := map[string]interface{}{"1": "a", "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 3", func(t *testing.T) {
row := map[string]interface{}{"1": "1.0", "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 4", func(t *testing.T) {
row := map[string]interface{}{"-1": 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 5", func(t *testing.T) {
row := map[string]interface{}{"1": -1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 6", func(t *testing.T) {
row := map[string]interface{}{}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 7", func(t *testing.T) {
row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
}
func TestParseJsonSparseFloatRowBytes(t *testing.T) {
t.Run("valid row 1", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0]}`)
res, err := CreateSparseFloatRowFromJSON(row)
@@ -2133,6 +2249,20 @@
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("valid row 3", func(t *testing.T) {
row := []byte(`{"indices":[1, 3, 5], "values":[1, 2, 3]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid row 3", func(t *testing.T) {
row := []byte(`{"indices":[2147483648], "values":[1.0]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{math.MaxInt32 + 1}, []float32{1.0}), res)
})
t.Run("invalid row 1", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`)
_, err := CreateSparseFloatRowFromJSON(row)
@@ -2169,6 +2299,12 @@
assert.Error(t, err)
})
t.Run("invalid row 7", func(t *testing.T) {
row := []byte(`{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("valid dict row 1", func(t *testing.T) {
row := []byte(`{"1": 1.0, "3": 2.0, "5": 3.0}`)
res, err := CreateSparseFloatRowFromJSON(row)
@@ -2224,4 +2360,10 @@
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 8", func(t *testing.T) {
row := []byte(`{"1.1": 1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
}


@@ -81,11 +81,17 @@ func (s *BulkInsertSuite) run() {
collectionName := "TestBulkInsert" + funcutil.GenRandomStr()
- schema := integration.ConstructSchema(collectionName, dim, s.autoID,
- &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: s.autoID},
- &schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}},
- &schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}},
- )
+ var schema *schemapb.CollectionSchema
+ fieldSchema1 := &schemapb.FieldSchema{FieldID: 100, Name: "id", DataType: s.pkType, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "128"}}, IsPrimaryKey: true, AutoID: s.autoID}
+ fieldSchema2 := &schemapb.FieldSchema{FieldID: 101, Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}}
+ fieldSchema3 := &schemapb.FieldSchema{FieldID: 102, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}}
+ fieldSchema4 := &schemapb.FieldSchema{FieldID: 103, Name: "embeddings", DataType: s.vecType, TypeParams: []*commonpb.KeyValuePair{}}
+ if s.vecType != schemapb.DataType_SparseFloatVector {
+ schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema3)
+ } else {
+ schema = integration.ConstructSchema(collectionName, dim, s.autoID, fieldSchema1, fieldSchema2, fieldSchema4)
+ }
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
@@ -214,10 +220,13 @@ func (s *BulkInsertSuite) TestMultiFileTypes() {
s.metricType = metric.L2
s.run()
- // s.vecType = schemapb.DataType_SparseFloatVector
- // s.indexType = indexparamcheck.IndexSparseWand
- // s.metricType = metric.IP
- // s.run()
+ // TODO: numpy import is not supported for SparseFloatVector yet
+ if fileType != importutilv2.Numpy {
+ s.vecType = schemapb.DataType_SparseFloatVector
+ s.indexType = indexparamcheck.IndexSparseWand
+ s.metricType = metric.IP
+ s.run()
+ }
}
}


@@ -109,87 +109,60 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche
path := fmt.Sprintf("%s/%s.npy", cm.RootPath(), field.GetName())
fieldID := field.GetFieldID()
+ fieldData := insertData.Data[fieldID]
dType := field.GetDataType()
switch dType {
- case schemapb.DataType_Bool:
- data = insertData.Data[fieldID].(*storage.BoolFieldData).Data
- case schemapb.DataType_Int8:
- data = insertData.Data[fieldID].(*storage.Int8FieldData).Data
- case schemapb.DataType_Int16:
- data = insertData.Data[fieldID].(*storage.Int16FieldData).Data
- case schemapb.DataType_Int32:
- data = insertData.Data[fieldID].(*storage.Int32FieldData).Data
- case schemapb.DataType_Int64:
- data = insertData.Data[fieldID].(*storage.Int64FieldData).Data
- case schemapb.DataType_Float:
- data = insertData.Data[fieldID].(*storage.FloatFieldData).Data
- case schemapb.DataType_Double:
- data = insertData.Data[fieldID].(*storage.DoubleFieldData).Data
- case schemapb.DataType_String, schemapb.DataType_VarChar:
- data = insertData.Data[fieldID].(*storage.StringFieldData).Data
case schemapb.DataType_BinaryVector:
- vecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data
- if dim != insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim {
- panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim))
+ rows := fieldData.GetRows().([]byte)
+ if dim != fieldData.(*storage.BinaryVectorFieldData).Dim {
+ panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BinaryVectorFieldData).Dim))
}
const rowBytes = dim / 8
- rows := len(vecData) / rowBytes
- binVecData := make([][rowBytes]byte, 0, rows)
- for i := 0; i < rows; i++ {
- rowVec := [rowBytes]byte{}
- copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
- binVecData = append(binVecData, rowVec)
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
+ for i, innerSlice := range chunked {
+ copy(chunkedRows[i][:], innerSlice[:])
}
- data = binVecData
+ data = chunkedRows
case schemapb.DataType_FloatVector:
- vecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data
- if dim != insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim {
- panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim))
+ rows := fieldData.GetRows().([]float32)
+ if dim != fieldData.(*storage.FloatVectorFieldData).Dim {
+ panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.FloatVectorFieldData).Dim))
}
- rows := len(vecData) / dim
- floatVecData := make([][dim]float32, 0, rows)
- for i := 0; i < rows; i++ {
- rowVec := [dim]float32{}
- copy(rowVec[:], vecData[i*dim:(i+1)*dim])
- floatVecData = append(floatVecData, rowVec)
+ chunked := lo.Chunk(rows, dim)
+ chunkedRows := make([][dim]float32, len(chunked))
+ for i, innerSlice := range chunked {
+ copy(chunkedRows[i][:], innerSlice[:])
}
- data = floatVecData
+ data = chunkedRows
case schemapb.DataType_Float16Vector:
- vecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data
- if dim != insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim {
- panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim))
+ rows := insertData.Data[fieldID].GetRows().([]byte)
+ if dim != fieldData.(*storage.Float16VectorFieldData).Dim {
+ panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.Float16VectorFieldData).Dim))
}
const rowBytes = dim * 2
- rows := len(vecData) / rowBytes
- float16VecData := make([][rowBytes]byte, 0, rows)
- for i := 0; i < rows; i++ {
- rowVec := [rowBytes]byte{}
- copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
- float16VecData = append(float16VecData, rowVec)
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
+ for i, innerSlice := range chunked {
+ copy(chunkedRows[i][:], innerSlice[:])
}
- data = float16VecData
+ data = chunkedRows
case schemapb.DataType_BFloat16Vector:
- vecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data
- if dim != insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim {
- panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim))
+ rows := insertData.Data[fieldID].GetRows().([]byte)
+ if dim != fieldData.(*storage.BFloat16VectorFieldData).Dim {
+ panic(fmt.Sprintf("dim mis-match: %d, %d", dim, fieldData.(*storage.BFloat16VectorFieldData).Dim))
}
const rowBytes = dim * 2
- rows := len(vecData) / rowBytes
- bfloat16VecData := make([][rowBytes]byte, 0, rows)
- for i := 0; i < rows; i++ {
- rowVec := [rowBytes]byte{}
- copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes])
- bfloat16VecData = append(bfloat16VecData, rowVec)
+ chunked := lo.Chunk(rows, rowBytes)
+ chunkedRows := make([][rowBytes]byte, len(chunked))
+ for i, innerSlice := range chunked {
+ copy(chunkedRows[i][:], innerSlice[:])
}
- data = bfloat16VecData
+ data = chunkedRows
case schemapb.DataType_SparseFloatVector:
data = insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents()
case schemapb.DataType_JSON:
data = insertData.Data[fieldID].(*storage.JSONFieldData).Data
case schemapb.DataType_Array:
data = insertData.Data[fieldID].(*storage.ArrayFieldData).Data
default:
- panic(fmt.Sprintf("unsupported data type: %s", dType.String()))
+ data = insertData.Data[fieldID].GetRows()
}
}
err := writeFn(path, data)
@@ -206,38 +179,9 @@ func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSche
func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.CollectionSchema, count int) {
insertData, err := testutil.CreateInsertData(schema, count)
assert.NoError(t, err)
- rows := make([]map[string]any, 0, count)
- fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
- return field.GetFieldID()
- })
- for i := 0; i < count; i++ {
- data := make(map[int64]interface{})
- for fieldID, v := range insertData.Data {
- dataType := fieldIDToField[fieldID].GetDataType()
- if fieldIDToField[fieldID].GetAutoID() {
- continue
- }
- switch dataType {
- case schemapb.DataType_Array:
- data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData()
- case schemapb.DataType_JSON:
- data[fieldID] = string(v.GetRow(i).([]byte))
- case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector:
- bytes := v.GetRow(i).([]byte)
- ints := make([]int, 0, len(bytes))
- for _, b := range bytes {
- ints = append(ints, int(b))
- }
- data[fieldID] = ints
- default:
- data[fieldID] = v.GetRow(i)
- }
- }
- row := lo.MapKeys(data, func(_ any, fieldID int64) string {
- return fieldIDToField[fieldID].GetName()
- })
- rows = append(rows, row)
- }
+ rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
+ assert.NoError(t, err)
jsonBytes, err := json.Marshal(rows)
assert.NoError(t, err)


@@ -93,7 +93,7 @@ def gen_binary_vectors(nb, dim):
return vectors
- def gen_fp16_vectors(num, dim):
+ def gen_fp16_vectors(num, dim, for_json=False):
"""
generate float16 vector data
raw_vectors : the vectors
@@ -105,13 +105,16 @@ def gen_fp16_vectors(num, dim):
for _ in range(num):
raw_vector = [random.random() for _ in range(dim)]
raw_vectors.append(raw_vector)
- fp16_vector = np.array(raw_vector, dtype=np.float16).view(np.uint8).tolist()
+ if for_json:
+ fp16_vector = np.array(raw_vector, dtype=np.float16).tolist()
+ else:
+ fp16_vector = np.array(raw_vector, dtype=np.float16).view(np.uint8).tolist()
fp16_vectors.append(fp16_vector)
return raw_vectors, fp16_vectors
- def gen_bf16_vectors(num, dim):
+ def gen_bf16_vectors(num, dim, for_json=False):
"""
generate brain float16 vector data
raw_vectors : the vectors
@@ -123,7 +126,10 @@ def gen_bf16_vectors(num, dim):
for _ in range(num):
raw_vector = [random.random() for _ in range(dim)]
raw_vectors.append(raw_vector)
- bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).view(np.uint8).tolist()
+ if for_json:
+ bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).tolist()
+ else:
+ bf16_vector = np.array(jnp.array(raw_vector, dtype=jnp.bfloat16)).view(np.uint8).tolist()
bf16_vectors.append(bf16_vector)
return raw_vectors, bf16_vectors
@@ -603,9 +609,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
float_vector = False
d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
if "bf16" in data_field:
d[data_field] = gen_bf16_vectors(1, dim)[1][0]
d[data_field] = gen_bf16_vectors(1, dim, True)[1][0]
if "fp16" in data_field:
d[data_field] = gen_fp16_vectors(1, dim)[1][0]
d[data_field] = gen_fp16_vectors(1, dim, True)[1][0]
elif data_field == DataField.float_field:
d[data_field] = random.random()
elif data_field == DataField.double_field: