diff --git a/internal/datanode/compactor/merge_sort.go b/internal/datanode/compactor/merge_sort.go index 6efc7714b0..13290a66b4 100644 --- a/internal/datanode/compactor/merge_sort.go +++ b/internal/datanode/compactor/merge_sort.go @@ -95,7 +95,7 @@ func mergeSortMultipleSegments(ctx context.Context, log.Warn("compaction only support int64 and varchar pk field") } - if _, err = storage.MergeSort(plan.GetSchema(), segmentReaders, writer, predicate); err != nil { + if _, err = storage.MergeSort(compactionParams.BinLogMaxSize, plan.GetSchema(), segmentReaders, writer, predicate); err != nil { return nil, err } diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go index ebc81cf35d..5ef7aec28f 100644 --- a/internal/datanode/index/task_stats.go +++ b/internal/datanode/index/task_stats.go @@ -239,8 +239,9 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) { log.Warn("error creating insert binlog reader", zap.Error(err)) return nil, err } + rrs := []storage.RecordReader{rr} - numValidRows, err := storage.Sort(st.req.Schema, rrs, srw, predicate) + numValidRows, err := storage.Sort(st.req.GetBinlogMaxSize(), st.req.GetSchema(), rrs, srw, predicate) if err != nil { log.Warn("sort failed", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err)) return nil, err diff --git a/internal/storage/arrow_util.go b/internal/storage/arrow_util.go index 60a27870ca..49f0e5270b 100644 --- a/internal/storage/arrow_util.go +++ b/internal/storage/arrow_util.go @@ -27,189 +27,211 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error { +func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) (uint64, error) { switch b := builder.(type) { case *array.BooleanBuilder: if a == nil { if defaultValue != nil { b.Append(defaultValue.GetBoolData()) + return 1, nil } else { 
b.AppendNull() + return 0, nil } - return nil } ba, ok := a.(*array.Boolean) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ba.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(ba.Value(idx)) + return 1, nil } - return nil case *array.Int8Builder: if a == nil { if defaultValue != nil { b.Append(int8(defaultValue.GetIntData())) + return 1, nil } else { b.AppendNull() + return 0, nil } - return nil } ia, ok := a.(*array.Int8) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ia.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(ia.Value(idx)) + return 1, nil } - return nil case *array.Int16Builder: if a == nil { if defaultValue != nil { b.Append(int16(defaultValue.GetIntData())) + return 2, nil } else { b.AppendNull() + return 0, nil } - return nil } ia, ok := a.(*array.Int16) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ia.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(ia.Value(idx)) + return 2, nil } - return nil case *array.Int32Builder: if a == nil { if defaultValue != nil { b.Append(defaultValue.GetIntData()) + return 4, nil } else { b.AppendNull() + return 0, nil } - return nil } ia, ok := a.(*array.Int32) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ia.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(ia.Value(idx)) + return 4, nil } - return nil case *array.Int64Builder: if a == nil { if defaultValue != nil { b.Append(defaultValue.GetLongData()) + 
return 8, nil } else { b.AppendNull() + return 0, nil } - return nil } ia, ok := a.(*array.Int64) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ia.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(ia.Value(idx)) + return 8, nil } - return nil case *array.Float32Builder: if a == nil { if defaultValue != nil { b.Append(defaultValue.GetFloatData()) + return 4, nil } else { b.AppendNull() + return 0, nil } - return nil } fa, ok := a.(*array.Float32) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if fa.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(fa.Value(idx)) + return 4, nil } - return nil case *array.Float64Builder: if a == nil { if defaultValue != nil { b.Append(defaultValue.GetDoubleData()) + return 8, nil } else { b.AppendNull() + return 0, nil } - return nil } fa, ok := a.(*array.Float64) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if fa.IsNull(idx) { b.AppendNull() + return 0, nil } else { b.Append(fa.Value(idx)) + return 8, nil } - return nil case *array.StringBuilder: if a == nil { if defaultValue != nil { - b.Append(defaultValue.GetStringData()) + val := defaultValue.GetStringData() + b.Append(val) + return uint64(len(val)), nil } else { b.AppendNull() + return 0, nil } - return nil } sa, ok := a.(*array.String) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if sa.IsNull(idx) { b.AppendNull() + return 0, nil } else { - b.Append(sa.Value(idx)) + val := sa.Value(idx) + 
b.Append(val) + return uint64(len(val)), nil } - return nil case *array.BinaryBuilder: // array type, not support defaultValue now if a == nil { b.AppendNull() - return nil + return 0, nil } ba, ok := a.(*array.Binary) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ba.IsNull(idx) { b.AppendNull() + return 0, nil } else { - b.Append(ba.Value(idx)) + val := ba.Value(idx) + b.Append(val) + return uint64(len(val)), nil } - return nil case *array.FixedSizeBinaryBuilder: ba, ok := a.(*array.FixedSizeBinary) if !ok { - return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) + return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type()) } if ba.IsNull(idx) { b.AppendNull() + return 0, nil } else { - b.Append(ba.Value(idx)) + val := ba.Value(idx) + b.Append(val) + return uint64(len(val)), nil } - return nil default: - return fmt.Errorf("unsupported builder type: %T", builder) + return 0, fmt.Errorf("unsupported builder type: %T", builder) } } @@ -222,22 +244,33 @@ type RecordBuilder struct { builders []array.Builder nRows int + size uint64 } -func (b *RecordBuilder) Append(rec Record, start, end int) { +func (b *RecordBuilder) Append(rec Record, start, end int) error { for offset := start; offset < end; offset++ { for i, builder := range b.builders { f := b.fields[i] - appendValueAt(builder, rec.Column(f.FieldID), offset, f.GetDefaultValue()) + col := rec.Column(f.FieldID) + size, err := appendValueAt(builder, col, offset, f.GetDefaultValue()) + if err != nil { + return fmt.Errorf("failed to append value at offset %d for field %s: %w", offset, f.GetName(), err) + } + b.size += size } } b.nRows += (end - start) + return nil } func (b *RecordBuilder) GetRowNum() int { return b.nRows } +func (b *RecordBuilder) GetSize() uint64 { + return b.size +} + func (b *RecordBuilder) Build() 
Record { arrays := make([]arrow.Array, len(b.builders)) fields := make([]arrow.Field, len(b.builders)) @@ -256,6 +289,7 @@ func (b *RecordBuilder) Build() Record { rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(b.nRows)), field2Col) b.nRows = 0 + b.size = 0 return rec } diff --git a/internal/storage/sort.go b/internal/storage/sort.go index 03b02aed1d..cb005e5008 100644 --- a/internal/storage/sort.go +++ b/internal/storage/sort.go @@ -27,7 +27,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -func Sort(schema *schemapb.CollectionSchema, rr []RecordReader, +func Sort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader, rw RecordWriter, predicate func(r Record, ri, i int) bool, ) (int, error) { records := make([]Record, 0) @@ -90,7 +90,6 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader, } rb := NewRecordBuilder(schema) - batchSize := 100000 writeRecord := func() error { rec := rb.Build() defer rec.Release() @@ -100,9 +99,13 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader, return nil } - for i, idx := range indices { - rb.Append(records[idx.ri], idx.i, idx.i+1) - if (i+1)%batchSize == 0 { + for _, idx := range indices { + if err := rb.Append(records[idx.ri], idx.i, idx.i+1); err != nil { + return 0, err + } + + // Write when accumulated data size reaches batchSize + if rb.GetSize() >= batchSize { if err := writeRecord(); err != nil { return 0, err } @@ -165,7 +168,7 @@ func NewPriorityQueue[T any](less func(x, y *T) bool) *PriorityQueue[T] { return &pq } -func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader, +func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader, rw RecordWriter, predicate func(r Record, ri, i int) bool, ) (numRows int, err error) { type index struct { @@ -230,16 +233,17 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader, } } - // Due to current arrow impl (v12), the write 
performance is largely dependent on the batch size, - // small batch size will cause write performance degradation. To work around this issue, we accumulate - // records and write them in batches. This requires additional memory copy. - batchSize := 100000 rb := NewRecordBuilder(schema) for pq.Len() > 0 { idx := pq.Dequeue() - rb.Append(recs[idx.ri], idx.i, idx.i+1) - if rb.GetRowNum()%batchSize == 0 { + if err := rb.Append(recs[idx.ri], idx.i, idx.i+1); err != nil { + return 0, err + } + // Due to current arrow impl (v12), the write performance is largely dependent on the batch size, + // small batch size will cause write performance degradation. To work around this issue, we accumulate + // records and write them in batches. This requires additional memory copy. + if rb.GetSize() >= batchSize { if err := rw.Write(rb.Build()); err != nil { return 0, err } diff --git a/internal/storage/sort_test.go b/internal/storage/sort_test.go index 5196768f29..77689a82ed 100644 --- a/internal/storage/sort_test.go +++ b/internal/storage/sort_test.go @@ -26,6 +26,8 @@ import ( ) func TestSort(t *testing.T) { + const batchSize = 64 * 1024 * 1024 + getReaders := func() []RecordReader { blobs, err := generateTestDataWithSeed(10, 3) assert.NoError(t, err) @@ -55,7 +57,7 @@ func TestSort(t *testing.T) { } t.Run("sort", func(t *testing.T) { - gotNumRows, err := Sort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { + gotNumRows, err := Sort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { return true }) assert.NoError(t, err) @@ -65,7 +67,7 @@ func TestSort(t *testing.T) { }) t.Run("sort with predicate", func(t *testing.T) { - gotNumRows, err := Sort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { + gotNumRows, err := Sort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { pk := r.Column(common.RowIDField).(*array.Int64).Value(i) return pk >= 20 }) @@ -105,8 +107,10 @@ func TestMergeSort(t *testing.T) { }, } + const batchSize = 64 * 1024 * 1024 + 
t.Run("merge sort", func(t *testing.T) { - gotNumRows, err := MergeSort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { + gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { return true }) assert.NoError(t, err) @@ -116,7 +120,7 @@ func TestMergeSort(t *testing.T) { }) t.Run("merge sort with predicate", func(t *testing.T) { - gotNumRows, err := MergeSort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { + gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool { pk := r.Column(common.RowIDField).(*array.Int64).Value(i) return pk >= 20 }) @@ -150,11 +154,12 @@ func BenchmarkSort(b *testing.B) { }, } + const batchSize = 64 * 1024 * 1024 b.ResetTimer() b.Run("sort", func(b *testing.B) { for i := 0; i < b.N; i++ { - Sort(generateTestSchema(), rr, rw, func(r Record, ri, i int) bool { + Sort(batchSize, generateTestSchema(), rr, rw, func(r Record, ri, i int) bool { return true }) }