From 066c8ea17591e775f7f734defff8e3514abafc39 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Mon, 27 May 2024 16:27:42 +0800 Subject: [PATCH] feat: stream reader/writer to support nulls (#33080) See: #31728 --------- Signed-off-by: Ted Xu --- internal/storage/serde.go | 130 ++++++++++++++++++++++++++++++--- internal/storage/serde_test.go | 100 +++++++++++++++++++++---- 2 files changed, 206 insertions(+), 24 deletions(-) diff --git a/internal/storage/serde.go b/internal/storage/serde.go index c75bf5aabc..6ec7b38c92 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -199,10 +199,16 @@ func (crr *compositeRecordReader) Close() { } type serdeEntry struct { - arrowType func(int) arrow.DataType + // arrowType returns the arrow type for the given dimension + arrowType func(int) arrow.DataType + // deserialize deserializes the i-th element in the array, returns the value and ok. + // null is deserialized to nil without checking the type nullability. deserialize func(arrow.Array, int) (any, bool) - serialize func(array.Builder, any) bool - sizeof func(any) uint64 + // serialize serializes the value to the builder, returns ok. + // nil is serialized to null without checking the type nullability. + serialize func(array.Builder, any) bool + // sizeof returns the size in bytes of the value + sizeof func(any) uint64 } var serdeMap = func() map[schemapb.DataType]serdeEntry { @@ -212,12 +218,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.FixedWidthTypes.Boolean }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Boolean); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.BooleanBuilder); ok { if v, ok := v.(bool); ok { builder.Append(v) @@ -235,12 +248,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Int8 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Int8); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Int8Builder); ok { if v, ok := v.(int8); ok { builder.Append(v) @@ -258,12 +278,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Int16 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Int16); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Int16Builder); ok { if v, ok := v.(int16); ok { builder.Append(v) @@ -281,12 +308,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Int32 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Int32); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Int32Builder); ok { if v, ok := v.(int32); ok { builder.Append(v) @@ -304,12 +338,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Int64 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Int64); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Int64Builder); ok { if v, ok := v.(int64); ok { builder.Append(v) @@ -327,12 +368,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Float32 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Float32); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Float32Builder); ok { if v, ok := v.(float32); ok { builder.Append(v) @@ -350,12 +398,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.PrimitiveTypes.Float64 }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Float64); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.Float64Builder); ok { if v, ok := v.(float64); ok { builder.Append(v) @@ -373,12 +428,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.BinaryTypes.String }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.String); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.StringBuilder); ok { if v, ok := v.(string); ok { builder.Append(v) @@ -388,6 +450,9 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return false }, func(v any) uint64 { + if v == nil { + return 8 + } return uint64(len(v.(string))) }, } @@ -399,6 +464,9 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.BinaryTypes.Binary }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Binary); ok && i < arr.Len() { v := &schemapb.ScalarField{} if err := proto.Unmarshal(arr.Value(i), v); err == nil { @@ -408,6 +476,10 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.BinaryBuilder); ok { if vv, ok := v.(*schemapb.ScalarField); ok { if bytes, err := proto.Marshal(vv); err == nil { @@ -419,11 +491,17 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return false }, func(v any) uint64 { + if v == nil { + return 8 + } return uint64(v.(*schemapb.ScalarField).XXX_Size()) }, } sizeOfBytes := func(v any) uint64 { + if v == nil { + return 8 + } return uint64(len(v.([]byte))) } @@ -432,12 +510,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return arrow.BinaryTypes.Binary }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.Binary); ok && i < arr.Len() { return arr.Value(i), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.BinaryBuilder); ok { if v, ok := v.([]byte); ok { builder.Append(v) @@ -452,12 +537,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { m[schemapb.DataType_JSON] = byteEntry fixedSizeDeserializer := func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.FixedSizeBinary); ok && i < arr.Len() { return arr.Value(i), true } return nil, false } fixedSizeSerializer := func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.FixedSizeBinaryBuilder); ok { if v, ok := v.([]byte); ok { builder.Append(v) @@ -496,12 +588,19 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return &arrow.FixedSizeBinaryType{ByteWidth: i * 4} }, func(a arrow.Array, i int) (any, bool) { + if a.IsNull(i) { + return nil, true + } if arr, ok := a.(*array.FixedSizeBinary); ok && i < arr.Len() { return arrow.Float32Traits.CastFromBytes(arr.Value(i)), true } return nil, false }, func(b array.Builder, v any) bool { + if v == nil { + b.AppendNull() + return true + } if builder, ok := b.(*array.FixedSizeBinaryBuilder); ok { if vv, ok := v.([]float32); ok { dim := len(vv) @@ -518,6 +617,9 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry { return false }, func(v any) uint64 { + if v == nil { + return 8 + } return uint64(len(v.([]float32)) * 4) }, } @@ -639,11 +741,15 @@ func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*Deserialize m := value.Value.(map[FieldID]interface{}) for j, dt := range r.Schema() { - d, ok := serdeMap[dt].deserialize(r.Column(j), i) - if ok { - m[j] = d // TODO: avoid memory copy here. + if r.Column(j).IsNull(i) { + m[j] = nil } else { - return errors.New(fmt.Sprintf("unexpected type %s", dt)) + d, ok := serdeMap[dt].deserialize(r.Column(j), i) + if ok { + m[j] = d // TODO: avoid memory copy here. + } else { + return errors.New(fmt.Sprintf("unexpected type %s", dt)) + } } } @@ -900,8 +1006,9 @@ func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) { fid := bsw.fieldSchema.FieldID dim, _ := typeutil.GetDim(bsw.fieldSchema) rw, err := newSingleFieldRecordWriter(fid, arrow.Field{ - Name: strconv.Itoa(int(fid)), - Type: serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)), + Name: strconv.Itoa(int(fid)), + Type: serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)), + Nullable: true, // No nullable check here. }, &bsw.buf) if err != nil { return nil, err @@ -1028,8 +1135,9 @@ func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, se arrays[i] = builder.NewArray() builder.Release() fields[i] = arrow.Field{ - Name: strconv.Itoa(int(fid)), - Type: arrays[i].DataType(), + Name: strconv.Itoa(int(fid)), + Type: arrays[i].DataType(), + Nullable: true, // No nullable check here. } field2Col[fid] = i i++ diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index 0d9306069d..8cdf15b847 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -178,6 +178,68 @@ func TestBinlogSerializeWriter(t *testing.T) { }) } +func TestNull(t *testing.T) { + t.Run("test null", func(t *testing.T) { + schema := generateTestSchema() + // Copy write the generated data + writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields) + writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024) + assert.NoError(t, err) + + m := make(map[FieldID]any) + m[common.RowIDField] = int64(0) + m[common.TimeStampField] = int64(0) + m[10] = nil + m[11] = nil + m[12] = nil + m[13] = nil + m[14] = nil + m[15] = nil + m[16] = nil + m[17] = nil + m[18] = nil + m[19] = nil + m[101] = nil + m[102] = nil + m[103] = nil + m[104] = nil + m[105] = nil + m[106] = nil + pk, err := GenPrimaryKeyByRawData(m[common.RowIDField], schemapb.DataType_Int64) + assert.NoError(t, err) + + value := &Value{ + ID: 0, + PK: pk, + Timestamp: 0, + IsDeleted: false, + Value: m, + } + writer.Write(value) + err = writer.Close() + assert.NoError(t, err) + + // Read from the written data + blobs := make([]*Blob, len(writers)) + i := 0 + for _, w := range writers { + blob, err := w.Finalize() + assert.NoError(t, err) + assert.NotNil(t, blob) + blobs[i] = blob + i++ + } + reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField) + assert.NoError(t, err) + defer reader.Close() + err = reader.Next() + assert.NoError(t, err) + + readValue := reader.Value() + assert.Equal(t, value, readValue) + }) +} + func TestSerDe(t *testing.T) { type args struct { dt schemapb.DataType @@ -190,33 +252,45 @@ func TestSerDe(t *testing.T) { want1 bool }{ {"test bool", args{dt: schemapb.DataType_Bool, v: true}, true, true}, - {"test bool negative", args{dt: schemapb.DataType_Bool, v: nil}, nil, false}, + {"test bool null", args{dt: schemapb.DataType_Bool, v: nil}, nil, true}, + {"test bool negative", args{dt: schemapb.DataType_Bool, v: -1}, nil, false}, {"test int8", args{dt: schemapb.DataType_Int8, v: int8(1)}, int8(1), true}, - {"test int8 negative", args{dt: schemapb.DataType_Int8, v: nil}, nil, false}, + {"test int8 null", args{dt: schemapb.DataType_Int8, v: nil}, nil, true}, + {"test int8 negative", args{dt: schemapb.DataType_Int8, v: true}, nil, false}, {"test int16", args{dt: schemapb.DataType_Int16, v: int16(1)}, int16(1), true}, - {"test int16 negative", args{dt: schemapb.DataType_Int16, v: nil}, nil, false}, + {"test int16 null", args{dt: schemapb.DataType_Int16, v: nil}, nil, true}, + {"test int16 negative", args{dt: schemapb.DataType_Int16, v: true}, nil, false}, {"test int32", args{dt: schemapb.DataType_Int32, v: int32(1)}, int32(1), true}, - {"test int32 negative", args{dt: schemapb.DataType_Int32, v: nil}, nil, false}, + {"test int32 null", args{dt: schemapb.DataType_Int32, v: nil}, nil, true}, + {"test int32 negative", args{dt: schemapb.DataType_Int32, v: true}, nil, false}, {"test int64", args{dt: schemapb.DataType_Int64, v: int64(1)}, int64(1), true}, - {"test int64 negative", args{dt: schemapb.DataType_Int64, v: nil}, nil, false}, + {"test int64 null", args{dt: schemapb.DataType_Int64, v: nil}, nil, true}, + {"test int64 negative", args{dt: schemapb.DataType_Int64, v: true}, nil, false}, {"test float32", args{dt: schemapb.DataType_Float, v: float32(1)}, float32(1), true}, - {"test float32 negative", args{dt: schemapb.DataType_Float, v: nil}, nil, false}, + {"test float32 null", args{dt: schemapb.DataType_Float, v: nil}, nil, true}, + {"test float32 negative", args{dt: schemapb.DataType_Float, v: -1}, nil, false}, {"test float64", args{dt: schemapb.DataType_Double, v: float64(1)}, float64(1), true}, - {"test float64 negative", args{dt: schemapb.DataType_Double, v: nil}, nil, false}, + {"test float64 null", args{dt: schemapb.DataType_Double, v: nil}, nil, true}, + {"test float64 negative", args{dt: schemapb.DataType_Double, v: -1}, nil, false}, {"test string", args{dt: schemapb.DataType_String, v: "test"}, "test", true}, - {"test string negative", args{dt: schemapb.DataType_String, v: nil}, nil, false}, + {"test string null", args{dt: schemapb.DataType_String, v: nil}, nil, true}, + {"test string negative", args{dt: schemapb.DataType_String, v: -1}, nil, false}, {"test varchar", args{dt: schemapb.DataType_VarChar, v: "test"}, "test", true}, - {"test varchar negative", args{dt: schemapb.DataType_VarChar, v: nil}, nil, false}, + {"test varchar null", args{dt: schemapb.DataType_VarChar, v: nil}, nil, true}, + {"test varchar negative", args{dt: schemapb.DataType_VarChar, v: -1}, nil, false}, {"test array negative", args{dt: schemapb.DataType_Array, v: "{}"}, nil, false}, - {"test array negative null", args{dt: schemapb.DataType_Array, v: nil}, nil, false}, + {"test array null", args{dt: schemapb.DataType_Array, v: nil}, nil, true}, {"test json", args{dt: schemapb.DataType_JSON, v: []byte("{}")}, []byte("{}"), true}, - {"test json negative", args{dt: schemapb.DataType_JSON, v: nil}, nil, false}, + {"test json null", args{dt: schemapb.DataType_JSON, v: nil}, nil, true}, + {"test json negative", args{dt: schemapb.DataType_JSON, v: -1}, nil, false}, {"test float vector", args{dt: schemapb.DataType_FloatVector, v: []float32{1.0}}, []float32{1.0}, true}, - {"test float vector negative", args{dt: schemapb.DataType_FloatVector, v: nil}, nil, false}, + {"test float vector null", args{dt: schemapb.DataType_FloatVector, v: nil}, nil, true}, + {"test float vector negative", args{dt: schemapb.DataType_FloatVector, v: []int{1}}, nil, false}, {"test bool vector", args{dt: schemapb.DataType_BinaryVector, v: []byte{0xff}}, []byte{0xff}, true}, {"test float16 vector", args{dt: schemapb.DataType_Float16Vector, v: []byte{0xff, 0xff}}, []byte{0xff, 0xff}, true}, {"test bfloat16 vector", args{dt: schemapb.DataType_BFloat16Vector, v: []byte{0xff, 0xff}}, []byte{0xff, 0xff}, true}, - {"test bfloat16 vector negative", args{dt: schemapb.DataType_BFloat16Vector, v: nil}, nil, false}, + {"test bfloat16 vector null", args{dt: schemapb.DataType_BFloat16Vector, v: nil}, nil, true}, + {"test bfloat16 vector negative", args{dt: schemapb.DataType_BFloat16Vector, v: -1}, nil, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {