enhance: simplify size calculation in file writers (#40808)

See: #40342

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2025-03-26 20:04:22 +08:00 committed by GitHub
parent e2e1493580
commit 128efaa3e3
9 changed files with 113 additions and 176 deletions


@@ -228,7 +228,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "60000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
compactionResult, err := s.task.Compact()
@@ -315,7 +315,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit(
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "60000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
@@ -391,9 +391,9 @@ func (s *ClusteringCompactionTaskSuite) prepareCompactionWithBM25FunctionTask()
func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
// 8 + 8 + 8 + 7 + 8 = 39
// 39*1024 = 39936
// plus buffer on null bitsets etc., let's make it 45000
// plus buffer on null bitsets etc., let's make it 50000
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "50000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
s.prepareCompactionWithBM25FunctionTask()
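
The raised limits follow the new size accounting: the writer is still expected to flush after 1024 rows, but the flushed size is now measured from arrow's buffers, which also count validity bitmaps and offset buffers, so the old limits sitting one byte under the logical batch size would trigger a flush too early. A rough sketch of the arithmetic, using only the figures from the test comments above:

package main

import "fmt"

func main() {
	// Figures taken from the test comments above (assumption: they match the
	// suite's schema; they are not re-derived here).
	const perRow = 8 + 8 + 8 + 4 + 7 + 4*4 // 51 bytes per row (scalar case)
	const rows = 1024                      // the writer flushes after 1024 rows
	fmt.Println(perRow * rows)             // 52224; the old limit 52223 sat one byte below this
	// With sizes now read from arrow buffers (validity bitmaps and offset
	// buffers included), the tests leave headroom: 60000 for the scalar
	// cases and 50000 for the BM25 case.
}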


@@ -213,16 +213,6 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
defer reader.Close()
writeSlice := func(r storage.Record, start, end int) error {
sliced := r.Slice(start, end)
defer sliced.Release()
err = mWriter.Write(sliced)
if err != nil {
log.Warn("compact wrong, failed to writer row", zap.Error(err))
return err
}
return nil
}
for {
var r storage.Record
r, err = reader.Next()
@@ -235,12 +225,15 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
return
}
}
pkArray := r.Column(pkField.FieldID)
tsArray := r.Column(common.TimeStampField).(*array.Int64)
sliceStart := -1
rows := r.Len()
for i := 0; i < rows; i++ {
var (
pkArray = r.Column(pkField.FieldID)
tsArray = r.Column(common.TimeStampField).(*array.Int64)
sliceStart = -1
rb *storage.RecordBuilder
)
for i := range r.Len() {
// Filtering deleted entities
var pk any
switch pkField.DataType {
@@ -253,13 +246,13 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
ts := typeutil.Timestamp(tsArray.Value(i))
if entityFilter.Filtered(pk, ts) {
if rb == nil {
rb = storage.NewRecordBuilder(t.plan.GetSchema())
}
if sliceStart != -1 {
err = writeSlice(r, sliceStart, i)
if err != nil {
return
rb.Append(r, sliceStart, i)
}
sliceStart = -1
}
continue
}
@@ -268,11 +261,15 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
}
if rb != nil {
if sliceStart != -1 {
err = writeSlice(r, sliceStart, r.Len())
if err != nil {
return
rb.Append(r, sliceStart, r.Len())
}
if rb.GetRowNum() > 0 {
mWriter.Write(rb.Build())
}
} else {
mWriter.Write(r)
}
}
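
Because the removed and added lines are interleaved in the hunks above, here is a condensed sketch of the control flow that results from this change: a RecordBuilder is allocated lazily the first time a row is filtered out, surviving runs of rows are appended to it, and a record with no deletions is handed to the writer untouched. The package name, the function shape, the filtered/write callbacks, and the simplified error handling are illustrative; only the identifiers taken from the hunks are the commit's own.

package compaction // package name illustrative

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

// writeFiltered is a condensed sketch of the rewritten loop above
// (the name and the callbacks are stand-ins, not from the commit).
func writeFiltered(r storage.Record, schema *schemapb.CollectionSchema,
	filtered func(i int) bool, write func(storage.Record) error,
) error {
	var rb *storage.RecordBuilder
	sliceStart := -1
	for i := 0; i < r.Len(); i++ {
		if filtered(i) { // entityFilter.Filtered(pk, ts) in the real code
			if rb == nil {
				// allocate the builder lazily, the first time a row is dropped
				rb = storage.NewRecordBuilder(schema)
			}
			if sliceStart != -1 {
				rb.Append(r, sliceStart, i) // copy the surviving run seen so far
				sliceStart = -1
			}
			continue
		}
		if sliceStart == -1 { // unchanged context not shown in the hunk
			sliceStart = i
		}
	}
	if rb != nil {
		if sliceStart != -1 {
			rb.Append(r, sliceStart, r.Len()) // tail of surviving rows
		}
		if rb.GetRowNum() > 0 {
			return write(rb.Build()) // write only the compacted copy
		}
		return nil
	}
	return write(r) // nothing was filtered: pass the record through as-is
}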


@@ -182,8 +182,8 @@ func (w *MultiSegmentWriter) splitColumnByRecord(r storage.Record, splitThresHold
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)}
for i, field := range w.schema.Fields {
arr := r.Column(field.FieldID)
size := storage.CalculateArraySize(arr)
rows := arr.Len()
size := arr.Data().SizeInBytes()
rows := uint64(arr.Len())
if rows != 0 && int64(size/rows) >= splitThresHold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
} else {


@@ -151,7 +151,7 @@ func TestBulkPackWriter_Write(t *testing.T) {
EntriesNum: 10,
LogPath: "files/delta_log/123/456/789/10000",
LogSize: 592,
MemorySize: 283,
MemorySize: 327,
},
},
},


@@ -21,11 +21,13 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func AppendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error {
func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error {
switch b := builder.(type) {
case *array.BooleanBuilder:
if a == nil {
@@ -210,3 +212,61 @@ func AppendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *
return fmt.Errorf("unsupported builder type: %T", builder)
}
}
// RecordBuilder is a helper to build arrow record.
// Due to current arrow impl (v12), the write performance is largely dependent on the batch size,
// small batch size will cause write performance degradation. To work around this issue, we accumulate
// records and write them in batches. This requires additional memory copy.
type RecordBuilder struct {
fields []*schemapb.FieldSchema
builders []array.Builder
nRows int
}
func (b *RecordBuilder) Append(rec Record, start, end int) {
for offset := start; offset < end; offset++ {
for i, builder := range b.builders {
f := b.fields[i]
appendValueAt(builder, rec.Column(f.FieldID), offset, f.GetDefaultValue())
}
}
b.nRows += (end - start)
}
func (b *RecordBuilder) GetRowNum() int {
return b.nRows
}
func (b *RecordBuilder) Build() Record {
arrays := make([]arrow.Array, len(b.builders))
fields := make([]arrow.Field, len(b.builders))
field2Col := make(map[FieldID]int, len(b.builders))
for c, builder := range b.builders {
arrays[c] = builder.NewArray()
f := b.fields[c]
fid := f.FieldID
fields[c] = arrow.Field{
Name: f.GetName(),
Type: arrays[c].DataType(),
Nullable: f.Nullable,
}
field2Col[fid] = c
}
rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(b.nRows)), field2Col)
b.nRows = 0
return rec
}
func NewRecordBuilder(schema *schemapb.CollectionSchema) *RecordBuilder {
builders := make([]array.Builder, len(schema.Fields))
for i, field := range schema.Fields {
dim, _ := typeutil.GetDim(field)
builders[i] = array.NewBuilder(memory.DefaultAllocator, serdeMap[field.DataType].arrowType(int(dim)))
}
return &RecordBuilder{
fields: schema.Fields,
builders: builders,
}
}
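
A minimal usage sketch of the new builder, written as if inside the storage package and assuming records are already decoded and w is the package's RecordWriter; the helper name, the input slice, and the flush threshold are illustrative (MergeSort below flushes every 100000 rows):

// writeBatched is an illustrative helper, not part of the commit.
func writeBatched(schema *schemapb.CollectionSchema, recs []Record, w RecordWriter) error {
	rb := NewRecordBuilder(schema)
	const flushEvery = 100000 // same order of magnitude as MergeSort's batch size
	for _, rec := range recs {
		rb.Append(rec, 0, rec.Len()) // copies the rows; see the doc comment above
		if rb.GetRowNum() >= flushEvery {
			// Build drains the builders into a Record and resets the row count,
			// so the same RecordBuilder can keep accumulating afterwards.
			if err := w.Write(rb.Build()); err != nil {
				return err
			}
		}
	}
	if rb.GetRowNum() > 0 { // flush whatever is left
		return w.Write(rb.Build())
	}
	return nil
}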


@@ -17,7 +17,6 @@
package storage
import (
"encoding/binary"
"fmt"
"io"
"math"
@@ -41,7 +40,6 @@ type Record interface {
Len() int
Release()
Retain()
Slice(start, end int) Record
}
type RecordReader interface {
@@ -94,18 +92,6 @@ func (r *compositeRecord) Retain() {
}
}
func (r *compositeRecord) Slice(start, end int) Record {
slices := make([]arrow.Array, len(r.recs))
for i, rec := range r.recs {
d := array.NewSliceData(rec.Data(), int64(start), int64(end))
slices[i] = array.MakeFromData(d)
}
return &compositeRecord{
index: r.index,
recs: slices,
}
}
type serdeEntry struct {
// arrowType returns the arrow type for the given dimension
arrowType func(int) arrow.DataType
@@ -592,56 +578,6 @@ func (r *selectiveRecord) Retain() {
// do nothing
}
func (r *selectiveRecord) Slice(start, end int) Record {
panic("not implemented")
}
func CalculateArraySize(a arrow.Array) int {
if a == nil || a.Data() == nil || a.Data().Buffers() == nil {
return 0
}
var totalSize int
offset := a.Data().Offset()
length := a.Len()
if len(a.NullBitmapBytes()) > 0 {
totalSize += (length + 7) / 8
}
for i, buf := range a.Data().Buffers() {
if buf == nil {
continue
}
switch i {
case 0:
// Handle bitmap buffer, already handled
case 1:
switch a.DataType().ID() {
case arrow.STRING, arrow.BINARY:
// Handle variable-length types like STRING/BINARY
startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[offset*4:]))
endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+length)*4:]))
totalSize += endOffset - startOffset
case arrow.LIST:
// Handle nest types like list
for i := 0; i < length; i++ {
startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+i)*4:]))
endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+i+1)*4:]))
elementSize := a.DataType().(*arrow.ListType).Elem().(arrow.FixedWidthDataType).Bytes()
totalSize += (endOffset - startOffset) * elementSize
}
default:
// Handle fixed-length types
elementSize := a.DataType().(arrow.FixedWidthDataType).Bytes()
totalSize += elementSize * length
}
}
}
return totalSize
}
func newSelectiveRecord(r Record, selectedFieldId FieldID) Record {
return &selectiveRecord{
r: r,
@@ -717,7 +653,7 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
sfw.numRows += r.Len()
a := r.Column(sfw.fieldId)
sfw.writtenUncompressed += uint64(CalculateArraySize(a))
sfw.writtenUncompressed += a.Data().SizeInBytes()
rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len()))
defer rec.Release()
return sfw.fw.WriteBuffered(rec)
@@ -791,7 +727,7 @@ func (mfw *multiFieldRecordWriter) Write(r Record) error {
columns := make([]arrow.Array, len(mfw.fieldIDs))
for i, fieldId := range mfw.fieldIDs {
columns[i] = r.Column(fieldId)
mfw.writtenUncompressed += uint64(CalculateArraySize(columns[i]))
mfw.writtenUncompressed += columns[i].Data().SizeInBytes()
}
rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
defer rec.Release()
@@ -914,11 +850,6 @@ func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
return sr.r.Schema()
}
func (sr *simpleArrowRecord) Slice(start, end int) Record {
s := sr.r.NewSlice(int64(start), int64(end))
return NewSimpleArrowRecord(s, sr.field2Col)
}
func NewSimpleArrowRecord(r arrow.Record, field2Col map[FieldID]int) *simpleArrowRecord {
return &simpleArrowRecord{
r: r,


@@ -157,7 +157,7 @@ func (pw *packedRecordWriter) Write(r Record) error {
}
pw.rowNum += int64(r.Len())
for col, arr := range rec.Columns() {
size := uint64(CalculateArraySize(arr))
size := arr.Data().SizeInBytes()
pw.writtenUncompressed += size
for columnGroup, group := range pw.columnGroups {
if lo.Contains(group.Columns, col) {


@@ -165,7 +165,7 @@ func TestCalculateArraySize(t *testing.T) {
tests := []struct {
name string
arrayBuilder func() arrow.Array
expectedSize int
expectedSize uint64
}{
{
name: "Empty array",
@@ -184,7 +184,7 @@ func TestCalculateArraySize(t *testing.T) {
b.AppendValues([]int32{1, 2, 3, 4}, nil)
return b.NewArray()
},
expectedSize: 17, // 4 elements * 4 bytes + bitmap(1bytes)
expectedSize: 20, // 4 elements * 4 bytes + bitmap(4bytes)
},
{
name: "Variable-length string array",
@@ -194,7 +194,9 @@ func TestCalculateArraySize(t *testing.T) {
b.AppendValues([]string{"hello", "world"}, nil)
return b.NewArray()
},
expectedSize: 11, // "hello" (5 bytes) + "world" (5 bytes) + bitmap(1bytes)
expectedSize: 23, // bytes: "hello" (5 bytes) + "world" (5 bytes)
// offsets: 2+1 elements * 4 bytes
// bitmap(1 byte)
},
{
name: "Nested list array",
@@ -214,7 +216,9 @@ func TestCalculateArraySize(t *testing.T) {
return b.NewArray()
},
expectedSize: 21, // 3 + 2 elements in data buffer, plus bitmap(1bytes)
expectedSize: 44, // child buffer: 5 elements * 4 bytes, plus bitmap (4bytes)
// offsets: 3+1 elements * 4 bytes
// bitmap(4 bytes)
},
}
@@ -223,32 +227,10 @@ func TestCalculateArraySize(t *testing.T) {
arr := tt.arrayBuilder()
defer arr.Release()
size := CalculateArraySize(arr)
size := arr.Data().SizeInBytes()
if size != tt.expectedSize {
t.Errorf("Expected size %d, got %d", tt.expectedSize, size)
}
})
}
}
func TestCalculateArraySizeWithOffset(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
b := array.NewStringBuilder(mem)
defer b.Release()
b.AppendValues([]string{"zero", "one", "two", "three", "four"}, nil)
fullArray := b.NewArray()
defer fullArray.Release()
slicedArray := array.NewSlice(fullArray, 1, 4) // Offset = 1, End = 4
defer slicedArray.Release()
size := CalculateArraySize(slicedArray)
expectedSize := len("one") + len("two") + len("three") + 1 // "one", "two", "three", bitmap(1 bytes)
if size != expectedSize {
t.Errorf("Expected size %d, got %d", expectedSize, size)
}
}
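
The updated expectations come from arrow's own accounting: Data().SizeInBytes() sums the lengths of every buffer backing the array (values, offsets, validity bitmap), whereas the removed CalculateArraySize counted only the logical element bytes plus one bitmap byte per eight rows and compensated for slice offsets by hand, so the offset-specific test goes away with the function. A small self-contained check of the first two shapes, assuming arrow-go v17 as imported elsewhere in this change:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator

	ib := array.NewInt32Builder(mem)
	ib.AppendValues([]int32{1, 2, 3, 4}, nil)
	ints := ib.NewArray()
	defer ints.Release()

	sb := array.NewStringBuilder(mem)
	sb.AppendValues([]string{"hello", "world"}, nil)
	strs := sb.NewArray()
	defer strs.Release()

	// SizeInBytes counts the full value, offset, and validity buffers;
	// the updated test above expects 20 and 23 for these two arrays.
	fmt.Println(ints.Data().SizeInBytes(), strs.Data().SizeInBytes())
}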


@@ -128,7 +128,7 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
for c, builder := range builders {
fid := schema.Fields[c].FieldID
defaultValue := schema.Fields[c].GetDefaultValue()
if err := AppendValueAt(builder, records[idx.ri].Column(fid), idx.i, defaultValue); err != nil {
if err := appendValueAt(builder, records[idx.ri].Column(fid), idx.i, defaultValue); err != nil {
return 0, err
}
}
@@ -266,50 +266,15 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
// small batch size will cause write performance degradation. To work around this issue, we accumulate
// records and write them in batches. This requires additional memory copy.
batchSize := 100000
builders := make([]array.Builder, len(schema.Fields))
for i, f := range schema.Fields {
var b array.Builder
if recs[0].Column(f.FieldID) == nil {
b = array.NewBuilder(memory.DefaultAllocator, MilvusDataTypeToArrowType(f.GetDataType(), 1))
} else {
b = array.NewBuilder(memory.DefaultAllocator, recs[0].Column(f.FieldID).DataType())
}
b.Reserve(batchSize)
builders[i] = b
}
rb := NewRecordBuilder(schema)
writeRecord := func(rowNum int64) {
arrays := make([]arrow.Array, len(builders))
fields := make([]arrow.Field, len(builders))
field2Col := make(map[FieldID]int, len(builders))
for c, builder := range builders {
arrays[c] = builder.NewArray()
builder.Release()
fid := schema.Fields[c].FieldID
fields[c] = ConvertToArrowField(schema.Fields[c], arrays[c].DataType())
field2Col[fid] = c
}
rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, rowNum), field2Col)
rw.Write(rec)
rec.Release()
}
rc := 0
for pq.Len() > 0 {
idx := pq.Dequeue()
for c, builder := range builders {
fid := schema.Fields[c].FieldID
defaultValue := schema.Fields[c].GetDefaultValue()
AppendValueAt(builder, recs[idx.ri].Column(fid), idx.i, defaultValue)
rb.Append(recs[idx.ri], idx.i, idx.i+1)
if rb.GetRowNum()%batchSize == 0 {
if err := rw.Write(rb.Build()); err != nil {
return 0, err
}
if (rc+1)%batchSize == 0 {
writeRecord(int64(batchSize))
rc = 0
} else {
rc++
}
// If poped idx reaches end of segment, invalidate cache and advance to next segment
@@ -326,8 +291,10 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
}
// write the last batch
if rc > 0 {
writeRecord(int64(rc))
if rb.GetRowNum() > 0 {
if err := rw.Write(rb.Build()); err != nil {
return 0, err
}
}
return numRows, nil