diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 063f777eef..7a5339b143 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math" + "math/rand" "testing" "time" @@ -334,6 +335,82 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() { s.Empty(segment.Deltalogs) } +func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() { + paramtable.Get().Save("dataNode.compaction.useMergeSort", "true") + defer paramtable.Get().Reset("dataNode.compaction.useMergeSort") + segments := []int64{1001, 1002, 1003} + alloc := allocator.NewLocalAllocator(100, math.MaxInt64) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0) + deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0) + addedFieldSet := typeutil.NewSet[int64]() + for _, f := range s.meta.GetSchema().GetFields() { + if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) { + continue + } + addedFieldSet.Insert(f.FieldID) + } + + for _, segID := range segments { + s.initMultiRowsSegBuffer(segID, 100, 3) + kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter) + s.Require().NoError(err) + + for fid, binlog := range fBinlogs { + if addedFieldSet.Contain(fid) { + if rand.Intn(2) == 0 { + continue + } + for _, k := range binlog.GetBinlogs() { + delete(kvs, k.LogPath) + } + delete(fBinlogs, fid) + } + } + + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool { + left, right := lo.Difference(keys, lo.Keys(kvs)) + return len(left) == 0 && len(right) == 0 + })).Return(lo.Values(kvs), nil).Once() + + blob, err := getInt64DeltaBlobs( + segID, + []int64{segID, segID + 3, segID + 6}, + []uint64{deleteTs, deleteTs, deleteTs}, + ) + s.Require().NoError(err) + deltaPath := fmt.Sprintf("deltalog/%d", segID) + s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltaPath}). 
+			Return([][]byte{blob.GetValue()}, nil).Once()
+
+		s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+			SegmentID:    segID,
+			FieldBinlogs: lo.Values(fBinlogs),
+			IsSorted:     true,
+			Deltalogs: []*datapb.FieldBinlog{
+				{Binlogs: []*datapb.Binlog{{LogPath: deltaPath}}},
+			},
+		})
+
+	}
+
+	result, err := s.task.Compact()
+	s.NoError(err)
+	s.NotNil(result)
+
+	s.Equal(s.task.plan.GetPlanID(), result.GetPlanID())
+	s.Equal(1, len(result.GetSegments()))
+	s.True(result.GetSegments()[0].GetIsSorted())
+
+	segment := result.GetSegments()[0]
+	s.EqualValues(19531, segment.GetSegmentID())
+	s.EqualValues(291, segment.GetNumOfRows())
+	s.NotEmpty(segment.InsertLogs)
+
+	s.NotEmpty(segment.Field2StatslogPaths)
+	s.Empty(segment.Deltalogs)
+}
+
 func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 	s.initSegBuffer(1, 3)
 	collTTL := 864000 // 10 days
@@ -375,6 +452,104 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 	s.Empty(compactionSegments[0].GetField2StatslogPaths())
 }
 
+func (s *MixCompactionTaskSuite) TestMergeNoExpirationLackBinlog() {
+	s.initSegBuffer(1, 4)
+	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
+	tests := []struct {
+		description string
+		deletions   map[int64]uint64
+		expectedRes int
+		leftNumRows int
+	}{
+		{"no deletion", nil, 1, 1},
+		{"mismatch deletion", map[int64]uint64{int64(1): deleteTs}, 1, 1},
+		{"deleted pk=4", map[int64]uint64{int64(4): deleteTs}, 1, 0},
+	}
+
+	alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
+	addedFieldSet := typeutil.NewSet[int64]()
+	for _, f := range s.meta.GetSchema().GetFields() {
+		if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) {
+			continue
+		}
+		addedFieldSet.Insert(f.FieldID)
+	}
+
+	kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
+	s.Require().NoError(err)
+	for fid, binlog := range fBinlogs {
+		if addedFieldSet.Contain(fid) {
+			if rand.Intn(2) == 0 {
+				continue
+			}
+			for _, k := range binlog.GetBinlogs() {
+				delete(kvs, k.LogPath)
+			}
+			delete(fBinlogs, fid)
+		}
+	}
+
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			if len(test.deletions) > 0 {
+				blob, err := getInt64DeltaBlobs(
+					s.segWriter.segmentID,
+					lo.Keys(test.deletions),
+					lo.Values(test.deletions),
+				)
+				s.Require().NoError(err)
+				s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"foo"}).
+					Return([][]byte{blob.GetValue()}, nil).Once()
+				s.task.plan.SegmentBinlogs[0].Deltalogs = []*datapb.FieldBinlog{
+					{
+						Binlogs: []*datapb.Binlog{
+							{
+								LogPath: "foo",
+							},
+						},
+					},
+				}
+			}
+
+			insertPaths := lo.Keys(kvs)
+			insertBytes := func() [][]byte {
+				res := make([][]byte, 0, len(insertPaths))
+				for _, path := range insertPaths {
+					res = append(res, kvs[path])
+				}
+				return res
+			}()
+			s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn(
+				func(ctx context.Context, paths []string) ([][]byte, error) {
+					s.Require().Equal(len(paths), len(kvs))
+					return insertBytes, nil
+				})
+			fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(insertPaths))
+			for _, k := range insertPaths {
+				fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
+					Binlogs: []*datapb.Binlog{
+						{
+							LogPath: k,
+						},
+					},
+				})
+			}
+			s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
+
+			s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
+
+			s.task.collectionID = CollectionID
+			s.task.partitionID = PartitionID
+			s.task.maxRows = 1000
+
+			res, err := s.task.mergeSplit(s.task.ctx)
+			s.NoError(err)
+			s.EqualValues(test.expectedRes, len(res))
+			s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
+		})
+	}
+}
+
 func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
 	s.initSegBuffer(1, 4)
 	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
@@ -643,7 +818,16 @@ func getRow(magic int64) map[int64]interface{} {
 				IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
 			},
 		},
-		JSONField: []byte(`{"batch":ok}`),
+		JSONField:                    []byte(`{"batch":ok}`),
+		BoolFieldWithDefaultValue:    nil,
+		Int8FieldWithDefaultValue:    nil,
+		Int16FieldWithDefaultValue:   nil,
+		Int32FieldWithDefaultValue:   nil,
+		Int64FieldWithDefaultValue:   nil,
+		FloatFieldWithDefaultValue:   nil,
+		DoubleFieldWithDefaultValue:  nil,
+		StringFieldWithDefaultValue:  nil,
+		VarCharFieldWithDefaultValue: nil,
 	}
 }
 
@@ -701,25 +885,34 @@ func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) {
 }
 
 const (
-	CollectionID           = 1
-	PartitionID            = 1
-	SegmentID              = 1
-	BoolField              = 100
-	Int8Field              = 101
-	Int16Field             = 102
-	Int32Field             = 103
-	Int64Field             = 104
-	FloatField             = 105
-	DoubleField            = 106
-	StringField            = 107
-	BinaryVectorField      = 108
-	FloatVectorField       = 109
-	ArrayField             = 110
-	JSONField              = 111
-	Float16VectorField     = 112
-	BFloat16VectorField    = 113
-	SparseFloatVectorField = 114
-	VarCharField           = 115
+	CollectionID                 = 1
+	PartitionID                  = 1
+	SegmentID                    = 1
+	BoolField                    = 100
+	Int8Field                    = 101
+	Int16Field                   = 102
+	Int32Field                   = 103
+	Int64Field                   = 104
+	FloatField                   = 105
+	DoubleField                  = 106
+	StringField                  = 107
+	BinaryVectorField            = 108
+	FloatVectorField             = 109
+	ArrayField                   = 110
+	JSONField                    = 111
+	Float16VectorField           = 112
+	BFloat16VectorField          = 113
+	SparseFloatVectorField       = 114
+	VarCharField                 = 115
+	BoolFieldWithDefaultValue    = 116
+	Int8FieldWithDefaultValue    = 117
+	Int16FieldWithDefaultValue   = 118
+	Int32FieldWithDefaultValue   = 119
+	Int64FieldWithDefaultValue   = 120
+	FloatFieldWithDefaultValue   = 121
+	DoubleFieldWithDefaultValue  = 122
+	StringFieldWithDefaultValue  = 123
+	VarCharFieldWithDefaultValue = 124
 )
 
 func getInt64DeltaBlobs(segID int64, pks []int64, tss []uint64) (*storage.Blob, error) {
@@ -826,6 +1019,103 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
 			Description: "json",
 			DataType:    schemapb.DataType_JSON,
 		},
+		{
+			FieldID:  BoolFieldWithDefaultValue,
+			Name:     "field_bool_with_default_value",
+			DataType: schemapb.DataType_Bool,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_BoolData{
+					BoolData: true,
+				},
+			},
+		},
+		{
+			FieldID:  Int8FieldWithDefaultValue,
+			Name:     "field_int8_with_default_value",
+			DataType: schemapb.DataType_Int8,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+		{
+			FieldID:  Int16FieldWithDefaultValue,
+			Name:     "field_int16_with_default_value",
+			DataType: schemapb.DataType_Int16,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+		{
+			FieldID:  Int32FieldWithDefaultValue,
+			Name:     "field_int32_with_default_value",
+			DataType: schemapb.DataType_Int32,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+		{
+			FieldID:      Int64FieldWithDefaultValue,
+			Name:         "field_int64_with_default_value",
+			IsPrimaryKey: true,
+			DataType:     schemapb.DataType_Int64,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_LongData{
+					LongData: 10,
+				},
+			},
+		},
+		{
+			FieldID:  FloatFieldWithDefaultValue,
+			Name:     "field_float_with_default_value",
+			DataType: schemapb.DataType_Float,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_FloatData{
+					FloatData: 10,
+				},
+			},
+		},
+		{
+			FieldID:  DoubleFieldWithDefaultValue,
+			Name:     "field_double_with_default_value",
+			DataType: schemapb.DataType_Double,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_DoubleData{
+					DoubleData: 10,
+				},
+			},
+		},
+		{
+			FieldID:  StringFieldWithDefaultValue,
+			Name:     "field_string_with_default_value",
+			DataType: schemapb.DataType_String,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_StringData{
+					StringData: "a",
+				},
+			},
+		},
+		{
+			FieldID:  VarCharFieldWithDefaultValue,
+			Name:     "field_varchar_with_default_value",
+			DataType: schemapb.DataType_VarChar,
+			TypeParams: []*commonpb.KeyValuePair{
+				{
+					Key:   common.MaxLengthKey,
+					Value: "128",
+				},
+			},
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_StringData{
+					StringData: "a",
+				},
+			},
+		},
 		{
 			FieldID: BinaryVectorField,
 			Name:    "field_binary_vector",
diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go
index 4a71c94d4a..66024e28fb 100644
--- a/internal/datanode/compaction/segment_writer.go
+++ b/internal/datanode/compaction/segment_writer.go
@@ -20,8 +20,11 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strconv"
 
+	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
+	"github.com/apache/arrow/go/v12/arrow/memory"
 	"github.com/samber/lo"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -430,7 +433,44 @@ func (w *SegmentWriter) WriteRecord(r storage.Record) error {
 		w.rowCount.Inc()
 	}
 
-	return w.writer.WriteRecord(r)
+
+	builders := make([]array.Builder, len(w.sch.Fields))
+	for i, f := range w.sch.Fields {
+		var b array.Builder
+		if r.Column(f.FieldID) == nil {
+			b = array.NewBuilder(memory.DefaultAllocator, storage.MilvusDataTypeToArrowType(f.GetDataType(), 1))
+		} else {
+			b = array.NewBuilder(memory.DefaultAllocator, r.Column(f.FieldID).DataType())
+		}
+		builders[i] = b
+	}
+	for c, builder := range builders {
+		fid := w.sch.Fields[c].FieldID
+		defaultValue := w.sch.Fields[c].GetDefaultValue()
+		for i := 0; i < rows; i++ {
+			if err := storage.AppendValueAt(builder, r.Column(fid), i, defaultValue); err != nil {
+				return err
+			}
+		}
+	}
+	arrays := make([]arrow.Array, len(builders))
+	fields := make([]arrow.Field, len(builders))
+	field2Col := make(map[typeutil.UniqueID]int, len(builders))
+
+	for c, builder := range builders {
+		arrays[c] = builder.NewArray()
+		fid := w.sch.Fields[c].FieldID
+		fields[c] = arrow.Field{
+			Name:     strconv.Itoa(int(fid)),
+			Type:     arrays[c].DataType(),
+			Nullable: true, // No nullable check here.
+		}
+		field2Col[fid] = c
+	}
+
+	rec := storage.NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(rows)), field2Col)
+	defer rec.Release()
+	return w.writer.WriteRecord(rec)
 }
 
 func (w *SegmentWriter) Write(v *storage.Value) error {
diff --git a/internal/storage/arrow_util.go b/internal/storage/arrow_util.go
index 446e9cbc4a..36998e01ab 100644
--- a/internal/storage/arrow_util.go
+++ b/internal/storage/arrow_util.go
@@ -21,11 +21,21 @@ import (
 
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 )
 
-func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
+func AppendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error {
 	switch b := builder.(type) {
 	case *array.BooleanBuilder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetBoolData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		ba, ok := a.(*array.Boolean)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -37,6 +47,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Int8Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(int8(defaultValue.GetIntData()))
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		ia, ok := a.(*array.Int8)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -48,6 +66,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Int16Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(int16(defaultValue.GetIntData()))
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		ia, ok := a.(*array.Int16)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -59,6 +85,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Int32Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetIntData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		ia, ok := a.(*array.Int32)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -70,6 +104,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Int64Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetLongData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		ia, ok := a.(*array.Int64)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -81,6 +123,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Float32Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetFloatData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		fa, ok := a.(*array.Float32)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -92,6 +142,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.Float64Builder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetDoubleData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		fa, ok := a.(*array.Float64)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -103,6 +161,14 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.StringBuilder:
+		if a == nil {
+			if defaultValue != nil {
+				b.Append(defaultValue.GetStringData())
+			} else {
+				b.AppendNull()
+			}
+			return nil
+		}
 		sa, ok := a.(*array.String)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
@@ -114,6 +180,11 @@ func appendValueAt(builder array.Builder, a arrow.Array, idx int) error {
 		}
 		return nil
 	case *array.BinaryBuilder:
+		// Binary-backed fields (array, JSON) do not support default values yet.
+		if a == nil {
+			b.AppendNull()
+			return nil
+		}
 		ba, ok := a.(*array.Binary)
 		if !ok {
 			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go
index bec802328f..5a536e851f 100644
--- a/internal/storage/binlog_iterator_test.go
+++ b/internal/storage/binlog_iterator_test.go
@@ -65,6 +65,124 @@ func generateTestSchema() *schemapb.CollectionSchema {
 	return schema
 }
 
+func generateTestAddedFieldSchema() *schemapb.CollectionSchema {
+	schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+		{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+		{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+		{FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
+		{FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
+		{FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
+		{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
+		{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
+		{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
+		{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
+		{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
+		{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
+		{FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
+		{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
+		{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "28433"},
+		}},
+		{FieldID: 107, Name: "bool_null", Nullable: true, DataType: schemapb.DataType_Bool},
+		{FieldID: 108, Name: "int8_null", Nullable: true, DataType: schemapb.DataType_Int8},
+		{FieldID: 109, Name: "int16_null", Nullable: true, DataType: schemapb.DataType_Int16},
+		{FieldID: 110, Name: "int64_null", Nullable: true, DataType: schemapb.DataType_Int64},
+		{FieldID: 111, Name: "float_null", Nullable: true, DataType: schemapb.DataType_Float},
+		{FieldID: 112, Name: "double_null", Nullable: true, DataType: schemapb.DataType_Double},
+		{FieldID: 113, Name: "varchar_null", Nullable: true, DataType: schemapb.DataType_VarChar},
+		{FieldID: 114, Name: "string_null", Nullable: true, DataType: schemapb.DataType_String},
+		{FieldID: 115, Name: "array_null", Nullable: true, DataType: schemapb.DataType_Array},
+		{FieldID: 116, Name: "json_null", Nullable: true, DataType: schemapb.DataType_JSON},
+		{
+			FieldID: 117, Name: "bool_with_default_value", Nullable: true, DataType: schemapb.DataType_Bool,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_BoolData{
+					BoolData: true,
+				},
+			},
+		},
+		{
+			FieldID: 118, Name: "int8_with_default_value", Nullable: true, DataType: schemapb.DataType_Int8,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+		{
+			FieldID: 119, Name: "int16_with_default_value", Nullable: true, DataType: schemapb.DataType_Int16,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+		{
+			FieldID: 120, Name: "int64_with_default_value", Nullable: true, DataType: schemapb.DataType_Int64,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_LongData{
+					LongData: 10,
+				},
+			},
+		},
+		{
+			FieldID: 121, Name: "float_with_default_value", Nullable: true, DataType: schemapb.DataType_Float,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_FloatData{
+					FloatData: 10,
+				},
+			},
+		},
+		{
+			FieldID: 122, Name: "double_with_default_value", Nullable: true, DataType: schemapb.DataType_Double,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_DoubleData{
+					DoubleData: 10,
+				},
+			},
+		},
+		{
+			FieldID: 123, Name: "varchar_with_default_value", Nullable: true, DataType: schemapb.DataType_VarChar,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_StringData{
+					StringData: "a",
+				},
+			},
+		},
+		{
+			FieldID: 124, Name: "string_with_default_value", Nullable: true, DataType: schemapb.DataType_String,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_StringData{
+					StringData: "a",
+				},
+			},
+		},
+		{FieldID: 125, Name: "int32_null", Nullable: true, DataType: schemapb.DataType_Int32},
+		{
+			FieldID: 126, Name: "int32_with_default_value", Nullable: true, DataType: schemapb.DataType_Int32,
+			DefaultValue: &schemapb.ValueField{
+				Data: &schemapb.ValueField_IntData{
+					IntData: 10,
+				},
+			},
+		},
+	}}
+
+	return schema
+}
+
 func generateTestData(num int) ([]*Blob, error) {
 	return generateTestDataWithSeed(1, num)
 }
@@ -242,6 +360,81 @@ func assertTestDataInternal(t *testing.T, i int, value *Value, lazy bool) {
 	}, value)
 }
 
+func assertTestAddedFieldData(t *testing.T, i int, value *Value) {
+	getf18 := func() any {
+		f18 := &schemapb.ScalarField{
+			Data: &schemapb.ScalarField_IntData{
+				IntData: &schemapb.IntArray{
+					Data: []int32{int32(i), int32(i), int32(i)},
+				},
+			},
+		}
+		f18b, err := proto.Marshal(f18)
+		assert.Nil(t, err)
+		return f18b
+	}
+
+	f102 := make([]float32, 8)
+	for j := range f102 {
+		f102[j] = float32(i)
+	}
+
+	f104 := make([]byte, 16)
+	for j := range f104 {
+		f104[j] = byte(i)
+	}
+
+	f106 := typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})
+
+	assert.EqualExportedValues(t, &Value{
+		int64(i),
+		&Int64PrimaryKey{Value: int64(i)},
+		int64(i),
+		false,
+		map[FieldID]interface{}{
+			common.TimeStampField: int64(i),
+			common.RowIDField:     int64(i),
+
+			10:  true,
+			11:  int8(i),
+			12:  int16(i),
+			13:  int64(i),
+			14:  float32(i),
+			15:  float64(i),
+			16:  fmt.Sprint(i),
+			17:  fmt.Sprint(i),
+			18:  getf18(),
+			19:  []byte{byte(i)},
+			101: int32(i),
+			102: f102,
+			103: []byte{0xff},
+			104: f104,
+			105: f104,
+			106: f106,
+			107: nil,
+			108: nil,
+			109: nil,
+			110: nil,
+			111: nil,
+			112: nil,
+			113: nil,
+			114: nil,
+			115: nil,
+			116: nil,
+			117: true,
+			118: int8(10),
+			119: int16(10),
+			120: int64(10),
+			121: float32(10),
+			122: float64(10),
+			123: "a",
+			124: "a",
+			125: nil,
+			126: int32(10),
+		},
+	}, value)
+}
+
 func TestInsertlogIterator(t *testing.T) {
 	t.Run("empty iterator", func(t *testing.T) {
 		itr := &InsertBinlogIterator{
diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go
index 9538962dcc..f02948a545 100644
--- a/internal/storage/payload_writer.go
+++ b/internal/storage/payload_writer.go
@@ -100,7 +100,7 @@ func NewPayloadWriter(colType schemapb.DataType, options ...PayloadWriterOptions
 	} else {
 		w.dim = NewNullableInt(1)
 	}
-	w.arrowType = milvusDataTypeToArrowType(colType, *w.dim.Value)
+	w.arrowType = MilvusDataTypeToArrowType(colType, *w.dim.Value)
 	w.builder = array.NewBuilder(memory.DefaultAllocator, w.arrowType)
 	return w, nil
 }
@@ -763,7 +763,7 @@ func (w *NativePayloadWriter) Close() {
 	w.ReleasePayloadWriter()
 }
 
-func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataType {
+func MilvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataType {
 	switch dataType {
 	case schemapb.DataType_Bool:
 		return &arrow.BooleanType{}
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 8241610e7d..21fb956f43 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -68,6 +68,9 @@ type compositeRecord struct {
 var _ Record = (*compositeRecord)(nil)
 
 func (r *compositeRecord) Column(i FieldID) arrow.Array {
+	if _, ok := r.index[i]; !ok {
+		return nil
+	}
 	return r.recs[r.index[i]]
 }
 
@@ -91,7 +94,7 @@ func (r *compositeRecord) Retain() {
 }
 
 func (r *compositeRecord) Slice(start, end int) Record {
-	slices := make([]arrow.Array, len(r.index))
+	slices := make([]arrow.Array, len(r.recs))
 	for i, rec := range r.recs {
 		d := array.NewSliceData(rec.Data(), int64(start), int64(end))
 		slices[i] = array.MakeFromData(d)
@@ -667,7 +670,6 @@ func (crw *CompositeRecordWriter) GetWrittenUncompressed() uint64 {
 
 func (crw *CompositeRecordWriter) Write(r Record) error {
 	for fieldId, w := range crw.writers {
-		// TODO: if field is not exist, write
 		sr := newSelectiveRecord(r, fieldId)
 		if err := w.Write(sr); err != nil {
 			return err
@@ -912,10 +914,10 @@ func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
 
 func (sr *simpleArrowRecord) Slice(start, end int) Record {
 	s := sr.r.NewSlice(int64(start), int64(end))
-	return newSimpleArrowRecord(s, sr.field2Col)
+	return NewSimpleArrowRecord(s, sr.field2Col)
 }
 
-func newSimpleArrowRecord(r arrow.Record, field2Col map[FieldID]int) *simpleArrowRecord {
+func NewSimpleArrowRecord(r arrow.Record, field2Col map[FieldID]int) *simpleArrowRecord {
 	return &simpleArrowRecord{
 		r:         r,
 		field2Col: field2Col,
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 8ed72accb9..215cb58856 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -232,6 +232,14 @@ func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema
 		for _, f := range fieldSchema {
 			j := f.FieldID
 			dt := f.DataType
+			if r.Column(j) == nil {
+				if f.GetDefaultValue() != nil {
+					m[j] = getDefaultValue(f)
+				} else {
+					m[j] = nil
+				}
+				continue
+			}
 			if r.Column(j).IsNull(i) {
 				m[j] = nil
 			} else {
@@ -428,7 +436,7 @@ func ValueSerializer(v []*Value, fieldSchema []*schemapb.FieldSchema) (Record, e
 		}
 		field2Col[field.FieldID] = i
 	}
-	return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), field2Col), nil
+	return NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), field2Col), nil
 }
 
 func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
@@ -566,7 +574,7 @@ func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int
 		field2Col := map[FieldID]int{
 			0: 0,
 		}
-		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), field2Col), nil
+		return NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), field2Col), nil
 	}, batchSize), nil
 }
 
@@ -815,7 +823,7 @@ func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, ba
 			common.RowIDField:     0,
 			common.TimeStampField: 1,
 		}
-		return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), field2Col), nil
+		return NewSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), field2Col), nil
 	}, batchSize), nil
 }
 
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index c48547b246..57fe482cf9 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -71,6 +71,26 @@ func TestBinlogDeserializeReader(t *testing.T) {
 		err = reader.Next()
 		assert.Equal(t, io.EOF, err)
 	})
+
+	t.Run("test deserialize with added field", func(t *testing.T) {
+		size := 3
+		blobs, err := generateTestData(size)
+		assert.NoError(t, err)
+		reader, err := NewBinlogDeserializeReader(generateTestAddedFieldSchema(), MakeBlobsReader(blobs))
+		assert.NoError(t, err)
+		defer reader.Close()
+
+		for i := 1; i <= size; i++ {
+			err = reader.Next()
+			assert.NoError(t, err)
+
+			value := reader.Value()
+			assertTestAddedFieldData(t, i, value)
+		}
+
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+	})
 }
 
 func TestBinlogStreamWriter(t *testing.T) {
@@ -94,7 +114,7 @@ func TestBinlogStreamWriter(t *testing.T) {
 		[]arrow.Array{arr},
 		int64(size),
 	)
-	r := newSimpleArrowRecord(ar, map[FieldID]int{1: 0})
+	r := NewSimpleArrowRecord(ar, map[FieldID]int{1: 0})
 	defer r.Release()
 	err = rw.Write(r)
 	assert.NoError(t, err)
diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go
index cafd13f12e..02dcd9c0ba 100644
--- a/internal/storage/serde_events_v2.go
+++ b/internal/storage/serde_events_v2.go
@@ -45,7 +45,7 @@ func (pr *packedRecordReader) Next() error {
 	if err != nil || rec == nil {
 		return io.EOF
 	}
-	pr.r = newSimpleArrowRecord(rec, pr.field2Col)
+	pr.r = NewSimpleArrowRecord(rec, pr.field2Col)
 	return nil
 }
 
diff --git a/internal/storage/sort.go b/internal/storage/sort.go
index 14cad5f845..be8eb21435 100644
--- a/internal/storage/sort.go
+++ b/internal/storage/sort.go
@@ -113,7 +113,7 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
 			field2Col[fid] = c
 		}
 
-		rec := newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col)
+		rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col)
 		defer rec.Release()
 		return rw.Write(rec)
 	}
@@ -121,7 +121,8 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
 	for i, idx := range indices {
 		for c, builder := range builders {
 			fid := schema.Fields[c].FieldID
-			if err := appendValueAt(builder, records[idx.ri].Column(fid), idx.i); err != nil {
+			defaultValue := schema.Fields[c].GetDefaultValue()
+			if err := AppendValueAt(builder, records[idx.ri].Column(fid), idx.i, defaultValue); err != nil {
 				return 0, err
 			}
 		}
@@ -255,7 +256,12 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
 	batchSize := 100000
 	builders := make([]array.Builder, len(schema.Fields))
 	for i, f := range schema.Fields {
-		b := array.NewBuilder(memory.DefaultAllocator, recs[0].Column(f.FieldID).DataType())
+		var b array.Builder
+		if recs[0].Column(f.FieldID) == nil {
+			b = array.NewBuilder(memory.DefaultAllocator, MilvusDataTypeToArrowType(f.GetDataType(), 1))
+		} else {
+			b = array.NewBuilder(memory.DefaultAllocator, recs[0].Column(f.FieldID).DataType())
+		}
 		b.Reserve(batchSize)
 		builders[i] = b
 	}
@@ -276,7 +282,7 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
 			field2Col[fid] = c
 		}
 
-		rec := newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col)
+		rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col)
 		rw.Write(rec)
 		rec.Release()
 	}
@@ -287,7 +293,8 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
 
 		for c, builder := range builders {
 			fid := schema.Fields[c].FieldID
-			appendValueAt(builder, rr[idx.ri].Record().Column(fid), idx.i)
+			defaultValue := schema.Fields[c].GetDefaultValue()
+			AppendValueAt(builder, rr[idx.ri].Record().Column(fid), idx.i, defaultValue)
 		}
 		if (rc+1)%batchSize == 0 {
 			writeRecord(int64(batchSize))
diff --git a/internal/storage/utils.go b/internal/storage/utils.go
index 0747d4c684..2516f08b53 100644
--- a/internal/storage/utils.go
+++ b/internal/storage/utils.go
@@ -1424,3 +1424,27 @@ func IsBM25FunctionOutputField(field *schemapb.FieldSchema, collSchema *schemapb
 	}
 	return false
 }
+
+func getDefaultValue(fieldSchema *schemapb.FieldSchema) interface{} {
+	switch fieldSchema.DataType {
+	case schemapb.DataType_Bool:
+		return fieldSchema.GetDefaultValue().GetBoolData()
+	case schemapb.DataType_Int8:
+		return int8(fieldSchema.GetDefaultValue().GetIntData())
+	case schemapb.DataType_Int16:
+		return int16(fieldSchema.GetDefaultValue().GetIntData())
+	case schemapb.DataType_Int32:
+		return fieldSchema.GetDefaultValue().GetIntData()
+	case schemapb.DataType_Int64:
+		return fieldSchema.GetDefaultValue().GetLongData()
+	case schemapb.DataType_Float:
+		return fieldSchema.GetDefaultValue().GetFloatData()
+	case schemapb.DataType_Double:
+		return fieldSchema.GetDefaultValue().GetDoubleData()
+	case schemapb.DataType_VarChar, schemapb.DataType_String:
+		return fieldSchema.GetDefaultValue().GetStringData()
+	default:
+		// unreachable: only scalar field types may carry a default value
+		panic(fmt.Sprintf("undefined data type:%s", fieldSchema.DataType.String()))
+	}
+}
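
A few notes with small standalone sketches follow; they are illustrative only, not part of the patch.

The heart of the change is the nil-column branch added to AppendValueAt: a field with no binlog in the source segment (typically a field added to the schema after the segment was sealed) shows up as a nil column, and every row is then filled from the field's schema-level default, or with NULL when no default is declared. A minimal sketch of that fallback for one concrete builder type (the helper name appendOrDefault is invented here):

	package main

	import (
		"fmt"

		"github.com/apache/arrow/go/v12/arrow/array"
		"github.com/apache/arrow/go/v12/arrow/memory"
	)

	// appendOrDefault mirrors the nil-column branch of AppendValueAt for a
	// single concrete type: a missing column falls back to the default value,
	// or to NULL when the field declares no default.
	func appendOrDefault(b *array.Int32Builder, col *array.Int32, idx int, def *int32) {
		if col == nil {
			if def != nil {
				b.Append(*def)
			} else {
				b.AppendNull()
			}
			return
		}
		if col.IsNull(idx) {
			b.AppendNull()
			return
		}
		b.Append(col.Value(idx))
	}

	func main() {
		b := array.NewInt32Builder(memory.DefaultAllocator)
		defer b.Release()

		def := int32(10)
		// The segment lacks this field's binlog, so the column is nil for every row.
		for i := 0; i < 3; i++ {
			appendOrDefault(b, nil, i, &def)
		}
		arr := b.NewArray()
		defer arr.Release()
		fmt.Println(arr) // [10 10 10]
	}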
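SegmentWriter.WriteRecord now repacks every incoming record to the full schema width: one arrow builder per schema field, typed from the incoming column when present and derived via MilvusDataTypeToArrowType otherwise, then reassembled into a record whose columns are indexed by field ID. A sketch of the same repacking idea in plain arrow, under an invented two-field schema in which field 102 has no data:

	package main

	import (
		"fmt"
		"strconv"

		"github.com/apache/arrow/go/v12/arrow"
		"github.com/apache/arrow/go/v12/arrow/array"
		"github.com/apache/arrow/go/v12/arrow/memory"
	)

	func main() {
		mem := memory.DefaultAllocator

		// Column present in the segment for field 101.
		ib := array.NewInt64Builder(mem)
		ib.AppendValues([]int64{1, 2, 3}, nil)
		existing := map[int64]arrow.Array{101: ib.NewArray()}

		schemaFields := []int64{101, 102} // field 102 has no binlog in this segment
		const rows = 3

		arrays := make([]arrow.Array, 0, len(schemaFields))
		fields := make([]arrow.Field, 0, len(schemaFields))
		field2Col := make(map[int64]int, len(schemaFields))
		for c, fid := range schemaFields {
			col, ok := existing[fid]
			if !ok {
				// Missing column: build an all-null (or all-default) array of
				// the schema-derived type, as WriteRecord does.
				nb := array.NewInt64Builder(mem)
				for i := 0; i < rows; i++ {
					nb.AppendNull()
				}
				col = nb.NewArray()
			}
			arrays = append(arrays, col)
			fields = append(fields, arrow.Field{
				Name:     strconv.FormatInt(fid, 10),
				Type:     col.DataType(),
				Nullable: true,
			})
			field2Col[fid] = c
		}

		rec := array.NewRecord(arrow.NewSchema(fields, nil), arrays, rows)
		defer rec.Release()
		for i, col := range rec.Columns() {
			fmt.Println(rec.ColumnName(i), col)
		}
	}

Exporting newSimpleArrowRecord as NewSimpleArrowRecord and milvusDataTypeToArrowType as MilvusDataTypeToArrowType is what lets the datanode compaction package assemble such records itself.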
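On the test side, kvs is a Go map, so lo.Keys returns the download paths in nondeterministic order; the tests therefore match the Download argument by set equality inside mock.MatchedBy rather than pinning an exact slice. The predicate in isolation (sample inputs invented):

	package main

	import (
		"fmt"

		"github.com/samber/lo"
	)

	// sameKeySet reports whether keys and want hold exactly the same elements,
	// ignoring order -- the predicate the tests pass to mock.MatchedBy.
	func sameKeySet(keys, want []string) bool {
		left, right := lo.Difference(keys, want)
		return len(left) == 0 && len(right) == 0
	}

	func main() {
		fmt.Println(sameKeySet([]string{"b", "a"}, []string{"a", "b"})) // true
		fmt.Println(sameKeySet([]string{"a"}, []string{"a", "b"}))      // false
	}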