fix: Accurate size estimation for sliced arrow arrays in compaction (#45294)

Sliced arrow arrays "incorrectly" returned the original array's size via
SizeInBytes(), causing inaccurate memory estimates during compaction.

This resulted in segments closing prematurely in mergeSplit mode -
expected 500MB compactions produced 4x100+MB segments instead.

Fixed by calculating actual byte size of sliced arrays, ensuring proper
segment sizing and more accurate memory usage tracking.

See also: #45293

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-11-06 14:57:34 +08:00 committed by GitHub
parent e284733399
commit 623a9e5156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 657 additions and 3 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/bitutil"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/compress"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
@ -831,7 +832,7 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
sfw.numRows += r.Len()
a := r.Column(sfw.fieldId)
sfw.writtenUncompressed += a.Data().SizeInBytes()
sfw.writtenUncompressed += calculateActualDataSize(a)
rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len()))
defer rec.Release()
return sfw.fw.WriteBuffered(rec)
@ -1046,7 +1047,7 @@ func (mfw *multiFieldRecordWriter) Write(r Record) error {
columns := make([]arrow.Array, len(mfw.fieldIDs))
for i, fieldId := range mfw.fieldIDs {
columns[i] = r.Column(fieldId)
mfw.writtenUncompressed += columns[i].Data().SizeInBytes()
mfw.writtenUncompressed += calculateActualDataSize(columns[i])
}
rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
defer rec.Release()
@ -1225,3 +1226,261 @@ func BuildRecord(b *array.RecordBuilder, data *InsertData, schema *schemapb.Coll
}
return nil
}
// calculateActualDataSize returns the slice-aware in-memory byte size of an
// arrow.Array via ActualSizeInBytes, or 0 when the array has no backing data.
func calculateActualDataSize(a arrow.Array) uint64 {
	if d := a.Data(); d != nil {
		return ActualSizeInBytes(d)
	}
	return 0
}
// ActualSizeInBytes computes the precise in-memory byte size of an ArrayData,
// honoring its Offset() and Len(). Unlike ArrayData.SizeInBytes(), which
// reports the size of the entire shared backing buffers, this counts only the
// bytes logically covered by the (possibly sliced) array, giving accurate
// memory estimates for sliced arrays.
//
// Sizes are derived from the logical layout — validity bitmaps as
// ceil(length/8) bytes, fixed-width values as length*width, variable-width
// values from the offsets buffer — rather than from buffer capacities.
// Nested types recurse into their children.
func ActualSizeInBytes(data arrow.ArrayData) uint64 {
	var size uint64
	dt := data.DataType()
	length := data.Len()
	offset := data.Offset()
	buffers := data.Buffers()
	switch dt.ID() {
	case arrow.NULL:
		// Null arrays carry no backing storage.
		return 0
	case arrow.BOOL:
		// buffers[0]: validity bitmap; buffers[1]: values — both 1 bit per row.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
	case arrow.UINT8, arrow.INT8:
		// Validity bitmap + 1 byte per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length)
		}
	case arrow.UINT16, arrow.INT16, arrow.FLOAT16:
		// Validity bitmap + 2 bytes per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 2)
		}
	case arrow.UINT32, arrow.INT32, arrow.FLOAT32, arrow.DATE32, arrow.TIME32,
		arrow.INTERVAL_MONTHS:
		// Validity bitmap + 4 bytes per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 4)
		}
	case arrow.UINT64, arrow.INT64, arrow.FLOAT64, arrow.DATE64, arrow.TIME64,
		arrow.TIMESTAMP, arrow.DURATION, arrow.INTERVAL_DAY_TIME:
		// Validity bitmap + 8 bytes per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 8)
		}
	case arrow.INTERVAL_MONTH_DAY_NANO:
		// Validity bitmap + 16 bytes per value (month, day, nanoseconds).
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 16)
		}
	case arrow.DECIMAL128:
		// Validity bitmap + 16 bytes per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 16)
		}
	case arrow.DECIMAL256:
		// Validity bitmap + 32 bytes per value.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 32)
		}
	case arrow.FIXED_SIZE_BINARY:
		// Validity bitmap + ByteWidth bytes per value (e.g. dense vectors).
		fsbType := dt.(*arrow.FixedSizeBinaryType)
		byteWidth := fsbType.ByteWidth
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * byteWidth)
		}
	case arrow.STRING, arrow.BINARY:
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil && buffers[2] != nil {
			// Offsets visible to this slice: length+1 int32 entries.
			size += uint64((length + 1) * 4)
			offsets := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
			// Value bytes referenced by the slice span
			// offsets[offset] .. offsets[offset+length]. If the offsets
			// buffer does not cover the slice, the value bytes are
			// conservatively left uncounted.
			if offset+length < len(offsets) {
				dataStart := offsets[offset]
				dataEnd := offsets[offset+length]
				size += uint64(dataEnd - dataStart)
			}
		}
	case arrow.LARGE_STRING, arrow.LARGE_BINARY:
		// Same layout as STRING/BINARY but with 8-byte (int64) offsets.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil && buffers[2] != nil {
			size += uint64((length + 1) * 8)
			offsets := arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
			if offset+length < len(offsets) {
				dataStart := offsets[offset]
				dataEnd := offsets[offset+length]
				size += uint64(dataEnd - dataStart)
			}
		}
	case arrow.STRING_VIEW, arrow.BINARY_VIEW:
		// Validity bitmap + fixed-size view headers, plus every variadic data
		// buffer. View data buffers are shared and not slice-addressable, so
		// they are counted at full buffer length.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * arrow.ViewHeaderSizeBytes)
		}
		for i := 2; i < len(buffers); i++ {
			if buffers[i] != nil {
				size += uint64(buffers[i].Len())
			}
		}
	case arrow.LIST, arrow.MAP:
		// Validity bitmap + int32 offsets, plus the child values.
		// NOTE(review): children are counted at their full logical size; for
		// a sliced list this may include values outside the slice — confirm
		// whether child slicing should be applied here.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64((length + 1) * 4)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.LARGE_LIST:
		// Same as LIST but with int64 offsets.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64((length + 1) * 8)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.LIST_VIEW:
		// Validity bitmap + int32 offsets and int32 sizes, plus children.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 4)
		}
		if buffers[2] != nil {
			size += uint64(length * 4)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.LARGE_LIST_VIEW:
		// Validity bitmap + int64 offsets and int64 sizes, plus children.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		if buffers[1] != nil {
			size += uint64(length * 8)
		}
		if buffers[2] != nil {
			size += uint64(length * 8)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.FIXED_SIZE_LIST:
		// Validity bitmap only; the values live entirely in the child array.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.STRUCT:
		// Validity bitmap plus each child field.
		if buffers[0] != nil {
			size += uint64(bitutil.BytesForBits(int64(length)))
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.SPARSE_UNION:
		// One type-id byte per row, plus all children.
		// NOTE(review): per the Arrow spec, unions have no validity bitmap;
		// verify which buffer index carries the type ids in Go's layout —
		// buffers[0] is assumed here.
		if buffers[0] != nil {
			size += uint64(length)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.DENSE_UNION:
		// Type ids (1 byte/row) + int32 value offsets, plus children.
		// NOTE(review): same buffer-index caveat as SPARSE_UNION.
		if buffers[0] != nil {
			size += uint64(length)
		}
		if buffers[1] != nil {
			size += uint64(length * 4)
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.DICTIONARY:
		// Index buffers are counted at full buffer length (slice offset is
		// not applied here), plus the dictionary values themselves.
		for _, buf := range buffers {
			if buf != nil {
				size += uint64(buf.Len())
			}
		}
		if dict := data.Dictionary(); dict != nil {
			size += ActualSizeInBytes(dict)
		}
	case arrow.RUN_END_ENCODED:
		// Run-ends and values are both stored as children.
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	case arrow.EXTENSION:
		// Delegate to the underlying storage type via a temporary ArrayData
		// wrapper; Release drops only the wrapper's own references.
		extType := dt.(arrow.ExtensionType)
		storageData := array.NewData(extType.StorageType(), length, buffers, data.Children(), data.NullN(), offset)
		size = ActualSizeInBytes(storageData)
		storageData.Release()
	default:
		// Unknown layout: fall back to summing raw buffer lengths and
		// recursing into children (may over-count for sliced arrays).
		for _, buf := range buffers {
			if buf != nil {
				size += uint64(buf.Len())
			}
		}
		for _, child := range data.Children() {
			size += ActualSizeInBytes(child)
		}
	}
	return size
}

View File

@ -140,7 +140,8 @@ func (pw *packedRecordWriter) Write(r Record) error {
}
pw.rowNum += int64(r.Len())
for col, arr := range rec.Columns() {
size := arr.Data().SizeInBytes()
// size := arr.Data().SizeInBytes()
size := calculateActualDataSize(arr)
pw.writtenUncompressed += size
for _, columnGroup := range pw.columnGroups {
if lo.Contains(columnGroup.Columns, col) {

View File

@ -24,6 +24,7 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/bitutil"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/stretchr/testify/assert"
@ -462,3 +463,396 @@ func TestArrayOfVectorIntegration(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, "4", dimStr)
}
// Verifies that ActualSizeInBytes of a sliced FixedSizeBinary array reflects
// the slice length rather than the full backing buffer.
func TestActualSizeInBytesSlicedFixedSizeBinary(t *testing.T) {
	const (
		dim       = 128
		byteWidth = dim * 4
		totalRows = 1000
	)
	fsbBuilder := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth})
	defer fsbBuilder.Release()
	row := make([]byte, byteWidth)
	for i := 0; i < totalRows; i++ {
		for j := range row {
			row[j] = byte((i + j) % 256)
		}
		fsbBuilder.Append(row)
	}
	full := fsbBuilder.NewArray().(*array.FixedSizeBinary)
	defer full.Release()

	// expected size for n rows: validity bitmap + fixed-width payload
	sizeFor := func(n int) uint64 {
		return uint64(bitutil.BytesForBits(int64(n))) + uint64(n*byteWidth)
	}

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		expectedSize := sizeFor(totalRows)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Full array - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [100:200]", func(t *testing.T) {
		part := array.NewSlice(full, 100, 200).(*array.FixedSizeBinary)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 100, n)
		assert.Equal(t, expectedSize, actualSize)
		assert.Less(t, actualSize, ActualSizeInBytes(full.Data()))
		t.Logf("Sliced [100:200] - ActualSize: %d, Expected: %d (length: %d)", actualSize, expectedSize, n)
	})
	t.Run("Sliced array [0:10]", func(t *testing.T) {
		part := array.NewSlice(full, 0, 10).(*array.FixedSizeBinary)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 10, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [0:10] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [990:1000]", func(t *testing.T) {
		part := array.NewSlice(full, 990, 1000).(*array.FixedSizeBinary)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 10, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [990:1000] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
}
// Verifies slice-aware sizing for variable-length String arrays.
func TestActualSizeInBytesSlicedString(t *testing.T) {
	const totalRows = 100
	sb := array.NewStringBuilder(memory.DefaultAllocator)
	defer sb.Release()
	for i := 0; i < totalRows; i++ {
		sb.Append(string(make([]byte, i+10)))
	}
	full := sb.NewArray().(*array.String)
	defer full.Release()

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		// String lengths run 10..109, so the payload sums to (10+109)*50.
		expectedDataSize := (10 + 109) * 50
		expectedOffsetSize := (totalRows + 1) * 4
		expectedNullBitmapSize := bitutil.BytesForBits(int64(totalRows))
		expectedTotal := uint64(expectedNullBitmapSize + int64(expectedOffsetSize) + int64(expectedDataSize))
		assert.GreaterOrEqual(t, actualSize, expectedTotal)
		t.Logf("Full array - ActualSize: %d", actualSize)
	})
	t.Run("Sliced array [10:20]", func(t *testing.T) {
		part := array.NewSlice(full, 10, 20).(*array.String)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		assert.Equal(t, 10, n)
		assert.Less(t, actualSize, ActualSizeInBytes(full.Data()))
		t.Logf("Sliced [10:20] - ActualSize: %d (length: %d)", actualSize, n)
	})
	t.Run("Sliced array [0:5]", func(t *testing.T) {
		part := array.NewSlice(full, 0, 5).(*array.String)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		assert.Equal(t, 5, n)
		assert.Less(t, actualSize, ActualSizeInBytes(full.Data()))
		t.Logf("Sliced [0:5] - ActualSize: %d", actualSize)
	})
}
// Verifies slice-aware sizing for fixed-width Int64 arrays.
func TestActualSizeInBytesSlicedInt64(t *testing.T) {
	const totalRows = 1000
	ib := array.NewInt64Builder(memory.DefaultAllocator)
	defer ib.Release()
	for v := 0; v < totalRows; v++ {
		ib.Append(int64(v))
	}
	full := ib.NewArray().(*array.Int64)
	defer full.Release()

	// validity bitmap + 8 bytes per value
	sizeFor := func(n int) uint64 {
		return uint64(bitutil.BytesForBits(int64(n))) + uint64(n*8)
	}

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		expectedSize := sizeFor(totalRows)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Full array - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [100:200]", func(t *testing.T) {
		part := array.NewSlice(full, 100, 200).(*array.Int64)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 100, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [100:200] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [500:501]", func(t *testing.T) {
		part := array.NewSlice(full, 500, 501).(*array.Int64)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 1, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [500:501] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
}
func TestActualSizeInBytesSlicedList(t *testing.T) {
pool := memory.DefaultAllocator
listBuilder := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int32)
defer listBuilder.Release()
valueBuilder := listBuilder.ValueBuilder().(*array.Int32Builder)
totalRows := 100
for i := 0; i < totalRows; i++ {
listBuilder.Append(true)
numElements := i%10 + 1
for j := 0; j < numElements; j++ {
valueBuilder.Append(int32(i*10 + j))
}
}
arr := listBuilder.NewArray().(*array.List)
defer arr.Release()
t.Run("Full array", func(t *testing.T) {
actualSize := ActualSizeInBytes(arr.Data())
nullBitmapSize := bitutil.BytesForBits(int64(totalRows))
offsetSize := (totalRows + 1) * 4
childSize := ActualSizeInBytes(arr.ListValues().Data())
expectedSize := uint64(nullBitmapSize+int64(offsetSize)) + childSize
assert.Equal(t, expectedSize, actualSize)
t.Logf("Full array - ActualSize: %d, Expected: %d", actualSize, expectedSize)
})
t.Run("Sliced array [10:20]", func(t *testing.T) {
sliced := array.NewSlice(arr, 10, 20).(*array.List)
defer sliced.Release()
slicedLen := sliced.Len()
actualSize := ActualSizeInBytes(sliced.Data())
assert.Equal(t, 10, slicedLen)
assert.Less(t, actualSize, ActualSizeInBytes(arr.Data()))
t.Logf("Sliced [10:20] - ActualSize: %d (length: %d)", actualSize, slicedLen)
})
t.Run("Sliced array [0:1]", func(t *testing.T) {
sliced := array.NewSlice(arr, 0, 1).(*array.List)
defer sliced.Release()
slicedLen := sliced.Len()
actualSize := ActualSizeInBytes(sliced.Data())
assert.Equal(t, 1, slicedLen)
assert.Less(t, actualSize, ActualSizeInBytes(arr.Data()))
t.Logf("Sliced [0:1] - ActualSize: %d", actualSize)
})
}
// Verifies slice-aware sizing for fixed-width Float32 arrays.
func TestActualSizeInBytesSlicedFloat32(t *testing.T) {
	const totalRows = 500
	fb := array.NewFloat32Builder(memory.DefaultAllocator)
	defer fb.Release()
	for i := 0; i < totalRows; i++ {
		fb.Append(float32(i) * 1.5)
	}
	full := fb.NewArray().(*array.Float32)
	defer full.Release()

	// validity bitmap + 4 bytes per value
	sizeFor := func(n int) uint64 {
		return uint64(bitutil.BytesForBits(int64(n))) + uint64(n*4)
	}

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		expectedSize := sizeFor(totalRows)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Full array - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [200:300]", func(t *testing.T) {
		part := array.NewSlice(full, 200, 300).(*array.Float32)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 100, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [200:300] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
}
// Verifies slice-aware sizing for Boolean arrays (two bitmaps: validity
// and values).
func TestActualSizeInBytesSlicedBool(t *testing.T) {
	const totalRows = 1024
	boolBuilder := array.NewBooleanBuilder(memory.DefaultAllocator)
	defer boolBuilder.Release()
	for i := 0; i < totalRows; i++ {
		boolBuilder.Append(i%2 == 0)
	}
	full := boolBuilder.NewArray().(*array.Boolean)
	defer full.Release()

	// two 1-bit-per-row bitmaps: validity + values
	sizeFor := func(n int) uint64 {
		return uint64(bitutil.BytesForBits(int64(n)) * 2)
	}

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		expectedSize := sizeFor(totalRows)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Full array - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
	t.Run("Sliced array [512:768]", func(t *testing.T) {
		part := array.NewSlice(full, 512, 768).(*array.Boolean)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		expectedSize := sizeFor(n)
		assert.Equal(t, 256, n)
		assert.Equal(t, expectedSize, actualSize)
		t.Logf("Sliced [512:768] - ActualSize: %d, Expected: %d", actualSize, expectedSize)
	})
}
// Verifies slice-aware sizing for variable-length Binary arrays.
func TestActualSizeInBytesSlicedBinary(t *testing.T) {
	const totalRows = 50
	binBuilder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
	defer binBuilder.Release()
	for i := 0; i < totalRows; i++ {
		// Row i is i+5 bytes, each byte set to i.
		blob := make([]byte, i+5)
		for k := range blob {
			blob[k] = byte(i)
		}
		binBuilder.Append(blob)
	}
	full := binBuilder.NewArray().(*array.Binary)
	defer full.Release()

	t.Run("Full array", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		t.Logf("Full array - ActualSize: %d", actualSize)
	})
	t.Run("Sliced array [10:30]", func(t *testing.T) {
		part := array.NewSlice(full, 10, 30).(*array.Binary)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		assert.Equal(t, 20, n)
		assert.Less(t, actualSize, ActualSizeInBytes(full.Data()))
		t.Logf("Sliced [10:30] - ActualSize: %d (length: %d)", actualSize, n)
	})
	t.Run("Sliced array [0:10]", func(t *testing.T) {
		part := array.NewSlice(full, 0, 10).(*array.Binary)
		defer part.Release()
		n := part.Len()
		actualSize := ActualSizeInBytes(part.Data())
		assert.Equal(t, 10, n)
		assert.Less(t, actualSize, ActualSizeInBytes(full.Data()))
		t.Logf("Sliced [0:10] - ActualSize: %d", actualSize)
	})
}
// Contrasts ActualSizeInBytes with Arrow's own Data().SizeInBytes() to show
// that only the former accounts for slicing.
func TestActualSizeInBytesCompareWithDataSizeInBytes(t *testing.T) {
	const (
		dim       = 768
		byteWidth = dim * 4
		totalRows = 1000
	)
	fsbBuilder := array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth})
	defer fsbBuilder.Release()
	row := make([]byte, byteWidth)
	for i := 0; i < totalRows; i++ {
		for j := range row {
			row[j] = byte((i + j) % 256)
		}
		fsbBuilder.Append(row)
	}
	full := fsbBuilder.NewArray().(*array.FixedSizeBinary)
	defer full.Release()

	t.Run("Full array comparison", func(t *testing.T) {
		actualSize := ActualSizeInBytes(full.Data())
		arrowSize := full.Data().SizeInBytes()
		t.Logf("Full array - ActualSizeInBytes: %d, Data().SizeInBytes(): %d", actualSize, arrowSize)
		t.Logf("Difference: %d bytes (%.2f%%)",
			int64(arrowSize)-int64(actualSize),
			float64(int64(arrowSize)-int64(actualSize))/float64(actualSize)*100)
	})
	t.Run("Sliced array [100:200] comparison", func(t *testing.T) {
		part := array.NewSlice(full, 100, 200).(*array.FixedSizeBinary)
		defer part.Release()
		actualSize := ActualSizeInBytes(part.Data())
		arrowSize := part.Data().SizeInBytes()
		expectedSize := uint64(100 * byteWidth)
		t.Logf("Sliced [100:200] - ActualSizeInBytes: %d, Data().SizeInBytes(): %d", actualSize, arrowSize)
		t.Logf("Expected actual data: %d bytes", expectedSize)
		t.Logf("ActualSizeInBytes correctly accounts for slice: %v", actualSize < uint64(totalRows*byteWidth))
		assert.Less(t, actualSize, uint64(totalRows*byteWidth))
	})
}