mirror of https://gitee.com/milvus-io/milvus.git, synced 2026-01-07 19:31:51 +08:00
feat: Add nullable vector support in import utility layer (#46142)
related: #45993

Add nullable vector support in the import utility layer.

Key changes:

ImportV2 util:
- Add the nullable vector types (FloatVector, Float16Vector, BFloat16Vector, BinaryVector, SparseFloatVector, Int8Vector) to AppendNullableDefaultFieldsData()
- Add tests for nullable vector field data appending

CSV/JSON/Numpy readers:
- Add a nullPercent parameter to test data generation for better null coverage
- Mark vector fields as nullable in test schemas
- Add test cases for nullable vector field parsing
- Refactor tests to a loop-based approach covering 0%, 50%, and 100% null percentages

Parquet field reader:
- Add ReadNullableBinaryData() for nullable BinaryVector/Float16Vector/BFloat16Vector
- Add ReadNullableFloatVectorData() for nullable FloatVector
- Add ReadNullableSparseFloatVectorData() for nullable SparseFloatVector
- Add ReadNullableInt8VectorData() for nullable Int8Vector
- Add ReadNullableStructData() for generic nullable struct data
- Update Next() to use the nullable read methods when a field is nullable
- Add null-data validation for non-nullable fields

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
- Core invariant: import must preserve per-row alignment and validity for every field. Nullable vector fields are encoded with per-row validity masks, and all readers/writers must emit arrays aligned to the original input rows, with null entries represented explicitly.
- New feature & scope: adds end-to-end nullable-vector support in the import utility layer. AppendNullableDefaultFieldsData in internal/datanode/importv2/util.go now appends nil placeholders for nullable vectors (FloatVector, Float16Vector, BFloat16Vector, BinaryVector, SparseFloatVector, Int8Vector); the parquet reader (internal/util/importutilv2/parquet/field_reader.go) adds ReadNullableBinaryData, ReadNullableFloatVectorData, ReadNullableSparseFloatVectorData, ReadNullableInt8VectorData, and ReadNullableStructData, and routes nullable branches to these helpers; CSV/JSON/Numpy readers and test utilities are updated to generate and validate 0/50/100% null scenarios and to mark vector fields as nullable in test schemas.
- Logic removed / simplified: eliminates the ad-hoc "parameter-invalid" rejections for nullable vectors inside FieldReader.Next by centralizing nullable handling in the ReadNullable* helpers and shared validators (getArrayDataNullable, checkNullableVectorAlignWithDim/checkNullableVectorAligned), simplifying control flow and removing scattered special-case checks.
- No data loss / no regression (concrete code paths): nulls are preserved end to end. AppendNullableDefaultFieldsData explicitly inserts nil entries per null row (datanode import append path); the ReadNullable*Data helpers return both the data and []bool validity masks, so callers in field_reader.go and downstream readers receive exact per-row validity; testutil.BuildSparseVectorData was extended to accept validData, so sparse vectors are materialized only for valid rows while null rows are represented as missing. These concrete paths ensure null rows are represented rather than dropped, preventing data loss or behavioral regression.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: marcelo-cjl <marcelo.chen@zilliz.com>
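The sparse-data-plus-validity-mask convention described above can be illustrated with a short sketch. This is a minimal illustration, not code from the patch: it assumes a dense float vector of dimension dim whose valid rows come back packed back to back alongside a full-length validity mask.

// Minimal sketch (not part of the patch) of the data/validity contract:
// `data` holds only the valid rows packed back to back, while `valid`
// has one entry per original input row.
func expandRows(data []float32, valid []bool, dim int) [][]float32 {
	rows := make([][]float32, 0, len(valid))
	next := 0 // index of the next packed vector inside data
	for _, ok := range valid {
		if !ok {
			rows = append(rows, nil) // null row kept as an explicit placeholder
			continue
		}
		rows = append(rows, data[next*dim:(next+1)*dim])
		next++
	}
	return rows
}

Because the mask keeps one entry per input row, no null row is silently dropped when the packed data is re-expanded.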
This commit is contained in:
parent dce44f2a20
commit 3c2cf2c066
@@ -374,6 +374,19 @@ func AppendNullableDefaultFieldsData(schema *schemapb.CollectionSchema, data *st
 			appender := &nullDefaultAppender[*schemapb.ScalarField]{}
 			err = appender.AppendNull(fieldData, rowNum)
 		}
+	case schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector:
+		if nullable {
+			for i := 0; i < rowNum; i++ {
+				if err = fieldData.AppendRow(nil); err != nil {
+					return err
+				}
+			}
+		}
 	default:
 		return fmt.Errorf("Unexpected data type: %d, cannot be filled with default value", dataType)
 	}
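The case added above relies on the field-data container accepting nil as a per-row null placeholder. As a rough sketch (using a hypothetical minimal interface, not the real storage.FieldData), the appended loop behaves like:

// rowAppender is a hypothetical minimal interface for illustration only;
// the real storage.FieldData in Milvus has a larger method set.
type rowAppender interface {
	AppendRow(row any) error
}

// appendNullRows mirrors the loop in the hunk above: one explicit nil
// placeholder per row keeps every field aligned to the input row count.
func appendNullRows(fd rowAppender, rowNum int) error {
	for i := 0; i < rowNum; i++ {
		if err := fd.AppendRow(nil); err != nil {
			return err
		}
	}
	return nil
}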
@@ -495,10 +495,52 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:     "float vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_FloatVector,
+			nullable: true,
+		},
+		{
+			name:     "float16 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_Float16Vector,
+			nullable: true,
+		},
+		{
+			name:     "bfloat16 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_BFloat16Vector,
+			nullable: true,
+		},
+		{
+			name:     "binary vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_BinaryVector,
+			nullable: true,
+		},
+		{
+			name:     "sparse float vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_SparseFloatVector,
+			nullable: true,
+		},
+		{
+			name:     "int8 vector is nullable",
+			fieldID:  200,
+			dataType: schemapb.DataType_Int8Vector,
+			nullable: true,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			schema := buildSchemaFn()
+			isVectorType := tt.dataType == schemapb.DataType_FloatVector ||
+				tt.dataType == schemapb.DataType_Float16Vector ||
+				tt.dataType == schemapb.DataType_BFloat16Vector ||
+				tt.dataType == schemapb.DataType_BinaryVector ||
+				tt.dataType == schemapb.DataType_SparseFloatVector ||
+				tt.dataType == schemapb.DataType_Int8Vector
 			fieldSchema := &schemapb.FieldSchema{
 				FieldID: tt.fieldID,
 				Name:    fmt.Sprintf("field_%d", tt.fieldID),
@@ -511,10 +553,12 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxCapacityKey, Value: "100"})
 			} else if tt.dataType == schemapb.DataType_VarChar {
 				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxLengthKey, Value: "100"})
+			} else if isVectorType && tt.dataType != schemapb.DataType_SparseFloatVector {
+				fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.DimKey, Value: "8"})
 			}

 			// create data without the new field
-			insertData, err := testutil.CreateInsertData(schema, count)
+			insertData, err := testutil.CreateInsertData(schema, count, 100)
 			assert.NoError(t, err)

 			// add new nullable/default field to the schema
@@ -524,7 +568,7 @@ func Test_AppendNullableDefaultFieldsData(t *testing.T) {
 			tempSchema := &schemapb.CollectionSchema{
 				Fields: []*schemapb.FieldSchema{fieldSchema},
 			}
-			tempData, err := testutil.CreateInsertData(tempSchema, 1)
+			tempData, err := testutil.CreateInsertData(tempSchema, 1, 100)
 			assert.NoError(t, err)
 			insertData.Data[fieldSchema.GetFieldID()] = tempData.Data[fieldSchema.GetFieldID()]
@@ -58,7 +58,7 @@ func (suite *ReaderSuite) SetupTest() {
 	suite.vecDataType = schemapb.DataType_FloatVector
 }

-func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
+func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) {
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
 			{
@@ -83,6 +83,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 					Value: "8",
 				},
 			},
+			Nullable: nullable,
 		},
 		{
 			FieldID: 102,
@@ -129,7 +130,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	nullkey := ""

 	// generate csv data
-	insertData, err := testutil.CreateInsertData(schema, suite.numRows)
+	insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent)
 	suite.NoError(err)
 	csvData, err := testutil.CreateInsertDataForCSV(schema, insertData, nullkey)
 	suite.NoError(err)
@@ -178,65 +179,60 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 }

 func (suite *ReaderSuite) TestReadScalarFields() {
-	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Float, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_Double, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_String, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, false)
-	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, false)
+	elementTypes := []schemapb.DataType{
+		schemapb.DataType_Bool,
+		schemapb.DataType_Int8,
+		schemapb.DataType_Int16,
+		schemapb.DataType_Int32,
+		schemapb.DataType_Int64,
+		schemapb.DataType_Float,
+		schemapb.DataType_Double,
+		schemapb.DataType_String,
+	}
+	scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Array}...)

-	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, false)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_String, false)
-
-	suite.run(schemapb.DataType_Bool, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int8, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int16, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Int64, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Float, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_Double, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_String, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_VarChar, schemapb.DataType_None, true)
-	suite.run(schemapb.DataType_JSON, schemapb.DataType_None, true)
-
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Bool, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int8, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int16, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int32, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Int64, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Float, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_Double, true)
-	suite.run(schemapb.DataType_Array, schemapb.DataType_String, true)
+	for _, dataType := range scalarTypes {
+		if dataType == schemapb.DataType_Array {
+			for _, elementType := range elementTypes {
+				suite.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					suite.run(dataType, elementType, true, nullPercent)
+				}
+			}
+		} else {
+			suite.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				suite.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
+		}
+	}
 }

 func (suite *ReaderSuite) TestStringPK() {
 	suite.pkDataType = schemapb.DataType_VarChar
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	for _, nullPercent := range []int{0, 50, 100} {
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_SparseFloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (suite *ReaderSuite) TestError() {
@@ -131,35 +131,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Name:       "float_vector",
 			DataType:   schemapb.DataType_FloatVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    22,
 			Name:       "bin_vector",
 			DataType:   schemapb.DataType_BinaryVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:  23,
 			Name:     "sparse_vector",
 			DataType: schemapb.DataType_SparseFloatVector,
+			Nullable: suite.hasNullable,
 		},
 		{
 			FieldID:    24,
 			Name:       "f16_vector",
 			DataType:   schemapb.DataType_Float16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    25,
 			Name:       "bf16_vector",
 			DataType:   schemapb.DataType_BFloat16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    26,
 			Name:       "int8_vector",
 			DataType:   schemapb.DataType_Int8Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID: 27,
@@ -648,6 +654,12 @@ func (suite *RowParserSuite) TestValid() {
 	suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", suite.nullKey)})
 	suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", suite.nullKey)})
+	suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", suite.nullKey)})

 	// Test struct array parsing
 	suite.runValid(&testCase{name: "A/N/D struct array valid", content: suite.genAllTypesRowData("x", "2")})
@@ -704,13 +716,6 @@ func (suite *RowParserSuite) TestParseError() {
 	suite.Error(err)
 	suite.Nil(parser)

-	// field value missed
-	content = suite.genAllTypesRowData("x", "2", "float_vector")
-	header, _ = suite.genRowContent(schema, content)
-	parser, err = NewRowParser(schema, header, suite.nullKey)
-	suite.Error(err)
-	suite.Nil(parser)
-
 	// function output no need provide
 	content = suite.genAllTypesRowData("x", "2")
 	content["function_sparse_vector"] = "{\"1\":0.5,\"10\":1.5,\"100\":2.5}"
@@ -56,7 +56,7 @@ func (suite *ReaderSuite) SetupTest() {
 	suite.vecDataType = schemapb.DataType_FloatVector
 }

-func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool) {
+func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType, nullable bool, nullPercent int) {
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
 			{
@@ -81,6 +81,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 					Value: "8",
 				},
 			},
+			Nullable: nullable,
 		},
 		{
 			FieldID: 102,
@@ -121,7 +122,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 		})
 	}

-	insertData, err := testutil.CreateInsertData(schema, suite.numRows)
+	insertData, err := testutil.CreateInsertData(schema, suite.numRows, nullPercent)
 	suite.NoError(err)

 	rows, err := testutil.CreateInsertDataRowsForJSON(schema, insertData)
@@ -261,12 +262,16 @@ func (suite *ReaderSuite) TestReadScalarFields() {
 	for _, dataType := range scalarTypes {
 		if dataType == schemapb.DataType_Array {
 			for _, elementType := range elementTypes {
-				suite.run(dataType, elementType, false)
-				suite.run(dataType, elementType, true)
+				suite.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					suite.run(dataType, elementType, true, nullPercent)
+				}
 			}
 		} else {
-			suite.run(dataType, schemapb.DataType_None, false)
-			suite.run(dataType, schemapb.DataType_None, true)
+			suite.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				suite.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
 		}
 	}
 }
@@ -291,22 +296,29 @@ func (suite *ReaderSuite) TestReadScalarFieldsWithDefaultValue() {

 func (suite *ReaderSuite) TestStringPK() {
 	suite.pkDataType = schemapb.DataType_VarChar
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	for _, nullPercent := range []int{0, 50, 100} {
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_SparseFloatVector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			suite.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (suite *ReaderSuite) TestDecodeError() {
@@ -110,35 +110,41 @@ func (suite *RowParserSuite) createAllTypesSchema() *schemapb.CollectionSchema {
 			Name:       "float_vector",
 			DataType:   schemapb.DataType_FloatVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    22,
 			Name:       "bin_vector",
 			DataType:   schemapb.DataType_BinaryVector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "16"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:  23,
 			Name:     "sparse_vector",
 			DataType: schemapb.DataType_SparseFloatVector,
+			Nullable: suite.hasNullable,
 		},
 		{
 			FieldID:    24,
 			Name:       "f16_vector",
 			DataType:   schemapb.DataType_Float16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    25,
 			Name:       "bf16_vector",
 			DataType:   schemapb.DataType_BFloat16Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID:    26,
 			Name:       "int8_vector",
 			DataType:   schemapb.DataType_Int8Vector,
 			TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "2"}},
+			Nullable:   suite.hasNullable,
 		},
 		{
 			FieldID: 27,
@@ -633,6 +639,12 @@ func (suite *RowParserSuite) TestValid() {
 	suite.runValid(&testCase{name: "A/N/D nullable field varchar is nil", content: suite.genAllTypesRowData("varchar", nil)})
 	suite.runValid(&testCase{name: "A/N/D nullable field json is nil", content: suite.genAllTypesRowData("json", nil)})
 	suite.runValid(&testCase{name: "A/N/D nullable field array_int8 is nil", content: suite.genAllTypesRowData("array_int8", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field float_vector is nil", content: suite.genAllTypesRowData("float_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bin_vector is nil", content: suite.genAllTypesRowData("bin_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field sparse_vector is nil", content: suite.genAllTypesRowData("sparse_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field f16_vector is nil", content: suite.genAllTypesRowData("f16_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field bf16_vector is nil", content: suite.genAllTypesRowData("bf16_vector", nil)})
+	suite.runValid(&testCase{name: "A/N/D nullable field int8_vector is nil", content: suite.genAllTypesRowData("int8_vector", nil)})

 	suite.setSchema(false, true, true)
 	suite.runValid(&testCase{name: "N/D valid parse", content: suite.genAllTypesRowData("x", 2)})
@@ -675,7 +687,6 @@ func (suite *RowParserSuite) TestParseError() {
 		{name: "not a JSON for dynamic", content: suite.genAllTypesRowData("$meta", []int{})},
 		{name: "exceeds max length varchar", content: suite.genAllTypesRowData("varchar", "aaaaaaaaaa")},
 		{name: "exceeds max capacity", content: suite.genAllTypesRowData("array_int8", []int{1, 2, 3, 4, 5})},
-		{name: "field value missed", content: suite.genAllTypesRowData("x", 2, "float_vector")},
 		{name: "type error bool", content: suite.genAllTypesRowData("bool", 0.2)},
 		{name: "type error int8", content: suite.genAllTypesRowData("int8", []int32{})},
 		{name: "type error int16", content: suite.genAllTypesRowData("int16", []int32{})},
@@ -433,18 +433,18 @@ func (suite *ReaderSuite) TestStringPK() {
 }

 func (suite *ReaderSuite) TestVector() {
-	suite.vecDataType = schemapb.DataType_BinaryVector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_FloatVector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_Float16Vector
-	suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_BFloat16Vector
-	suite.run(schemapb.DataType_Int32)
-	// suite.vecDataType = schemapb.DataType_SparseFloatVector
-	// suite.run(schemapb.DataType_Int32)
-	suite.vecDataType = schemapb.DataType_Int8Vector
-	suite.run(schemapb.DataType_Int32)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		suite.vecDataType = dataType
+		suite.run(schemapb.DataType_Int32)
+	}
 }

 func TestNumpyCreateReaders(t *testing.T) {
@@ -189,13 +189,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
 	case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
 		// vector not support default_value
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableBinaryData(c, count)
 		}
 		data, err := ReadBinaryData(c, count)
 		return data, nil, err
 	case schemapb.DataType_FloatVector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableFloatVectorData(c, count)
 		}
 		arrayData, err := ReadIntegerOrFloatArrayData[float32](c, count)
 		if err != nil {
@@ -208,13 +208,13 @@ func (c *FieldReader) Next(count int64) (any, any, error) {
 		return vectors, nil, nil
 	case schemapb.DataType_SparseFloatVector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableSparseFloatVectorData(c, count)
 		}
 		data, err := ReadSparseFloatVectorData(c, count)
 		return data, nil, err
 	case schemapb.DataType_Int8Vector:
 		if c.field.GetNullable() {
-			return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector")
+			return ReadNullableInt8VectorData(c, count)
 		}
 		arrayData, err := ReadIntegerOrFloatArrayData[int8](c, count)
 		if err != nil {
@@ -493,9 +493,6 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p
 // Value type of "indices" is array.List, element type is array.Uint32
 // Value type of "values" is array.List, element type is array.Float32
 // The length of the list is equal to the length of chunked.Chunks()
-//
-// Note: now the ReadStructData() is used by SparseVector type and SparseVector is not nullable,
-// create a new method ReadNullableStructData() if we have nullable struct type in future.
 func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
@@ -524,6 +521,40 @@ func ReadStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, er
 	return data, nil
 }

+func ReadNullableStructData(pcr *FieldReader, count int64) ([]map[string]arrow.Array, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]map[string]arrow.Array, 0, count)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		structReader, ok := chunk.(*array.Struct)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+		}
+
+		structType := structReader.DataType().(*arrow.StructType)
+		rows := structReader.Len()
+		// Sparse storage: only store valid rows' data
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !structReader.IsNull(i))
+			if !structReader.IsNull(i) {
+				st := make(map[string]arrow.Array)
+				for k, field := range structType.Fields() {
+					st[field.Name] = structReader.Field(k)
+				}
+				data = append(data, st)
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func ReadStringData(pcr *FieldReader, count int64, isVarcharField bool) (any, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
@@ -851,6 +882,61 @@ func ReadBinaryData(pcr *FieldReader, count int64) (any, error) {
 	return data, nil
 }

+func ReadNullableBinaryData(pcr *FieldReader, count int64) (any, []bool, error) {
+	dataType := pcr.field.GetDataType()
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]byte, 0, count)
+	validData := make([]bool, 0, count)
+
+	// Sparse storage: only store valid rows' data
+	for _, chunk := range chunked.Chunks() {
+		rows := chunk.Data().Len()
+		switch chunk.DataType().ID() {
+		case arrow.NULL:
+			for i := 0; i < rows; i++ {
+				validData = append(validData, false)
+			}
+		case arrow.BINARY:
+			binaryReader := chunk.(*array.Binary)
+			for i := 0; i < rows; i++ {
+				if binaryReader.IsNull(i) {
+					validData = append(validData, false)
+				} else {
+					data = append(data, binaryReader.Value(i)...)
+					validData = append(validData, true)
+				}
+			}
+		case arrow.LIST:
+			listReader := chunk.(*array.List)
+			if err = checkNullableVectorAligned(listReader.Offsets(), listReader, pcr.dim, dataType); err != nil {
+				return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+			}
+			uint8Reader, ok := listReader.ListValues().(*array.Uint8)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, listReader.ListValues().DataType().Name())
+			}
+			for i := 0; i < rows; i++ {
+				if listReader.IsNull(i) {
+					validData = append(validData, false)
+				} else {
+					start, end := listReader.ValueOffsets(i)
+					data = append(data, uint8Reader.Uint8Values()[start:end]...)
+					validData = append(validData, true)
+				}
+			}
+		default:
+			return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func parseSparseFloatRowVector(str string) ([]byte, uint32, error) {
 	rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str))
 	if err != nil {
@@ -1045,6 +1131,80 @@ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) {
 	}, nil
 }

+func ReadNullableSparseFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	if pcr.sparseIsString {
+		data, validData, err := ReadNullableStringData(pcr, count)
+		if err != nil {
+			return nil, nil, err
+		}
+		if data == nil {
+			return nil, nil, nil
+		}
+
+		// Sparse storage: only store valid rows' data
+		byteArr := make([][]byte, 0, count)
+		maxDim := uint32(0)
+
+		for i, str := range data.([]string) {
+			if validData[i] {
+				rowVec, rowMaxIdx, err := parseSparseFloatRowVector(str)
+				if err != nil {
+					return nil, nil, err
+				}
+				byteArr = append(byteArr, rowVec)
+				if rowMaxIdx > maxDim {
+					maxDim = rowMaxIdx
+				}
+			}
+		}
+
+		return &storage.SparseFloatVectorFieldData{
+			SparseFloatArray: schemapb.SparseFloatArray{
+				Dim:      int64(maxDim),
+				Contents: byteArr,
+			},
+			ValidData: validData,
+			Nullable:  true,
+		}, validData, nil
+	}
+
+	data, validData, err := ReadNullableStructData(pcr, count)
+	if err != nil {
+		return nil, nil, err
+	}
+	if data == nil {
+		return nil, nil, nil
+	}
+
+	// Sparse storage: only store valid rows' data
+	byteArr := make([][]byte, 0, count)
+	maxDim := uint32(0)
+
+	for i, structData := range data {
+		if validData[i] {
+			singleByteArr, singleMaxDim, err := parseSparseFloatVectorStructs([]map[string]arrow.Array{structData})
+			if err != nil {
+				return nil, nil, err
+			}
+			if len(singleByteArr) > 0 {
+				byteArr = append(byteArr, singleByteArr[0])
+				if singleMaxDim > maxDim {
+					maxDim = singleMaxDim
+				}
+			}
+		}
+	}
+
+	return &storage.SparseFloatVectorFieldData{
+		SparseFloatArray: schemapb.SparseFloatArray{
+			Dim:      int64(maxDim),
+			Contents: byteArr,
+		},
+		ValidData: validData,
+		Nullable:  true,
+	}, validData, nil
+}
+
 func checkVectorAlignWithDim(offsets []int32, dim int32) error {
 	for i := 1; i < len(offsets); i++ {
 		if offsets[i]-offsets[i-1] != dim {
@@ -1054,6 +1214,16 @@ func checkVectorAlignWithDim(offsets []int32, dim int32) error {
 	return nil
 }

+func checkNullableVectorAlignWithDim(offsets []int32, listReader *array.List, dim int32) error {
+	for i := 1; i < len(offsets); i++ {
+		length := offsets[i] - offsets[i-1]
+		if !listReader.IsNull(i-1) && length != dim {
+			return fmt.Errorf("expected %d but got %d", dim, length)
+		}
+	}
+	return nil
+}
+
 func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) error {
 	if len(offsets) < 1 {
 		return errors.New("empty offsets")
@@ -1075,6 +1245,26 @@ func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) er
 	}
 }

+func checkNullableVectorAligned(offsets []int32, listReader *array.List, dim int, dataType schemapb.DataType) error {
+	if len(offsets) < 1 {
+		return errors.New("empty offsets")
+	}
+	switch dataType {
+	case schemapb.DataType_BinaryVector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim/8))
+	case schemapb.DataType_FloatVector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim))
+	case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim*2))
+	case schemapb.DataType_SparseFloatVector:
+		return nil
+	case schemapb.DataType_Int8Vector:
+		return checkNullableVectorAlignWithDim(offsets, listReader, int32(dim))
+	default:
+		return fmt.Errorf("unexpected vector data type %s", dataType.String())
+	}
+}
+
 func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error {
 	for i := 1; i < len(offsets); i++ {
 		start, end := offsets[i-1], offsets[i]
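The dim arguments passed through checkNullableVectorAligned above encode how many Arrow list elements one row occupies for each vector type. A small worked sketch of that arithmetic (illustrative only, not part of the patch):

package main

import "fmt"

// elemsPerRow restates the per-type arithmetic used by
// checkNullableVectorAligned: the expected Arrow list length for one
// row of a vector with dimension dim.
func elemsPerRow(vectorType string, dim int) int {
	switch vectorType {
	case "BinaryVector":
		return dim / 8 // bits are packed; one uint8 covers 8 dimensions
	case "Float16Vector", "BFloat16Vector":
		return dim * 2 // stored as raw bytes; 2 bytes per dimension
	default:
		return dim // FloatVector and Int8Vector: one element per dimension
	}
}

func main() {
	for _, t := range []string{"BinaryVector", "FloatVector", "Float16Vector", "Int8Vector"} {
		fmt.Printf("%s, dim=128 -> %d list elements per row\n", t, elemsPerRow(t, 128))
	}
}

SparseFloatVector is exempt from the check because its rows have no fixed dimension.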
@@ -1092,6 +1282,24 @@ func getArrayData[T any](offsets []int32, getElement func(int) (T, error), outpu
 	return nil
 }

+func getArrayDataNullable[T any](offsets []int32, listReader *array.List, getElement func(int) (T, error), outputArray func(arr []T, valid bool)) error {
+	for i := 1; i < len(offsets); i++ {
+		isValid := !listReader.IsNull(i - 1)
+
+		start, end := offsets[i-1], offsets[i]
+		arrData := make([]T, 0, end-start)
+		for j := start; j < end; j++ {
+			elementVal, err := getElement(int(j))
+			if err != nil {
+				return err
+			}
+			arrData = append(arrData, elementVal)
+		}
+		outputArray(arrData, isValid)
+	}
+	return nil
+}
+
 func ReadBoolArrayData(pcr *FieldReader, count int64) (any, error) {
 	chunked, err := pcr.columnReader.NextBatch(count)
 	if err != nil {
@@ -1430,6 +1638,108 @@ func ReadNullableIntegerOrFloatArrayData[T constraints.Integer | constraints.Flo
 	return data, validData, nil
 }

+func ReadNullableFloatVectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]float32, 0, int(count)*pcr.dim)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		listReader, ok := chunk.(*array.List)
+		if !ok {
+			// the chunk type may be *array.Null if the data in chunk is all null
+			_, ok := chunk.(*array.Null)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			dataNums := chunk.Data().Len()
+			validData = append(validData, make([]bool, dataNums)...)
+			continue
+		}
+
+		dataType := pcr.field.GetDataType()
+		offsets := listReader.Offsets()
+		if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil {
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+		}
+
+		valueReader := listReader.ListValues()
+		rows := listReader.Len()
+
+		// Sparse storage: only store valid rows' data
+		float32Reader, ok := valueReader.(*array.Float32)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name())
+		}
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !listReader.IsNull(i))
+			if !listReader.IsNull(i) {
+				start, end := offsets[i], offsets[i+1]
+				for j := start; j < end; j++ {
+					data = append(data, float32Reader.Value(int(j)))
+				}
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, typeutil.VerifyFloats32(data)
+}
+
+func ReadNullableInt8VectorData(pcr *FieldReader, count int64) (any, []bool, error) {
+	chunked, err := pcr.columnReader.NextBatch(count)
+	if err != nil {
+		return nil, nil, err
+	}
+	data := make([]int8, 0, int(count)*pcr.dim)
+	validData := make([]bool, 0, count)
+
+	for _, chunk := range chunked.Chunks() {
+		listReader, ok := chunk.(*array.List)
+		if !ok {
+			// the chunk type may be *array.Null if the data in chunk is all null
+			_, ok := chunk.(*array.Null)
+			if !ok {
+				return nil, nil, WrapTypeErr(pcr.field, chunk.DataType().Name())
+			}
+			dataNums := chunk.Data().Len()
+			validData = append(validData, make([]bool, dataNums)...)
+			continue
+		}
+
+		dataType := pcr.field.GetDataType()
+		offsets := listReader.Offsets()
+		if err = checkNullableVectorAligned(offsets, listReader, pcr.dim, dataType); err != nil {
+			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("length of vector is not aligned: %s, data type: %s", err.Error(), dataType.String()))
+		}
+
+		valueReader := listReader.ListValues()
+		rows := listReader.Len()
+
+		// Sparse storage: only store valid rows' data
+		int8Reader, ok := valueReader.(*array.Int8)
+		if !ok {
+			return nil, nil, WrapTypeErr(pcr.field, valueReader.DataType().Name())
+		}
+		for i := 0; i < rows; i++ {
+			validData = append(validData, !listReader.IsNull(i))
+			if !listReader.IsNull(i) {
+				start, end := offsets[i], offsets[i+1]
+				for j := start; j < end; j++ {
+					data = append(data, int8Reader.Value(int(j)))
+				}
+			}
+		}
+	}
+	if len(data) == 0 && len(validData) == 0 {
+		return nil, nil, nil
+	}
+	return data, validData, nil
+}
+
 func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) {
 	maxLength, err := parameterutil.GetMaxLength(pcr.field)
 	if err != nil {
@@ -345,7 +345,7 @@ func TestParseSparseFloatVectorStructs(t *testing.T) {
 }

 func TestReadFieldData(t *testing.T) {
-	checkFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) {
+	checkFunc := func(t *testing.T, nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) {
 		fieldName := dataType.String()
 		if elementType != schemapb.DataType_None {
 			fieldName = fieldName + "_" + elementType.String()
@@ -357,7 +357,7 @@ func TestReadFieldData(t *testing.T) {
 			Name:        fieldName,
 			DataType:    dataType,
 			ElementType: elementType,
-			Nullable:    dataHasNull,
+			Nullable:    nullPercent != 0,
 			TypeParams: []*commonpb.KeyValuePair{
 				{
 					Key: "dim",
@@ -397,10 +397,6 @@ func TestReadFieldData(t *testing.T) {
 		assert.NoError(t, err)

 		rowCount := 5
-		nullPercent := 0
-		if dataHasNull {
-			nullPercent = 50
-		}
 		insertData, err := testutil.CreateInsertData(schema, rowCount, nullPercent)
 		assert.NoError(t, err)
 		columns, err := testutil.BuildArrayData(schema, insertData, false)
@@ -423,7 +419,7 @@ func TestReadFieldData(t *testing.T) {
 		defer reader.Close()

 		_, err = reader.Read()
-		if !readScehamIsNullable && dataHasNull {
+		if !readScehamIsNullable && nullPercent != 0 {
 			assert.Error(t, err)
 		} else {
 			assert.NoError(t, err)
@@ -432,17 +428,17 @@ func TestReadFieldData(t *testing.T) {

 	type testCase struct {
 		name                 string
-		dataHasNull          bool
+		nullPercent          int
 		readScehamIsNullable bool
 		dataType             schemapb.DataType
 		elementType          schemapb.DataType
 	}
-	buildCaseFunc := func(dataHasNull bool, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase {
-		name := fmt.Sprintf("dataHasNull='%v' schemaNullable='%v' dataType='%s' elementType='%s'",
-			dataHasNull, readScehamIsNullable, dataType, elementType)
+	buildCaseFunc := func(nullPercent int, readScehamIsNullable bool, dataType schemapb.DataType, elementType schemapb.DataType) *testCase {
+		name := fmt.Sprintf("nullPercent='%v' schemaNullable='%v' dataType='%s' elementType='%s'",
+			nullPercent, readScehamIsNullable, dataType, elementType)
 		return &testCase{
 			name:                 name,
-			dataHasNull:          dataHasNull,
+			nullPercent:          nullPercent,
 			readScehamIsNullable: readScehamIsNullable,
 			dataType:             dataType,
 			elementType:          elementType,
@@ -459,12 +455,19 @@ func TestReadFieldData(t *testing.T) {
 		schemapb.DataType_Float,
 		schemapb.DataType_Double,
 		schemapb.DataType_VarChar,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_Int8Vector,
 	}
 	for _, dataType := range nullableDataTypes {
-		cases = append(cases, buildCaseFunc(true, true, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(true, false, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None))
-		cases = append(cases, buildCaseFunc(false, true, dataType, schemapb.DataType_None))
+		for _, nullPercent := range []int{0, 50} {
+			for _, readScehamIsNullable := range []bool{true, false} {
+				cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, dataType, schemapb.DataType_None))
+			}
+		}
 	}

 	elementTypes := []schemapb.DataType{
@@ -478,28 +481,23 @@ func TestReadFieldData(t *testing.T) {
 		schemapb.DataType_VarChar,
 	}
 	for _, elementType := range elementTypes {
-		cases = append(cases, buildCaseFunc(true, true, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(true, false, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(false, true, schemapb.DataType_Array, elementType))
-		cases = append(cases, buildCaseFunc(false, false, schemapb.DataType_Array, elementType))
+		for _, nullPercent := range []int{0, 50} {
+			for _, readScehamIsNullable := range []bool{true, false} {
+				cases = append(cases, buildCaseFunc(nullPercent, readScehamIsNullable, schemapb.DataType_Array, elementType))
+			}
+		}
 	}

 	notNullableTypes := []schemapb.DataType{
 		schemapb.DataType_JSON,
-		schemapb.DataType_FloatVector,
-		schemapb.DataType_BinaryVector,
-		schemapb.DataType_SparseFloatVector,
-		schemapb.DataType_Float16Vector,
-		schemapb.DataType_BFloat16Vector,
-		schemapb.DataType_Int8Vector,
 	}
 	for _, dataType := range notNullableTypes {
-		cases = append(cases, buildCaseFunc(false, false, dataType, schemapb.DataType_None))
+		cases = append(cases, buildCaseFunc(0, false, dataType, schemapb.DataType_None))
 	}

 	for _, tt := range cases {
 		t.Run(tt.name, func(t *testing.T) {
-			checkFunc(tt.dataHasNull, tt.readScehamIsNullable, tt.dataType, tt.elementType)
+			checkFunc(t, tt.nullPercent, tt.readScehamIsNullable, tt.dataType, tt.elementType)
 		})
 	}
 }
@@ -659,8 +657,7 @@ func TestTypeMismatch(t *testing.T) {
 		cases = append(cases, buildCaseFunc(schemapb.DataType_Bool, schemapb.DataType_None, schemapb.DataType_Array, elementType, false))
 	}

-	notNullableTypes := []schemapb.DataType{
-		schemapb.DataType_JSON,
+	vectorTypes := []schemapb.DataType{
 		schemapb.DataType_FloatVector,
 		schemapb.DataType_BinaryVector,
 		schemapb.DataType_SparseFloatVector,
@@ -668,6 +665,18 @@ func TestTypeMismatch(t *testing.T) {
 		schemapb.DataType_BFloat16Vector,
 		schemapb.DataType_Int8Vector,
 	}
+	for _, dataType := range vectorTypes {
+		srcDataType := schemapb.DataType_Bool
+		cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, true))
+		cases = append(cases, buildCaseFunc(srcDataType, schemapb.DataType_None, dataType, schemapb.DataType_None, false))
+
+		cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, true))
+		cases = append(cases, buildCaseFunc(schemapb.DataType_Array, schemapb.DataType_Bool, dataType, schemapb.DataType_None, false))
+	}
+
+	notNullableTypes := []schemapb.DataType{
+		schemapb.DataType_JSON,
+	}
 	for _, dataType := range notNullableTypes {
 		srcDataType := schemapb.DataType_Bool
 		if dataType == schemapb.DataType_Bool {
@@ -450,8 +450,10 @@ func (s *ReaderSuite) runWithSparseVector(indicesType arrow.DataType, valuesType
 	builder.AppendValues(int64Data, validData)
 	arrowColumns = append(arrowColumns, builder.NewInt64Array())

-	contents := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData).GetContents()
-	arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type)
+	sparseFieldData := insertData.Data[schema.Fields[1].FieldID].(*storage.SparseFloatVectorFieldData)
+	contents := sparseFieldData.GetContents()
+	sparseValidData := sparseFieldData.ValidData
+	arr, err := testutil.BuildSparseVectorData(mem, contents, arrowFields[1].Type, sparseValidData)
 	assert.NoError(s.T(), err)
 	arrowColumns = append(arrowColumns, arr)
@@ -545,65 +547,34 @@ func (s *ReaderSuite) TestReadScalarFieldsWithDefaultValue() {
 }

 func (s *ReaderSuite) TestReadScalarFields() {
-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Double, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Geometry, schemapb.DataType_None, false, 0)
+	elementTypes := []schemapb.DataType{
+		schemapb.DataType_Bool,
+		schemapb.DataType_Int8,
+		schemapb.DataType_Int16,
+		schemapb.DataType_Int32,
+		schemapb.DataType_Int64,
+		schemapb.DataType_Float,
+		schemapb.DataType_Double,
+		schemapb.DataType_String,
+	}

-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, false, 0)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, false, 0)
+	scalarTypes := append(elementTypes, []schemapb.DataType{schemapb.DataType_VarChar, schemapb.DataType_JSON, schemapb.DataType_Geometry, schemapb.DataType_Array}...)

-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 50)
-
-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 50)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 50)
-
-	s.run(schemapb.DataType_Bool, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int8, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int16, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Int64, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Float, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_String, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_VarChar, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_JSON, schemapb.DataType_None, true, 100)
-	s.run(schemapb.DataType_Geometry, schemapb.DataType_None, true, 100)
-
-	s.run(schemapb.DataType_Array, schemapb.DataType_Bool, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int8, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int16, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int32, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Int64, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Float, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_Double, true, 100)
-	s.run(schemapb.DataType_Array, schemapb.DataType_String, true, 100)
+	for _, dataType := range scalarTypes {
+		if dataType == schemapb.DataType_Array {
+			for _, elementType := range elementTypes {
+				s.run(dataType, elementType, false, 0)
+				for _, nullPercent := range []int{0, 50, 100} {
+					s.run(dataType, elementType, true, nullPercent)
+				}
+			}
+		} else {
+			s.run(dataType, schemapb.DataType_None, false, 0)
+			for _, nullPercent := range []int{0, 50, 100} {
+				s.run(dataType, schemapb.DataType_None, true, nullPercent)
+			}
+		}
+	}

 	s.failRun(schemapb.DataType_JSON, true)
 }
@@ -611,24 +582,28 @@ func (s *ReaderSuite) TestReadScalarFields() {
 func (s *ReaderSuite) TestStringPK() {
 	s.pkDataType = schemapb.DataType_VarChar
 	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 50)
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, 100)
+	for _, nullPercent := range []int{0, 50, 100} {
+		s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+	}
 }

 func (s *ReaderSuite) TestVector() {
-	s.vecDataType = schemapb.DataType_BinaryVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_FloatVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_Float16Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_BFloat16Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	// this test case only test parsing sparse vector from JSON-format string
-	s.vecDataType = schemapb.DataType_SparseFloatVector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
-	s.vecDataType = schemapb.DataType_Int8Vector
-	s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+	dataTypes := []schemapb.DataType{
+		schemapb.DataType_BinaryVector,
+		schemapb.DataType_FloatVector,
+		schemapb.DataType_Float16Vector,
+		schemapb.DataType_BFloat16Vector,
+		schemapb.DataType_SparseFloatVector,
+		schemapb.DataType_Int8Vector,
+	}
+
+	for _, dataType := range dataTypes {
+		s.vecDataType = dataType
+		s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0)
+		for _, nullPercent := range []int{0, 50, 100} {
+			s.run(schemapb.DataType_Int32, schemapb.DataType_None, true, nullPercent)
+		}
+	}
 }

 func (s *ReaderSuite) TestSparseVector() {
@ -110,11 +110,76 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
    // Pre-generate validData for nullable fields to determine sparse storage size
    validDataMap := make(map[int64][]bool)
    allFields := typeutil.GetAllFieldSchemas(schema)
    for _, f := range allFields {
        if f.GetAutoID() || f.IsFunctionOutput {
            continue
        }
        if f.GetNullable() {
            if len(nullPercent) > 1 {
                return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong")
            }
            var validData []bool
            if len(nullPercent) == 0 || nullPercent[0] == 50 {
                validData = testutils.GenerateBoolArray(rows)
            } else if len(nullPercent) == 1 && nullPercent[0] == 100 {
                validData = make([]bool, rows)
            } else if len(nullPercent) == 1 && nullPercent[0] == 0 {
                validData = make([]bool, rows)
                for i := range validData {
                    validData[i] = true
                }
            } else {
                return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent))
            }
            validDataMap[f.FieldID] = validData
        }
    }

    // Helper function to check if a type is a vector type (uses sparse storage)
    isVectorType := func(dataType schemapb.DataType) bool {
        switch dataType {
        case schemapb.DataType_BinaryVector,
            schemapb.DataType_FloatVector,
            schemapb.DataType_Float16Vector,
            schemapb.DataType_BFloat16Vector,
            schemapb.DataType_SparseFloatVector,
            schemapb.DataType_Int8Vector:
            return true
        default:
            return false
        }
    }

    // Helper function to count valid rows
    countValidRows := func(validData []bool) int {
        if len(validData) == 0 {
            return rows
        }
        count := 0
        for _, v := range validData {
            if v {
                count++
            }
        }
        return count
    }

    for _, f := range allFields {
        if f.GetAutoID() || f.IsFunctionOutput {
            continue
        }
        validData := validDataMap[f.FieldID]
        // Vector types use sparse storage (only valid rows), scalar types use dense storage (all rows)
        var dataRows int
        if isVectorType(f.GetDataType()) {
            dataRows = countValidRows(validData)
        } else {
            dataRows = rows
        }

        switch f.GetDataType() {
        case schemapb.DataType_Bool:
            insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateBoolArray(rows))
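The hunk above front-loads validity generation: a nullPercent of 0, 50, or 100 deterministically shapes each field's mask, and the count of true entries later sizes the sparse vector buffers. A standalone sketch of that mapping (illustrative names, not the utility's API; the real 50% path uses testutils.GenerateBoolArray's random mask):

```go
// buildValidity is a hypothetical illustration of how a null percentage
// maps to a per-row validity mask: true marks a valid (non-null) row.
func buildValidity(rows, nullPercent int) []bool {
	valid := make([]bool, rows)
	for i := range valid {
		switch nullPercent {
		case 0:
			valid[i] = true // every row valid
		case 100:
			valid[i] = false // every row null
		default:
			valid[i] = i%2 == 0 // roughly half valid; stand-in for a random mask
		}
	}
	return valid
}

// validCount sizes the sparse (valid-rows-only) vector buffer.
func validCount(valid []bool) int {
	n := 0
	for _, v := range valid {
		if v {
			n++
		}
	}
	return n
}
```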
@ -135,54 +200,47 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
            if err != nil {
                return nil, err
            }
            insertData.Data[f.FieldID] = &storage.BinaryVectorFieldData{
                Data: testutils.GenerateBinaryVectors(rows, int(dim)),
                Dim:  int(dim),
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Data = testutils.GenerateBinaryVectors(dataRows, int(dim))
            insertData.Data[f.FieldID].(*storage.BinaryVectorFieldData).Dim = int(dim)
        case schemapb.DataType_FloatVector:
            dim, err := typeutil.GetDim(f)
            if err != nil {
                return nil, err
            }
            insertData.Data[f.GetFieldID()] = &storage.FloatVectorFieldData{
                Data: testutils.GenerateFloatVectors(rows, int(dim)),
                Dim:  int(dim),
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Data = testutils.GenerateFloatVectors(dataRows, int(dim))
            insertData.Data[f.GetFieldID()].(*storage.FloatVectorFieldData).Dim = int(dim)
        case schemapb.DataType_Float16Vector:
            dim, err := typeutil.GetDim(f)
            if err != nil {
                return nil, err
            }
            insertData.Data[f.FieldID] = &storage.Float16VectorFieldData{
                Data: testutils.GenerateFloat16Vectors(rows, int(dim)),
                Dim:  int(dim),
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Data = testutils.GenerateFloat16Vectors(dataRows, int(dim))
            insertData.Data[f.FieldID].(*storage.Float16VectorFieldData).Dim = int(dim)
        case schemapb.DataType_BFloat16Vector:
            dim, err := typeutil.GetDim(f)
            if err != nil {
                return nil, err
            }
            insertData.Data[f.FieldID] = &storage.BFloat16VectorFieldData{
                Data: testutils.GenerateBFloat16Vectors(rows, int(dim)),
                Dim:  int(dim),
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Data = testutils.GenerateBFloat16Vectors(dataRows, int(dim))
            insertData.Data[f.FieldID].(*storage.BFloat16VectorFieldData).Dim = int(dim)
        case schemapb.DataType_SparseFloatVector:
            data, dim := testutils.GenerateSparseFloatVectorsData(rows)
            insertData.Data[f.FieldID] = &storage.SparseFloatVectorFieldData{
                SparseFloatArray: schemapb.SparseFloatArray{
                    Contents: data,
                    Dim:      dim,
                },
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            data, dim := testutils.GenerateSparseFloatVectorsData(dataRows)
            sparseData := insertData.Data[f.FieldID].(*storage.SparseFloatVectorFieldData)
            sparseData.Contents = data
            sparseData.Dim = dim
        case schemapb.DataType_Int8Vector:
            dim, err := typeutil.GetDim(f)
            if err != nil {
                return nil, err
            }
            insertData.Data[f.FieldID] = &storage.Int8VectorFieldData{
                Data: testutils.GenerateInt8Vectors(rows, int(dim)),
                Dim:  int(dim),
            }
            // For nullable vectors, use sparse storage (only generate valid rows)
            insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Data = testutils.GenerateInt8Vectors(dataRows, int(dim))
            insertData.Data[f.FieldID].(*storage.Int8VectorFieldData).Dim = int(dim)
        case schemapb.DataType_String, schemapb.DataType_VarChar:
            insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateStringArray(rows))
        case schemapb.DataType_JSON:

@ -220,23 +278,10 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
        default:
            panic(fmt.Sprintf("unsupported data type: %s", f.GetDataType().String()))
        }
        // Apply pre-generated validData for nullable fields
        if f.GetNullable() {
            if len(nullPercent) > 1 {
                return nil, merr.WrapErrParameterInvalidMsg("the length of nullPercent is wrong")
            }
            if len(nullPercent) == 0 || nullPercent[0] == 50 {
                insertData.Data[f.FieldID].AppendValidDataRows(testutils.GenerateBoolArray(rows))
            } else if len(nullPercent) == 1 && nullPercent[0] == 100 {
                insertData.Data[f.FieldID].AppendValidDataRows(make([]bool, rows))
            } else if len(nullPercent) == 1 && nullPercent[0] == 0 {
                validData := make([]bool, rows)
                for i := range validData {
                    validData[i] = true
                }
                insertData.Data[f.FieldID].AppendValidDataRows(validData)
            } else {
                return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("not support the number of nullPercent(%d)", nullPercent))
            }
            validData := validDataMap[f.FieldID]
            insertData.Data[f.FieldID].AppendValidDataRows(validData)
        }
    }
    return insertData, nil
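The net effect of this hunk: scalar fields stay dense (rows values plus a mask), vector fields become sparse (dataRows = valid rows only), and AppendValidDataRows attaches the same pre-generated mask either way. A rough mini-model of the two layouts (hypothetical types, not the storage package's):

```go
// Hypothetical mini-model of dense vs. sparse storage for nullable fields.
type denseScalarField struct {
	values []int64 // one slot per logical row; placeholders fill null rows
	valid  []bool  // len(valid) == len(values) == logical rows
}

type sparseVectorField struct {
	flat  []float32 // dim values per VALID row only
	dim   int
	valid []bool // len(valid) == logical rows; trues == len(flat)/dim
}

// alignedRows returns the logical row count both layouts must agree on,
// which is the per-row alignment invariant the import path preserves.
func alignedRows(d denseScalarField, v sparseVectorField) int {
	if len(d.valid) != len(v.valid) {
		panic("misaligned nullable fields")
	}
	return len(d.valid)
}
```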
@ -304,25 +349,34 @@ func CreateFieldWithDefaultValue(dataType schemapb.DataType, id int64, nullable
    return field, nil
}

func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType) (arrow.Array, error) {
func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType arrow.DataType, validData []bool) (arrow.Array, error) {
    if arrowType == nil || arrowType.ID() == arrow.STRING {
        // build sparse vector as JSON-format string
        builder := array.NewStringBuilder(mem)
        rows := len(contents)
        jsonBytesData := make([][]byte, 0)
        for i := 0; i < rows; i++ {
            rowVecData := contents[i]
            mapData := typeutil.SparseFloatBytesToMap(rowVecData)
            // convert to JSON format
            jsonBytes, err := json.Marshal(mapData)
            if err != nil {
                return nil, err
            }
            jsonBytesData = append(jsonBytesData, jsonBytes)
        // For sparse storage: iterate over logical rows, use physical index for contents
        var logicalRows int
        if len(validData) > 0 {
            logicalRows = len(validData)
        } else {
            logicalRows = len(contents)
        }
        physicalIdx := 0
        for i := 0; i < logicalRows; i++ {
            isValid := len(validData) == 0 || validData[i]
            if isValid {
                rowVecData := contents[physicalIdx]
                mapData := typeutil.SparseFloatBytesToMap(rowVecData)
                // convert to JSON format
                jsonBytes, err := json.Marshal(mapData)
                if err != nil {
                    return nil, err
                }
                builder.Append(string(jsonBytes))
                physicalIdx++
            } else {
                builder.AppendNull()
            }
        }
        builder.AppendValues(lo.Map(jsonBytesData, func(bs []byte, _ int) string {
            return string(bs)
        }), nil)
        return builder.NewStringArray(), nil
    } else if arrowType.ID() == arrow.STRUCT {
        // build sparse vector as parquet struct
@ -399,15 +453,27 @@ func BuildSparseVectorData(mem *memory.GoAllocator, contents [][]byte, arrowType
            return nil, merr.WrapErrImportFailed(msg)
        }

        for i := 0; i < len(contents); i++ {
            builder.Append(true)
            indicesBuilder.Append(true)
            valuesBuilder.Append(true)
            rowVecData := contents[i]
            elemCount := len(rowVecData) / 8
            for j := 0; j < elemCount; j++ {
                appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:]))
                appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:])))
        // For sparse storage: iterate over logical rows, use physical index for contents
        var logicalRows int
        if len(validData) > 0 {
            logicalRows = len(validData)
        } else {
            logicalRows = len(contents)
        }
        physicalIdx := 0
        for i := 0; i < logicalRows; i++ {
            isValid := len(validData) == 0 || validData[i]
            builder.Append(isValid)
            indicesBuilder.Append(isValid)
            valuesBuilder.Append(isValid)
            if isValid {
                rowVecData := contents[physicalIdx]
                elemCount := len(rowVecData) / 8
                for j := 0; j < elemCount; j++ {
                    appendIndexFunc(common.Endian.Uint32(rowVecData[j*8:]))
                    appendValueFunc(math.Float32frombits(common.Endian.Uint32(rowVecData[j*8+4:])))
                }
                physicalIdx++
            }
        }
        return builder.NewStructArray(), nil
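Both branches now share one contract: the builder receives one entry per logical row, while contents carries data for valid rows only. A hedged usage sketch of the new signature (the Arrow import path/version is assumed, buildRow is a hypothetical encoder mirroring the little-endian index/value layout decoded above, and BuildSparseVectorData is the function from this diff, assumed in scope):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"

	"github.com/apache/arrow/go/v17/arrow/memory"
)

// buildRow encodes one sparse row as (uint32 index, float32 value) pairs,
// matching the 8-bytes-per-element layout the struct branch decodes.
func buildRow(pairs map[uint32]float32) []byte {
	buf := make([]byte, 0, len(pairs)*8)
	for idx, val := range pairs {
		buf = binary.LittleEndian.AppendUint32(buf, idx)
		buf = binary.LittleEndian.AppendUint32(buf, math.Float32bits(val))
	}
	return buf
}

func main() {
	mem := memory.NewGoAllocator()
	contents := [][]byte{ // two VALID rows only
		buildRow(map[uint32]float32{1: 0.5, 7: 1.5}),
		buildRow(map[uint32]float32{3: 2.0}),
	}
	validData := []bool{true, false, true} // three logical rows, middle one null
	arr, err := BuildSparseVectorData(mem, contents, nil, validData) // nil type => JSON-string column
	if err != nil {
		panic(err)
	}
	fmt.Println(arr.Len(), arr.IsNull(1)) // expect: 3 true — the null row stays in place
}
```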
@ -490,82 +556,183 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
            columns = append(columns, builder.NewStringArray())
        case schemapb.DataType_BinaryVector:
            builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
            valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
            dim := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim
            binVecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data
            validData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).ValidData
            rowBytes := dim / 8
            rows := len(binVecData) / rowBytes
            offsets := make([]int32, 0, rows)
            valid := make([]bool, 0)
            for i := 0; i < rows; i++ {
                offsets = append(offsets, int32(i*rowBytes))
                valid = append(valid, true)
            // For sparse storage: logicalRows from validData, physicalRows from data
            var logicalRows int
            if len(validData) > 0 {
                logicalRows = len(validData)
            } else {
                logicalRows = len(binVecData) / rowBytes
            }
            builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil)
            offsets := make([]int32, 0, logicalRows+1)
            valid := make([]bool, 0, logicalRows)
            currOffset := int32(0)
            physicalIdx := 0 // Track physical index in sparse data
            for i := 0; i < logicalRows; i++ {
                offsets = append(offsets, currOffset)
                if len(validData) > 0 && !validData[i] {
                    valid = append(valid, false)
                } else {
                    // Use physical index for sparse storage
                    start := physicalIdx * rowBytes
                    end := start + rowBytes
                    valueBuilder.AppendValues(binVecData[start:end], nil)
                    currOffset += int32(rowBytes)
                    valid = append(valid, true)
                    physicalIdx++ // Increment only for valid rows
                }
            }
            offsets = append(offsets, currOffset)
            builder.AppendValues(offsets, valid)
            columns = append(columns, builder.NewListArray())
        case schemapb.DataType_FloatVector:
            builder := array.NewListBuilder(mem, &arrow.Float32Type{})
            valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
            dim := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim
            floatVecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data
            rows := len(floatVecData) / dim
            offsets := make([]int32, 0, rows)
            valid := make([]bool, 0, rows)
            for i := 0; i < rows; i++ {
                offsets = append(offsets, int32(i*dim))
                valid = append(valid, true)
            validData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).ValidData
            // For sparse storage: logicalRows from validData, physicalRows from data
            var logicalRows int
            if len(validData) > 0 {
                logicalRows = len(validData)
            } else {
                logicalRows = len(floatVecData) / dim
            }
            builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil)
            offsets := make([]int32, 0, logicalRows+1)
            valid := make([]bool, 0, logicalRows)
            currOffset := int32(0)
            physicalIdx := 0 // Track physical index in sparse data
            for i := 0; i < logicalRows; i++ {
                offsets = append(offsets, currOffset)
                if len(validData) > 0 && !validData[i] {
                    valid = append(valid, false)
                } else {
                    // Use physical index for sparse storage
                    start := physicalIdx * dim
                    end := start + dim
                    valueBuilder.AppendValues(floatVecData[start:end], nil)
                    currOffset += int32(dim)
                    valid = append(valid, true)
                    physicalIdx++ // Increment only for valid rows
                }
            }
            offsets = append(offsets, currOffset)
            builder.AppendValues(offsets, valid)
            columns = append(columns, builder.NewListArray())
        case schemapb.DataType_Float16Vector:
            builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
            valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
            dim := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim
            float16VecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data
            validData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).ValidData
            rowBytes := dim * 2
            rows := len(float16VecData) / rowBytes
            offsets := make([]int32, 0, rows)
            valid := make([]bool, 0, rows)
            for i := 0; i < rows; i++ {
                offsets = append(offsets, int32(i*rowBytes))
                valid = append(valid, true)
            // For sparse storage: logicalRows from validData, physicalRows from data
            var logicalRows int
            if len(validData) > 0 {
                logicalRows = len(validData)
            } else {
                logicalRows = len(float16VecData) / rowBytes
            }
            builder.ValueBuilder().(*array.Uint8Builder).AppendValues(float16VecData, nil)
            offsets := make([]int32, 0, logicalRows+1)
            valid := make([]bool, 0, logicalRows)
            currOffset := int32(0)
            physicalIdx := 0 // Track physical index in sparse data
            for i := 0; i < logicalRows; i++ {
                offsets = append(offsets, currOffset)
                if len(validData) > 0 && !validData[i] {
                    valid = append(valid, false)
                } else {
                    // Use physical index for sparse storage
                    start := physicalIdx * rowBytes
                    end := start + rowBytes
                    valueBuilder.AppendValues(float16VecData[start:end], nil)
                    currOffset += int32(rowBytes)
                    valid = append(valid, true)
                    physicalIdx++ // Increment only for valid rows
                }
            }
            offsets = append(offsets, currOffset)
            builder.AppendValues(offsets, valid)
            columns = append(columns, builder.NewListArray())
        case schemapb.DataType_BFloat16Vector:
            builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
            valueBuilder := builder.ValueBuilder().(*array.Uint8Builder)
            dim := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim
            bfloat16VecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data
            validData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).ValidData
            rowBytes := dim * 2
            rows := len(bfloat16VecData) / rowBytes
            offsets := make([]int32, 0, rows)
            valid := make([]bool, 0, rows)
            for i := 0; i < rows; i++ {
                offsets = append(offsets, int32(i*rowBytes))
                valid = append(valid, true)
            // For sparse storage: logicalRows from validData, physicalRows from data
            var logicalRows int
            if len(validData) > 0 {
                logicalRows = len(validData)
            } else {
                logicalRows = len(bfloat16VecData) / rowBytes
            }
            builder.ValueBuilder().(*array.Uint8Builder).AppendValues(bfloat16VecData, nil)
            offsets := make([]int32, 0, logicalRows+1)
            valid := make([]bool, 0, logicalRows)
            currOffset := int32(0)
            physicalIdx := 0 // Track physical index in sparse data
            for i := 0; i < logicalRows; i++ {
                offsets = append(offsets, currOffset)
                if len(validData) > 0 && !validData[i] {
                    valid = append(valid, false)
                } else {
                    // Use physical index for sparse storage
                    start := physicalIdx * rowBytes
                    end := start + rowBytes
                    valueBuilder.AppendValues(bfloat16VecData[start:end], nil)
                    currOffset += int32(rowBytes)
                    valid = append(valid, true)
                    physicalIdx++ // Increment only for valid rows
                }
            }
            offsets = append(offsets, currOffset)
            builder.AppendValues(offsets, valid)
            columns = append(columns, builder.NewListArray())
        case schemapb.DataType_SparseFloatVector:
            contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents()
            arr, err := BuildSparseVectorData(mem, contents, nil)
            validData := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).ValidData
            arr, err := BuildSparseVectorData(mem, contents, nil, validData)
            if err != nil {
                return nil, err
            }
            columns = append(columns, arr)
        case schemapb.DataType_Int8Vector:
            builder := array.NewListBuilder(mem, &arrow.Int8Type{})
            valueBuilder := builder.ValueBuilder().(*array.Int8Builder)
            dim := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Dim
            int8VecData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Data
            rows := len(int8VecData) / dim
            offsets := make([]int32, 0, rows)
            valid := make([]bool, 0, rows)
            for i := 0; i < rows; i++ {
                offsets = append(offsets, int32(i*dim))
                valid = append(valid, true)
            validData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).ValidData
            // For sparse storage: logicalRows from validData, physicalRows from data
            var logicalRows int
            if len(validData) > 0 {
                logicalRows = len(validData)
            } else {
                logicalRows = len(int8VecData) / dim
            }
            builder.ValueBuilder().(*array.Int8Builder).AppendValues(int8VecData, nil)
            offsets := make([]int32, 0, logicalRows+1)
            valid := make([]bool, 0, logicalRows)
            currOffset := int32(0)
            physicalIdx := 0 // Track physical index in sparse data
            for i := 0; i < logicalRows; i++ {
                offsets = append(offsets, currOffset)
                if len(validData) > 0 && !validData[i] {
                    valid = append(valid, false)
                } else {
                    // Use physical index for sparse storage
                    start := physicalIdx * dim
                    end := start + dim
                    valueBuilder.AppendValues(int8VecData[start:end], nil)
                    currOffset += int32(dim)
                    valid = append(valid, true)
                    physicalIdx++ // Increment only for valid rows
                }
            }
            offsets = append(offsets, currOffset)
            builder.AppendValues(offsets, valid)
            columns = append(columns, builder.NewListArray())
        case schemapb.DataType_JSON:
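Every fixed-dim vector case in this hunk applies the same recipe: walk logical rows, record a running offset for each, append child values and advance the physical cursor only for valid rows, then close with the final offset. A condensed, self-contained sketch of the same idea for float vectors, using the list builder's Append(bool) instead of precomputed offsets (Arrow import path/version assumed):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
)

// sparseToArrowList converts sparse vector storage (dim floats per VALID
// row) plus a per-logical-row validity mask into an Arrow list column.
func sparseToArrowList(data []float32, dim int, valid []bool) arrow.Array {
	mem := memory.NewGoAllocator()
	builder := array.NewListBuilder(mem, &arrow.Float32Type{})
	defer builder.Release()
	valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
	physicalIdx := 0
	for _, ok := range valid {
		builder.Append(ok) // false emits a null list entry, no child values
		if ok {
			valueBuilder.AppendValues(data[physicalIdx*dim:(physicalIdx+1)*dim], nil)
			physicalIdx++ // advance through sparse data only on valid rows
		}
	}
	return builder.NewListArray()
}

func main() {
	// Two valid rows of dim 2, one null row in between (three logical rows).
	arr := sparseToArrowList([]float32{1, 2, 3, 4}, 2, []bool{true, false, true})
	fmt.Println(arr.Len(), arr.IsNull(1)) // expect: 3 true
}
```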