fix: Fix sort stats generates large binlogs (#42456)
Remove the hardcoded batchSize of 100,000 and instead trigger a write every 64 MB based on actual data size. This prevents sort stats from generating excessively large binlog files.

issue: https://github.com/milvus-io/milvus/issues/42400

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Parent: aa66072a1c
Commit: e0113b375e
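The patch threads a byte-size threshold (BinLogMaxSize; the tests below use 64 * 1024 * 1024) into storage.Sort and storage.MergeSort, and flushes the accumulated batch once its actual size reaches that threshold instead of after a fixed 100,000 rows. Below is a minimal, self-contained sketch of that flush policy; the `record`, `sizeOf`, and `flushBySize` names are hypothetical stand-ins, not the real storage.RecordBuilder / RecordWriter API shown in the hunks.

```go
package main

import "fmt"

// record is a hypothetical stand-in for one row of a batch; sizeOf reports
// its encoded size in bytes. Both exist only to illustrate the flush policy.
type record struct{ payload []byte }

func sizeOf(r record) uint64 { return uint64(len(r.payload)) }

// flushBySize accumulates rows and flushes whenever the accumulated byte size
// reaches threshold, instead of after a fixed number of rows. This is the
// policy the patch applies with threshold = BinLogMaxSize, so each written
// binlog stays bounded in size regardless of row width.
func flushBySize(rows []record, threshold uint64, write func([]record) error) error {
	var (
		batch []record
		size  uint64
	)
	for _, r := range rows {
		batch = append(batch, r)
		size += sizeOf(r)
		if size >= threshold {
			if err := write(batch); err != nil {
				return err
			}
			batch, size = nil, 0
		}
	}
	if len(batch) > 0 { // flush the remainder
		return write(batch)
	}
	return nil
}

func main() {
	rows := make([]record, 10)
	for i := range rows {
		rows[i] = record{payload: make([]byte, 1024)}
	}
	_ = flushBySize(rows, 4*1024, func(b []record) error {
		fmt.Println("flushing", len(b), "rows")
		return nil
	})
}
```

The trailing remainder flush is included only for completeness; the corresponding tail write in Sort and MergeSort sits outside the hunks shown here.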
@@ -95,7 +95,7 @@ func mergeSortMultipleSegments(ctx context.Context,
 		log.Warn("compaction only support int64 and varchar pk field")
 	}
 
-	if _, err = storage.MergeSort(plan.GetSchema(), segmentReaders, writer, predicate); err != nil {
+	if _, err = storage.MergeSort(compactionParams.BinLogMaxSize, plan.GetSchema(), segmentReaders, writer, predicate); err != nil {
 		return nil, err
 	}
@@ -239,8 +239,9 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
 		log.Warn("error creating insert binlog reader", zap.Error(err))
 		return nil, err
 	}
 
 	rrs := []storage.RecordReader{rr}
-	numValidRows, err := storage.Sort(st.req.Schema, rrs, srw, predicate)
+	numValidRows, err := storage.Sort(st.req.GetBinlogMaxSize(), st.req.GetSchema(), rrs, srw, predicate)
 	if err != nil {
 		log.Warn("sort failed", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
 		return nil, err
@@ -27,189 +27,211 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
-func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) error {
+func appendValueAt(builder array.Builder, a arrow.Array, idx int, defaultValue *schemapb.ValueField) (uint64, error) {
 	switch b := builder.(type) {
 	case *array.BooleanBuilder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(defaultValue.GetBoolData())
+				return 1, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		ba, ok := a.(*array.Boolean)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ba.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(ba.Value(idx))
+			return 1, nil
 		}
-		return nil
 	case *array.Int8Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(int8(defaultValue.GetIntData()))
+				return 1, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		ia, ok := a.(*array.Int8)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ia.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(ia.Value(idx))
+			return 1, nil
 		}
-		return nil
 	case *array.Int16Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(int16(defaultValue.GetIntData()))
+				return 2, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		ia, ok := a.(*array.Int16)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ia.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(ia.Value(idx))
+			return 2, nil
 		}
-		return nil
 	case *array.Int32Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(defaultValue.GetIntData())
+				return 4, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		ia, ok := a.(*array.Int32)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ia.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(ia.Value(idx))
+			return 4, nil
 		}
-		return nil
 	case *array.Int64Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(defaultValue.GetLongData())
+				return 8, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		ia, ok := a.(*array.Int64)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ia.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(ia.Value(idx))
+			return 8, nil
 		}
-		return nil
 	case *array.Float32Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(defaultValue.GetFloatData())
+				return 4, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		fa, ok := a.(*array.Float32)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if fa.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(fa.Value(idx))
+			return 4, nil
 		}
-		return nil
 	case *array.Float64Builder:
 		if a == nil {
 			if defaultValue != nil {
 				b.Append(defaultValue.GetDoubleData())
+				return 8, nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		fa, ok := a.(*array.Float64)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if fa.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
 			b.Append(fa.Value(idx))
+			return 8, nil
 		}
-		return nil
 	case *array.StringBuilder:
 		if a == nil {
 			if defaultValue != nil {
-				b.Append(defaultValue.GetStringData())
+				val := defaultValue.GetStringData()
+				b.Append(val)
+				return uint64(len(val)), nil
 			} else {
 				b.AppendNull()
+				return 0, nil
 			}
-			return nil
 		}
 		sa, ok := a.(*array.String)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if sa.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
-			b.Append(sa.Value(idx))
+			val := sa.Value(idx)
+			b.Append(val)
+			return uint64(len(val)), nil
 		}
-		return nil
 	case *array.BinaryBuilder:
 		// array type, not support defaultValue now
 		if a == nil {
 			b.AppendNull()
-			return nil
+			return 0, nil
 		}
 		ba, ok := a.(*array.Binary)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ba.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
-			b.Append(ba.Value(idx))
+			val := ba.Value(idx)
+			b.Append(val)
+			return uint64(len(val)), nil
 		}
-		return nil
 	case *array.FixedSizeBinaryBuilder:
 		ba, ok := a.(*array.FixedSizeBinary)
 		if !ok {
-			return fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
+			return 0, fmt.Errorf("invalid value type %T, expect %T", a.DataType(), builder.Type())
 		}
 		if ba.IsNull(idx) {
 			b.AppendNull()
+			return 0, nil
 		} else {
-			b.Append(ba.Value(idx))
+			val := ba.Value(idx)
+			b.Append(val)
+			return uint64(len(val)), nil
 		}
-		return nil
 	default:
-		return fmt.Errorf("unsupported builder type: %T", builder)
+		return 0, fmt.Errorf("unsupported builder type: %T", builder)
 	}
 }
 
@@ -222,22 +244,33 @@ type RecordBuilder struct {
 	builders []array.Builder
 
 	nRows int
+	size  uint64
 }
 
-func (b *RecordBuilder) Append(rec Record, start, end int) {
+func (b *RecordBuilder) Append(rec Record, start, end int) error {
 	for offset := start; offset < end; offset++ {
 		for i, builder := range b.builders {
 			f := b.fields[i]
-			appendValueAt(builder, rec.Column(f.FieldID), offset, f.GetDefaultValue())
+			col := rec.Column(f.FieldID)
+			size, err := appendValueAt(builder, col, offset, f.GetDefaultValue())
+			if err != nil {
+				return fmt.Errorf("failed to append value at offset %d for field %s: %w", offset, f.GetName(), err)
+			}
+			b.size += size
 		}
 	}
 	b.nRows += (end - start)
+	return nil
 }
 
 func (b *RecordBuilder) GetRowNum() int {
 	return b.nRows
 }
 
+func (b *RecordBuilder) GetSize() uint64 {
+	return b.size
+}
+
 func (b *RecordBuilder) Build() Record {
 	arrays := make([]arrow.Array, len(b.builders))
 	fields := make([]arrow.Field, len(b.builders))
@@ -256,6 +289,7 @@ func (b *RecordBuilder) Build() Record {
 
 	rec := NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(b.nRows)), field2Col)
 	b.nRows = 0
+	b.size = 0
 	return rec
 }
 
@@ -27,7 +27,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
-func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
+func Sort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader,
 	rw RecordWriter, predicate func(r Record, ri, i int) bool,
 ) (int, error) {
 	records := make([]Record, 0)
@@ -90,7 +90,6 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
 	}
 
 	rb := NewRecordBuilder(schema)
-	batchSize := 100000
 	writeRecord := func() error {
 		rec := rb.Build()
 		defer rec.Release()
@@ -100,9 +99,13 @@ func Sort(schema *schemapb.CollectionSchema, rr []RecordReader,
 		return nil
 	}
 
-	for i, idx := range indices {
-		rb.Append(records[idx.ri], idx.i, idx.i+1)
-		if (i+1)%batchSize == 0 {
+	for _, idx := range indices {
+		if err := rb.Append(records[idx.ri], idx.i, idx.i+1); err != nil {
+			return 0, err
+		}
+
+		// Write when accumulated data size reaches batchSize
+		if rb.GetSize() >= batchSize {
 			if err := writeRecord(); err != nil {
 				return 0, err
 			}
@@ -165,7 +168,7 @@ func NewPriorityQueue[T any](less func(x, y *T) bool) *PriorityQueue[T] {
 	return &pq
 }
 
-func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
+func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordReader,
 	rw RecordWriter, predicate func(r Record, ri, i int) bool,
 ) (numRows int, err error) {
 	type index struct {
@@ -230,16 +233,15 @@ func MergeSort(schema *schemapb.CollectionSchema, rr []RecordReader,
 		}
 	}
 
-	// Due to current arrow impl (v12), the write performance is largely dependent on the batch size,
-	// small batch size will cause write performance degradation. To work around this issue, we accumulate
-	// records and write them in batches. This requires additional memory copy.
-	batchSize := 100000
 	rb := NewRecordBuilder(schema)
 
 	for pq.Len() > 0 {
 		idx := pq.Dequeue()
 		rb.Append(recs[idx.ri], idx.i, idx.i+1)
-		if rb.GetRowNum()%batchSize == 0 {
+		// Due to current arrow impl (v12), the write performance is largely dependent on the batch size,
+		// small batch size will cause write performance degradation. To work around this issue, we accumulate
+		// records and write them in batches. This requires additional memory copy.
+		if rb.GetSize() >= batchSize {
 			if err := rw.Write(rb.Build()); err != nil {
 				return 0, err
 			}
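The comment retained in the hunk above explains why records are still accumulated at all: with the current arrow implementation (v12), writing many small batches degrades write throughput, so batching stays, only the flush trigger changes from row count to byte size. A back-of-the-envelope sketch (the 16 B / 512 B / 8 KB row widths are illustrative values, not taken from the patch) shows why a fixed 100,000-row batch can yield very large binlogs while a 64 MB threshold cannot:

```go
package main

import "fmt"

func main() {
	const oldBatchRows = 100_000           // previous fixed flush threshold, in rows
	const sizeThreshold = 64 * 1024 * 1024 // new flush threshold, in bytes (64 MB)

	// Hypothetical row widths, only to compare the two policies.
	for _, rowBytes := range []int{16, 512, 8 * 1024} {
		oldFlushMB := oldBatchRows * rowBytes / (1024 * 1024)
		newFlushRows := sizeThreshold / rowBytes
		fmt.Printf("row=%5d B  old flush ~%4d MB  new flush ~%7d rows (~64 MB)\n",
			rowBytes, oldFlushMB, newFlushRows)
	}
}
```

With ~8 KB rows the old policy buffers roughly 780 MB before each write, the kind of oversized binlog the linked issue describes, while the new policy always lands near the configured cap.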
@@ -26,6 +26,8 @@ import (
 )
 
 func TestSort(t *testing.T) {
+	const batchSize = 64 * 1024 * 1024
+
 	getReaders := func() []RecordReader {
 		blobs, err := generateTestDataWithSeed(10, 3)
 		assert.NoError(t, err)
@@ -55,7 +57,7 @@ func TestSort(t *testing.T) {
 	}
 
 	t.Run("sort", func(t *testing.T) {
-		gotNumRows, err := Sort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
+		gotNumRows, err := Sort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
 			return true
 		})
 		assert.NoError(t, err)
@@ -65,7 +67,7 @@ func TestSort(t *testing.T) {
 	})
 
 	t.Run("sort with predicate", func(t *testing.T) {
-		gotNumRows, err := Sort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
+		gotNumRows, err := Sort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
 			pk := r.Column(common.RowIDField).(*array.Int64).Value(i)
 			return pk >= 20
 		})
@@ -105,8 +107,10 @@ func TestMergeSort(t *testing.T) {
 		},
 	}
 
+	const batchSize = 64 * 1024 * 1024
+
 	t.Run("merge sort", func(t *testing.T) {
-		gotNumRows, err := MergeSort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
+		gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
 			return true
 		})
 		assert.NoError(t, err)
@@ -116,7 +120,7 @@ func TestMergeSort(t *testing.T) {
 	})
 
 	t.Run("merge sort with predicate", func(t *testing.T) {
-		gotNumRows, err := MergeSort(generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
+		gotNumRows, err := MergeSort(batchSize, generateTestSchema(), getReaders(), rw, func(r Record, ri, i int) bool {
 			pk := r.Column(common.RowIDField).(*array.Int64).Value(i)
 			return pk >= 20
 		})
@@ -150,11 +154,12 @@ func BenchmarkSort(b *testing.B) {
 		},
 	}
 
+	const batchSize = 64 * 1024 * 1024
 	b.ResetTimer()
 
 	b.Run("sort", func(b *testing.B) {
 		for i := 0; i < b.N; i++ {
-			Sort(generateTestSchema(), rr, rw, func(r Record, ri, i int) bool {
+			Sort(batchSize, generateTestSchema(), rr, rw, func(r Record, ri, i int) bool {
 				return true
 			})
 		}