fix: correct memory size estimation on arrays (#40312)

See: #40342

Signed-off-by: Ted Xu <ted.xu@zilliz.com>

This commit is contained in:
parent 1637cf5664
commit 878ce56079
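For context, a hedged standalone sketch (not part of the commit) of the mismatch being fixed: Milvus serializes array fields as protobuf scalar payloads, and protobuf packs small integers into varints, so the serialized byte count can be several times smaller than the in-memory width of the elements. The comparison below assumes the `schemapb` package from milvus-proto and `google.golang.org/protobuf`; the exact ratio depends on the values stored.

```go
// Minimal sketch (not part of this commit) showing why the old estimate
// undercounted: repeated int32 values are varint-packed on the wire, so
// small values occupy ~1 byte there but 4 bytes in memory.
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"google.golang.org/protobuf/proto"
)

func main() {
	arr := &schemapb.IntArray{Data: []int32{1, 2, 3, 4, 5, 6, 7, 8}}

	wireSize := proto.Size(arr)       // varint-packed size: ~1 byte per small value plus framing
	memSize := 4 * len(arr.GetData()) // in-memory size: 4 bytes per int32

	// For small values the wire size is roughly 3-4x smaller, which is why
	// the writer below scales Int32 arrays by an expansion ratio of 4.
	fmt.Printf("wire=%dB mem=%dB ratio≈%.1f\n",
		wireSize, memSize, float64(memSize)/float64(wireSize))
}
```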
@@ -150,14 +150,14 @@ func TestBulkPackWriter_Write(t *testing.T) {
                 {
                     EntriesNum: 10,
                     LogPath:    "files/delta_log/123/456/789/10000",
-                    LogSize:    594,
+                    LogSize:    592,
                     MemorySize: 283,
                 },
             },
         },
     wantStats:     map[int64]*datapb.FieldBinlog{},
     wantBm25Stats: map[int64]*datapb.FieldBinlog{},
-    wantSize:      594,
+    wantSize:      592,
     wantErr:       nil,
     },
 }
@@ -20,6 +20,7 @@ import (
     "encoding/binary"
     "io"
     "math"
+    "strconv"
     "sync"
 
     "github.com/apache/arrow/go/v17/arrow"
@@ -32,6 +33,7 @@ import (
 
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/pkg/v2/common"
+    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
 type Record interface {
@@ -713,8 +715,9 @@ type singleFieldRecordWriter struct {
     schema      *arrow.Schema
     writerProps *parquet.WriterProperties
 
     numRows             int
     writtenUncompressed uint64
+    memoryExpansionRatio int
 }
 
 func (sfw *singleFieldRecordWriter) Write(r Record) error {
@@ -728,21 +731,45 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error {
 }
 
 func (sfw *singleFieldRecordWriter) GetWrittenUncompressed() uint64 {
-    return sfw.writtenUncompressed
+    return sfw.writtenUncompressed * uint64(sfw.memoryExpansionRatio)
 }
 
 func (sfw *singleFieldRecordWriter) Close() error {
     return sfw.fw.Close()
 }
 
-func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
+func newSingleFieldRecordWriter(field *schemapb.FieldSchema, writer io.Writer, opts ...RecordWriterOptions) (*singleFieldRecordWriter, error) {
+    // calculate memory expansion ratio
+    // arrays are serialized by protobuf, where int values may be compacted, see https://protobuf.dev/reference/go/size
+    // to correct the actual size, we need to multiply the memory expansion ratio accordingly.
+    determineMemoryExpansionRatio := func(field *schemapb.FieldSchema) int {
+        if field.DataType == schemapb.DataType_Array {
+            switch field.GetElementType() {
+            case schemapb.DataType_Int16:
+                return 2
+            case schemapb.DataType_Int32:
+                return 4
+            case schemapb.DataType_Int64:
+                return 8
+            }
+        }
+        return 1
+    }
+    dim, _ := typeutil.GetDim(field)
     w := &singleFieldRecordWriter{
-        fieldId: fieldId,
-        schema:  arrow.NewSchema([]arrow.Field{field}, nil),
+        fieldId: field.FieldID,
+        schema: arrow.NewSchema([]arrow.Field{
+            {
+                Name:     strconv.Itoa(int(field.FieldID)),
+                Type:     serdeMap[field.DataType].arrowType(int(dim)),
+                Nullable: true, // No nullable check here.
+            },
+        }, nil),
         writerProps: parquet.NewWriterProperties(
             parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
             parquet.WithCompression(compress.Codecs.Zstd),
             parquet.WithCompressionLevel(3)),
+        memoryExpansionRatio: determineMemoryExpansionRatio(field),
     }
     for _, o := range opts {
         o(w)
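The ratio is deliberately a worst-case constant: a protobuf varint can shrink each small integer element to a single byte, so multiplying the serialized byte count by the full element width yields a conservative over-estimate of the in-memory footprint rather than an exact size. A minimal sketch of the same idea in isolation; `estimateMemory` is a hypothetical helper for illustration, not a Milvus API:

```go
// Hedged sketch of the correction above: scale the uncompressed serialized
// byte count by the fixed per-element-type expansion ratio.
package main

import "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

// estimateMemory is hypothetical; in the commit the ratio is stored on the
// writer and applied in GetWrittenUncompressed.
func estimateMemory(writtenUncompressed uint64, elementType schemapb.DataType) uint64 {
	switch elementType {
	case schemapb.DataType_Int16:
		return writtenUncompressed * 2 // small varint: ~1 byte on wire, 2 in memory
	case schemapb.DataType_Int32:
		return writtenUncompressed * 4 // small varint: ~1 byte on wire, 4 in memory
	case schemapb.DataType_Int64:
		return writtenUncompressed * 8 // small varint: ~1 byte on wire, 8 in memory
	default:
		return writtenUncompressed // strings, bools, floats are not varint-compacted
	}
}
```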
@@ -322,13 +322,7 @@ func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
         return bsw.rw, nil
     }
 
-    fid := bsw.fieldSchema.FieldID
-    dim, _ := typeutil.GetDim(bsw.fieldSchema)
-    rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
-        Name:     strconv.Itoa(int(fid)),
-        Type:     serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)),
-        Nullable: true, // No nullable check here.
-    }, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
+    rw, err := newSingleFieldRecordWriter(bsw.fieldSchema, &bsw.buf, WithRecordWriterProps(getFieldWriterProps(bsw.fieldSchema)))
     if err != nil {
         return nil, err
     }
@@ -814,12 +808,7 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
     if dsw.rw != nil {
         return dsw.rw, nil
     }
-    dim, _ := typeutil.GetDim(dsw.fieldSchema)
-    rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
-        Name:     dsw.fieldSchema.Name,
-        Type:     serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
-        Nullable: false,
-    }, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
+    rw, err := newSingleFieldRecordWriter(dsw.fieldSchema, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
     if err != nil {
         return nil, err
     }
@@ -97,9 +97,13 @@ func TestBinlogStreamWriter(t *testing.T) {
     t.Run("test write", func(t *testing.T) {
         size := 3
 
-        field := arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}
+        field := &schemapb.FieldSchema{
+            FieldID:  1,
+            DataType: schemapb.DataType_Bool,
+        }
+
         var w bytes.Buffer
-        rw, err := newSingleFieldRecordWriter(1, field, &w)
+        rw, err := newSingleFieldRecordWriter(field, &w)
         assert.NoError(t, err)
 
         builder := array.NewBooleanBuilder(memory.DefaultAllocator)
@@ -108,7 +112,7 @@ func TestBinlogStreamWriter(t *testing.T) {
         defer arr.Release()
         ar := array.NewRecord(
             arrow.NewSchema(
-                []arrow.Field{field},
+                []arrow.Field{{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}},
                 nil,
             ),
             []arrow.Array{arr},
@@ -213,6 +217,84 @@ func TestBinlogSerializeWriter(t *testing.T) {
     })
 }
 
+func TestSize(t *testing.T) {
+    t.Run("test array of int", func(t *testing.T) {
+        size := 100
+        schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+            {
+                FieldID:     18,
+                Name:        "array",
+                DataType:    schemapb.DataType_Array,
+                ElementType: schemapb.DataType_Int32,
+            },
+        }}
+
+        writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
+        writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
+        assert.NoError(t, err)
+
+        for i := 0; i < size; i++ {
+            e := int32(i)
+            d := []int32{e, e, e, e, e, e, e, e}
+            value := &Value{
+                Value: map[FieldID]any{
+                    18: &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_IntData{
+                            IntData: &schemapb.IntArray{Data: d},
+                        },
+                    },
+                },
+            }
+            err := writer.Write(value)
+            assert.NoError(t, err)
+        }
+
+        err = writer.Close()
+        assert.NoError(t, err)
+        memSize := writer.WrittenMemorySize()
+        assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
+        t.Log("written memory size", memSize)
+    })
+
+    t.Run("test array of varchar", func(t *testing.T) {
+        size := 100
+        schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+            {
+                FieldID:     18,
+                Name:        "array",
+                DataType:    schemapb.DataType_Array,
+                ElementType: schemapb.DataType_String,
+            },
+        }}
+
+        writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
+        writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
+        assert.NoError(t, err)
+
+        for i := 0; i < size; i++ {
+            e := fmt.Sprintf("%4d", i)
+            d := []string{e, e, e, e, e, e, e, e}
+            value := &Value{
+                Value: map[FieldID]any{
+                    18: &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_StringData{
+                            StringData: &schemapb.StringArray{Data: d},
+                        },
+                    },
+                },
+            }
+            err := writer.Write(value)
+            assert.NoError(t, err)
+        }
+
+        err = writer.Close()
+        assert.NoError(t, err)
+        memSize := writer.WrittenMemorySize()
+        assert.Greater(t, memSize, uint64(8*4*size)) // written memory size should be greater than the data size
+        t.Log("written memory size", memSize)
+    })
+}
+
 func BenchmarkSerializeWriter(b *testing.B) {
     const (
         dim = 128
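A note on the lower bound asserted in both subtests: each row writes an 8-element array, so `8*4*size` is 8 elements x 4 bytes x 100 rows = 3,200 bytes of raw element data (for varchar, 8 strings of 4 characters each). `WrittenMemorySize` must come out strictly above that floor once the expansion ratio and per-row serialization overhead are accounted for.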