diff --git a/internal/datanode/compactor/clustering_compactor_test.go b/internal/datanode/compactor/clustering_compactor_test.go index 2aa49df96c..b451523f4a 100644 --- a/internal/datanode/compactor/clustering_compactor_test.go +++ b/internal/datanode/compactor/clustering_compactor_test.go @@ -228,7 +228,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { // 8+8+8+4+7+4*4=51 // 51*1024 = 52224 // writer will automatically flush after 1024 rows. - paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223") + paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "60000") defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key) compactionResult, err := s.task.Compact() @@ -315,7 +315,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit( // 8+8+8+4+7+4*4=51 // 51*1024 = 52224 // writer will automatically flush after 1024 rows. - paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223") + paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "60000") defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key) paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1") defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key) @@ -391,9 +391,9 @@ func (s *ClusteringCompactionTaskSuite) prepareCompactionWithBM25FunctionTask() func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() { // 8 + 8 + 8 + 7 + 8 = 39 // 39*1024 = 39936 - // plus buffer on null bitsets etc., let's make it 45000 + // plus buffer on null bitsets etc., let's make it 50000 // writer will automatically flush after 1024 rows. - paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000") + paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "50000") defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key) s.prepareCompactionWithBM25FunctionTask() diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index d68dd23edf..f49bf78c93 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -213,16 +213,6 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } defer reader.Close() - writeSlice := func(r storage.Record, start, end int) error { - sliced := r.Slice(start, end) - defer sliced.Release() - err = mWriter.Write(sliced) - if err != nil { - log.Warn("compact wrong, failed to writer row", zap.Error(err)) - return err - } - return nil - } for { var r storage.Record r, err = reader.Next() @@ -235,12 +225,15 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, return } } - pkArray := r.Column(pkField.FieldID) - tsArray := r.Column(common.TimeStampField).(*array.Int64) - sliceStart := -1 - rows := r.Len() - for i := 0; i < rows; i++ { + var ( + pkArray = r.Column(pkField.FieldID) + tsArray = r.Column(common.TimeStampField).(*array.Int64) + sliceStart = -1 + rb *storage.RecordBuilder + ) + + for i := range r.Len() { // Filtering deleted entities var pk any switch pkField.DataType { @@ -253,13 +246,13 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } ts := typeutil.Timestamp(tsArray.Value(i)) if entityFilter.Filtered(pk, ts) { - if sliceStart != -1 { - err = writeSlice(r, sliceStart, i) - if err != nil { - return - } - sliceStart = -1 + if rb == nil { + rb = storage.NewRecordBuilder(t.plan.GetSchema()) } + if sliceStart != -1 { + rb.Append(r, sliceStart, i) + } + sliceStart = -1 continue } @@ -268,11 +261,15 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } } - if sliceStart != -1 { - err = writeSlice(r, sliceStart, r.Len()) - if err != nil { - return + if rb != nil { + if sliceStart != -1 { + rb.Append(r, sliceStart, r.Len()) } + if rb.GetRowNum() > 0 { + mWriter.Write(rb.Build()) + } + } else { + mWriter.Write(r) } } diff --git a/internal/datanode/compactor/segment_writer.go b/internal/datanode/compactor/segment_writer.go index 92695dcd22..d09a5e8a77 100644 --- a/internal/datanode/compactor/segment_writer.go +++ b/internal/datanode/compactor/segment_writer.go @@ -182,8 +182,8 @@ func (w *MultiSegmentWriter) splitColumnByRecord(r storage.Record, splitThresHol shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)} for i, field := range w.schema.Fields { arr := r.Column(field.FieldID) - size := storage.CalculateArraySize(arr) - rows := arr.Len() + size := arr.Data().SizeInBytes() + rows := uint64(arr.Len()) if rows != 0 && int64(size/rows) >= splitThresHold { groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}}) } else { diff --git a/internal/flushcommon/syncmgr/pack_writer_test.go b/internal/flushcommon/syncmgr/pack_writer_test.go index bf739347d6..fab95c75c9 100644 --- a/internal/flushcommon/syncmgr/pack_writer_test.go +++ b/internal/flushcommon/syncmgr/pack_writer_test.go @@ -151,7 +151,7 @@ func TestBulkPackWriter_Write(t *testing.T) { EntriesNum: 10, LogPath: "files/delta_log/123/456/789/10000", LogSize: 592, - MemorySize: 283, + MemorySize: 327, }, }, }, diff --git a/internal/storage/arrow_util.go b/internal/storage/arrow_util.go index eb1566ed38..60a27870ca 100644 --- a/internal/storage/arrow_util.go +++ b/internal/storage/arrow_util.go @@ -21,11 +21,13 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" + "github.com/apache/arrow/go/v17/arrow/memory" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) -func AppendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error { +func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error { switch b := builder.(type) { case *array.BooleanBuilder: if a == nil { @@ -210,3 +212,61 @@ func AppendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue * return fmt.Errorf("unsupported builder type: %T", builder) } } + +// RecordBuilder is a helper to build arrow record. +// Due to current arrow impl (v12), the write performance is largely dependent on the batch size, +// small batch size will cause write performance degradation. To work around this issue, we accumulate +// records and write them in batches. This requires additional memory copy. +type RecordBuilder struct { + fields []*schemapb.FieldSchema + builders []array.Builder + + nRows int +} + +func (b *RecordBuilder) Append(rec Record, start, end int) { + for offset := start; offset < end; offset++ { + for i, builder := range b.builders { + f := b.fields[i] + appendValueAt(builder, rec.Column(f.FieldID), offset, f.GetDefaultValue()) + } + } + b.nRows += (end - start) +} + +func (b *RecordBuilder) GetRowNum() int { + return b.nRows +} + +func (b *RecordBuilder) Build() Record { + arrays := make([]arrow.Array, len(b.builders)) + fields := make([]arrow.Field, len(b.builders)) + field2Col := make(map[FieldID]int, len(b.builders)) + for c, builder := range b.builders { + arrays[c] = builder.NewArray() + f := b.fields[c] + fid := f.FieldID + fields[c] = arrow.Field{ + Name: f.GetName(), + Type: arrays[c].DataType(), + Nullable: f.Nullable, + } + field2Col[fid] = c + } + + rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(b.nRows)), field2Col) + b.nRows = 0 + return rec +} + +func NewRecordBuilder(schema *schemapb.CollectionSchema) *RecordBuilder { + builders := make([]array.Builder, len(schema.Fields)) + for i, field := range schema.Fields { + dim, _ := typeutil.GetDim(field) + builders[i] = array.NewBuilder(memory.DefaultAllocator, serdeMap[field.DataType].arrowType(int(dim))) + } + return &RecordBuilder{ + fields: schema.Fields, + builders: builders, + } +} diff --git a/internal/storage/serde.go b/internal/storage/serde.go index ace5573aaa..0bb02165ac 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -17,7 +17,6 @@ package storage import ( - "encoding/binary" "fmt" "io" "math" @@ -41,7 +40,6 @@ type Record interface { Len() int Release() Retain() - Slice(start, end int) Record } type RecordReader interface { @@ -94,18 +92,6 @@ func (r *compositeRecord) Retain() { } } -func (r *compositeRecord) Slice(start, end int) Record { - slices := make([]arrow.Array, len(r.recs)) - for i, rec := range r.recs { - d := array.NewSliceData(rec.Data(), int64(start), int64(end)) - slices[i] = array.MakeFromData(d) - } - return &compositeRecord{ - index: r.index, - recs: slices, - } -} - type serdeEntry struct { // arrowType returns the arrow type for the given dimension arrowType func(int) arrow.DataType @@ -592,56 +578,6 @@ func (r *selectiveRecord) Retain() { // do nothing } -func (r *selectiveRecord) Slice(start, end int) Record { - panic("not implemented") -} - -func CalculateArraySize(a arrow.Array) int { - if a == nil || a.Data() == nil || a.Data().Buffers() == nil { - return 0 - } - - var totalSize int - offset := a.Data().Offset() - length := a.Len() - - if len(a.NullBitmapBytes()) > 0 { - totalSize += (length + 7) / 8 - } - - for i, buf := range a.Data().Buffers() { - if buf == nil { - continue - } - - switch i { - case 0: - // Handle bitmap buffer, already handled - case 1: - switch a.DataType().ID() { - case arrow.STRING, arrow.BINARY: - // Handle variable-length types like STRING/BINARY - startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[offset*4:])) - endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+length)*4:])) - totalSize += endOffset - startOffset - case arrow.LIST: - // Handle nest types like list - for i := 0; i < length; i++ { - startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+i)*4:])) - endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+i+1)*4:])) - elementSize := a.DataType().(*arrow.ListType).Elem().(arrow.FixedWidthDataType).Bytes() - totalSize += (endOffset - startOffset) * elementSize - } - default: - // Handle fixed-length types - elementSize := a.DataType().(arrow.FixedWidthDataType).Bytes() - totalSize += elementSize * length - } - } - } - return totalSize -} - func newSelectiveRecord(r Record, selectedFieldId FieldID) Record { return &selectiveRecord{ r: r, @@ -717,7 +653,7 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error { sfw.numRows += r.Len() a := r.Column(sfw.fieldId) - sfw.writtenUncompressed += uint64(CalculateArraySize(a)) + sfw.writtenUncompressed += a.Data().SizeInBytes() rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len())) defer rec.Release() return sfw.fw.WriteBuffered(rec) @@ -791,7 +727,7 @@ func (mfw *multiFieldRecordWriter) Write(r Record) error { columns := make([]arrow.Array, len(mfw.fieldIDs)) for i, fieldId := range mfw.fieldIDs { columns[i] = r.Column(fieldId) - mfw.writtenUncompressed += uint64(CalculateArraySize(columns[i])) + mfw.writtenUncompressed += columns[i].Data().SizeInBytes() } rec := array.NewRecord(mfw.schema, columns, int64(r.Len())) defer rec.Release() @@ -914,11 +850,6 @@ func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema { return sr.r.Schema() } -func (sr *simpleArrowRecord) Slice(start, end int) Record { - s := sr.r.NewSlice(int64(start), int64(end)) - return NewSimpleArrowRecord(s, sr.field2Col) -} - func NewSimpleArrowRecord(r arrow.Record, field2Col map[FieldID]int) *simpleArrowRecord { return &simpleArrowRecord{ r: r, diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index 4bc456f0e6..6ded69e495 100644 --- a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -157,7 +157,7 @@ func (pw *packedRecordWriter) Write(r Record) error { } pw.rowNum += int64(r.Len()) for col, arr := range rec.Columns() { - size := uint64(CalculateArraySize(arr)) + size := arr.Data().SizeInBytes() pw.writtenUncompressed += size for columnGroup, group := range pw.columnGroups { if lo.Contains(group.Columns, col) { diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index 42ddeb946e..24b5e82c94 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -165,7 +165,7 @@ func TestCalculateArraySize(t *testing.T) { tests := []struct { name string arrayBuilder func() arrow.Array - expectedSize int + expectedSize uint64 }{ { name: "Empty array", @@ -184,7 +184,7 @@ func TestCalculateArraySize(t *testing.T) { b.AppendValues([]int32{1, 2, 3, 4}, nil) return b.NewArray() }, - expectedSize: 17, // 4 elements * 4 bytes + bitmap(1bytes) + expectedSize: 20, // 4 elements * 4 bytes + bitmap(4bytes) }, { name: "Variable-length string array", @@ -194,7 +194,9 @@ func TestCalculateArraySize(t *testing.T) { b.AppendValues([]string{"hello", "world"}, nil) return b.NewArray() }, - expectedSize: 11, // "hello" (5 bytes) + "world" (5 bytes) + bitmap(1bytes) + expectedSize: 23, // bytes: "hello" (5 bytes) + "world" (5 bytes) + // offsets: 2+1 elements * 4 bytes + // bitmap(1 byte) }, { name: "Nested list array", @@ -214,7 +216,9 @@ func TestCalculateArraySize(t *testing.T) { return b.NewArray() }, - expectedSize: 21, // 3 + 2 elements in data buffer, plus bitmap(1bytes) + expectedSize: 44, // child buffer: 5 elements * 4 bytes, plus bitmap (4bytes) + // offsets: 3+1 elements * 4 bytes + // bitmap(4 bytes) }, } @@ -223,32 +227,10 @@ func TestCalculateArraySize(t *testing.T) { arr := tt.arrayBuilder() defer arr.Release() - size := CalculateArraySize(arr) + size := arr.Data().SizeInBytes() if size != tt.expectedSize { t.Errorf("Expected size %d, got %d", tt.expectedSize, size) } }) } } - -func TestCalculateArraySizeWithOffset(t *testing.T) { - mem := memory.NewCheckedAllocator(memory.DefaultAllocator) - defer mem.AssertSize(t, 0) - - b := array.NewStringBuilder(mem) - defer b.Release() - - b.AppendValues([]string{"zero", "one", "two", "three", "four"}, nil) - fullArray := b.NewArray() - defer fullArray.Release() - - slicedArray := array.NewSlice(fullArray, 1, 4) // Offset = 1, End = 4 - defer slicedArray.Release() - - size := CalculateArraySize(slicedArray) - expectedSize := len("one") + len("two") + len("three") + 1 // "one", "two", "three", bitmap(1 bytes) - - if size != expectedSize { - t.Errorf("Expected size %d, got %d", expectedSize, size) - } -} diff --git a/internal/storage/sort.go b/internal/storage/sort.go index edd4c8b411..730c253848 100644 --- a/internal/storage/sort.go +++ b/internal/storage/sort.go @@ -128,7 +128,7 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader, for c, builder := range builders { fid := schema.Fields[c].FieldID defaultValue := schema.Fields[c].GetDefaultValue() - if err := AppendValueAt(builder, records[idx.ri].Column(fid), idx.i, defaultValue); err != nil { + if err := appendValueAt(builder, records[idx.ri].Column(fid), idx.i, defaultValue); err != nil { return 0, err } } @@ -266,50 +266,15 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader, // small batch size will cause write performance degradation. To work around this issue, we accumulate // records and write them in batches. This requires additional memory copy. batchSize := 100000 - builders := make([]array.Builder, len(schema.Fields)) - for i, f := range schema.Fields { - var b array.Builder - if recs[0].Column(f.FieldID) == nil { - b = array.NewBuilder(memory.DefaultAllocator, MilvusDataTypeToArrowType(f.GetDataType(), 1)) - } else { - b = array.NewBuilder(memory.DefaultAllocator, recs[0].Column(f.FieldID).DataType()) - } - b.Reserve(batchSize) - builders[i] = b - } + rb := NewRecordBuilder(schema) - writeRecord := func(rowNum int64) { - arrays := make([]arrow.Array, len(builders)) - fields := make([]arrow.Field, len(builders)) - field2Col := make(map[FieldID]int, len(builders)) - - for c, builder := range builders { - arrays[c] = builder.NewArray() - builder.Release() - fid := schema.Fields[c].FieldID - fields[c] = ConvertToArrowField(schema.Fields[c], arrays[c].DataType()) - field2Col[fid] = c - } - - rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col) - rw.Write(rec) - rec.Release() - } - - rc := 0 for pq.Len() > 0 { idx := pq.Dequeue() - - for c, builder := range builders { - fid := schema.Fields[c].FieldID - defaultValue := schema.Fields[c].GetDefaultValue() - AppendValueAt(builder, recs[idx.ri].Column(fid), idx.i, defaultValue) - } - if (rc+1)%batchSize == 0 { - writeRecord(int64(batchSize)) - rc = 0 - } else { - rc++ + rb.Append(recs[idx.ri], idx.i, idx.i+1) + if rb.GetRowNum()%batchSize == 0 { + if err := rw.Write(rb.Build()); err != nil { + return 0, err + } } // If poped idx reaches end of segment, invalidate cache and advance to next segment @@ -326,8 +291,10 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader, } // write the last batch - if rc > 0 { - writeRecord(int64(rc)) + if rb.GetRowNum() > 0 { + if err := rw.Write(rb.Build()); err != nil { + return 0, err + } } return numRows, nil