fix: Fix a crash issue of bulkinsert (#40331)

issue: https://github.com/milvus-io/milvus/issues/40291
pr: https://github.com/milvus-io/milvus/pull/40304

Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
groot 2025-03-14 18:14:07 +08:00 committed by GitHub
parent 6249335859
commit 9fbfcda48e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 10 deletions

View File

@ -272,7 +272,7 @@ func ReadNullableBoolData(pcr *FieldReader, count int64) (any, []bool, error) {
validData = append(validData, make([]bool, dataNums)...) validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]bool, dataNums)...) data = append(data, make([]bool, dataNums)...)
} else { } else {
validData = append(validData, bytesToBoolArray(dataNums, boolReader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, boolReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, boolReader.Value(i)) data = append(data, boolReader.Value(i))
} }
@ -370,37 +370,37 @@ func ReadNullableIntegerOrFloatData[T constraints.Integer | constraints.Float](p
switch chunk.DataType().ID() { switch chunk.DataType().ID() {
case arrow.INT8: case arrow.INT8:
int8Reader := chunk.(*array.Int8) int8Reader := chunk.(*array.Int8)
validData = append(validData, bytesToBoolArray(dataNums, int8Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, int8Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(int8Reader.Value(i))) data = append(data, T(int8Reader.Value(i)))
} }
case arrow.INT16: case arrow.INT16:
int16Reader := chunk.(*array.Int16) int16Reader := chunk.(*array.Int16)
validData = append(validData, bytesToBoolArray(dataNums, int16Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, int16Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(int16Reader.Value(i))) data = append(data, T(int16Reader.Value(i)))
} }
case arrow.INT32: case arrow.INT32:
int32Reader := chunk.(*array.Int32) int32Reader := chunk.(*array.Int32)
validData = append(validData, bytesToBoolArray(dataNums, int32Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, int32Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(int32Reader.Value(i))) data = append(data, T(int32Reader.Value(i)))
} }
case arrow.INT64: case arrow.INT64:
int64Reader := chunk.(*array.Int64) int64Reader := chunk.(*array.Int64)
validData = append(validData, bytesToBoolArray(dataNums, int64Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, int64Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(int64Reader.Value(i))) data = append(data, T(int64Reader.Value(i)))
} }
case arrow.FLOAT32: case arrow.FLOAT32:
float32Reader := chunk.(*array.Float32) float32Reader := chunk.(*array.Float32)
validData = append(validData, bytesToBoolArray(dataNums, float32Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, float32Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(float32Reader.Value(i))) data = append(data, T(float32Reader.Value(i)))
} }
case arrow.FLOAT64: case arrow.FLOAT64:
float64Reader := chunk.(*array.Float64) float64Reader := chunk.(*array.Float64)
validData = append(validData, bytesToBoolArray(dataNums, float64Reader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, float64Reader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
data = append(data, T(float64Reader.Value(i))) data = append(data, T(float64Reader.Value(i)))
} }
@ -473,7 +473,7 @@ func ReadNullableStringData(pcr *FieldReader, count int64) (any, []bool, error)
validData = append(validData, make([]bool, dataNums)...) validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]string, dataNums)...) data = append(data, make([]string, dataNums)...)
} else { } else {
validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, stringReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
if stringReader.IsNull(i) { if stringReader.IsNull(i) {
data = append(data, "") data = append(data, "")
@ -547,7 +547,7 @@ func ReadNullableVarcharData(pcr *FieldReader, count int64) (any, []bool, error)
validData = append(validData, make([]bool, dataNums)...) validData = append(validData, make([]bool, dataNums)...)
data = append(data, make([]string, dataNums)...) data = append(data, make([]string, dataNums)...)
} else { } else {
validData = append(validData, bytesToBoolArray(dataNums, stringReader.NullBitmapBytes())...) validData = append(validData, bytesToValidData(dataNums, stringReader.NullBitmapBytes())...)
for i := 0; i < dataNums; i++ { for i := 0; i < dataNums; i++ {
if stringReader.IsNull(i) { if stringReader.IsNull(i) {
data = append(data, "") data = append(data, "")

View File

@ -272,9 +272,20 @@ func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) e
} }
// todo(smellthemoon): use byte to store valid_data // todo(smellthemoon): use byte to store valid_data
func bytesToBoolArray(length int, bytes []byte) []bool { func bytesToValidData(length int, bytes []byte) []bool {
bools := make([]bool, 0, length) bools := make([]bool, 0, length)
if len(bytes) == 0 {
// A parquet field is declared either "optional" or "required".
// For a "required" field, the arrow array's NullBitmapBytes() returns an empty byte slice,
// which means every element is valid. In that case we simply build an all-true bool slice.
for i := 0; i < length; i++ {
bools = append(bools, true)
}
return bools
}
// For an "optional" field, the arrow array's NullBitmapBytes() returns a non-empty byte slice
// in which each bit indicates whether the corresponding element is present (1) or null (0).
for i := 0; i < length; i++ { for i := 0; i < length; i++ {
bit := (bytes[uint(i)/8] & BitMask[byte(i)%8]) != 0 bit := (bytes[uint(i)/8] & BitMask[byte(i)%8]) != 0
bools = append(bools, bit) bools = append(bools, bit)