enhance: iteratively download data during compaction to reduce memory cost (#39724)

See #37234

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2025-02-13 10:36:47 +08:00 committed by GitHub
parent 0e7e4af8a5
commit 2978b0890e
11 changed files with 218 additions and 189 deletions
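The core of this change is that compaction no longer downloads every insert binlog up front. Readers now pull blobs batch by batch through a ChunkedBlobsReader callback (a func() ([]*Blob, error) that returns io.EOF once all batches are exhausted), so only one batch of binlogs is resident at a time. Below is a minimal, self-contained sketch of that pattern; newBatchedReader and the fake download function are hypothetical stand-ins for the closure built in writeSegment and for binlogIO.Download, not code from this commit.

package main

import (
	"fmt"
	"io"
)

// Blob mirrors the storage.Blob shape used in this diff (Key/Value).
type Blob struct {
	Key   string
	Value []byte
}

// ChunkedBlobsReader mirrors the callback type consumed by
// NewCompositeBinlogRecordReader after this change: each call returns one
// batch of blobs, and io.EOF once every batch has been handed out.
type ChunkedBlobsReader func() ([]*Blob, error)

// newBatchedReader is a hypothetical helper mirroring the closure built in
// writeSegment: it downloads one batch of paths per call instead of
// materializing every binlog up front.
func newBatchedReader(batches [][]string, download func([]string) ([][]byte, error)) ChunkedBlobsReader {
	pos := 0
	return func() ([]*Blob, error) {
		if pos >= len(batches) {
			return nil, io.EOF
		}
		paths := batches[pos]
		pos++
		values, err := download(paths) // only this batch is held in memory
		if err != nil {
			return nil, err
		}
		blobs := make([]*Blob, len(values))
		for i, v := range values {
			blobs[i] = &Blob{Key: paths[i], Value: v}
		}
		return blobs, nil
	}
}

func main() {
	// Fake downloader standing in for binlogIO.Download.
	download := func(paths []string) ([][]byte, error) {
		out := make([][]byte, len(paths))
		for i, p := range paths {
			out[i] = []byte("payload of " + p)
		}
		return out, nil
	}

	reader := newBatchedReader([][]string{
		{"field1/log1", "field2/log1"},
		{"field1/log2", "field2/log2"},
	}, download)

	for {
		batch, err := reader()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("downloaded batch of %d blobs, first key %s\n", len(batch), batch[0].Key)
	}
}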

View File

@ -621,8 +621,7 @@ func (t *clusteringCompactionTask) mappingSegment(
blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: paths[i], Value: v}
})
pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs))
if err != nil {
log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err))
return err
@ -1196,7 +1195,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
return &storage.Blob{Key: paths[i], Value: v}
})
pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs))
if err != nil {
log.Warn("new insert binlogs Itr wrong", zap.Strings("path", paths), zap.Error(err))
return nil, err

View File

@ -168,50 +168,6 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
return pk2Ts, nil
}
func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error,
) {
if err := binlog.DecompressCompactionBinlogs(segments); err != nil {
log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
return nil, nil, err
}
deltaPaths = make(map[typeutil.UniqueID][]string) // segmentID to deltalog paths
insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths
for _, s := range segments {
segId := s.GetSegmentID()
// Get the batch count of field binlog files from non-empty segment
// each segment might contain different batches
var binlogBatchCount int
for _, b := range s.GetFieldBinlogs() {
if b != nil {
binlogBatchCount = len(b.GetBinlogs())
break
}
}
if binlogBatchCount == 0 {
log.Warn("compacting empty segment", zap.Int64("segmentID", s.GetSegmentID()))
continue
}
for idx := 0; idx < binlogBatchCount; idx++ {
var batchPaths []string
for _, f := range s.GetFieldBinlogs() {
batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
}
insertPaths[segId] = append(insertPaths[segId], batchPaths...)
}
deltaPaths[s.GetSegmentID()] = []string{}
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
deltaPaths[segId] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath())
}
}
}
return deltaPaths, insertPaths, nil
}
func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
defer span.End()

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -56,7 +57,6 @@ type mixCompactionTask struct {
partitionID int64
targetSize int64
maxRows int64
pkID int64
bm25FieldIDs []int64
@ -131,8 +131,6 @@ func (t *mixCompactionTask) preCompact() error {
func (t *mixCompactionTask) mergeSplit(
ctx context.Context,
insertPaths map[int64][]string,
deltaPaths map[int64][]string,
) ([]*datapb.CompactionSegment, error) {
_ = t.tr.RecordSpan()
@ -154,9 +152,9 @@ func (t *mixCompactionTask) mergeSplit(
log.Warn("failed to get pk field from schema")
return nil, err
}
for segId, binlogPaths := range insertPaths {
deltaPaths := deltaPaths[segId]
del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField)
for _, seg := range t.plan.GetSegmentBinlogs() {
del, exp, err := t.writeSegment(ctx, seg, mWriter, pkField)
if err != nil {
return nil, err
}
@ -180,21 +178,15 @@ func (t *mixCompactionTask) mergeSplit(
}
func (t *mixCompactionTask) writeSegment(ctx context.Context,
binlogPaths []string,
deltaPaths []string,
seg *datapb.CompactionSegmentBinlogs,
mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema,
) (deletedRowCount, expiredRowCount int64, err error) {
log := log.With(zap.Strings("paths", binlogPaths))
allValues, err := t.binlogIO.Download(ctx, binlogPaths)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return
deltaPaths := make([]string, 0)
for _, fieldBinlog := range seg.GetDeltalogs() {
for _, binlog := range fieldBinlog.GetBinlogs() {
deltaPaths = append(deltaPaths, binlog.GetLogPath())
}
}
blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: binlogPaths[i], Value: v}
})
delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
if err != nil {
log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
@ -202,7 +194,30 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
reader, err := storage.NewCompositeBinlogRecordReader(blobs)
itr := 0
binlogs := seg.GetFieldBinlogs()
reader, err := storage.NewCompositeBinlogRecordReader(t.plan.GetSchema(), func() ([]*storage.Blob, error) {
if len(binlogs) <= 0 {
return nil, sio.EOF
}
paths := make([]string, len(binlogs))
for i, fieldBinlog := range binlogs {
if itr >= len(fieldBinlog.GetBinlogs()) {
return nil, sio.EOF
}
paths[i] = fieldBinlog.GetBinlogs()[itr].GetLogPath()
}
itr++
values, err := t.binlogIO.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return nil, err
}
blobs := lo.Map(values, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: paths[i], Value: v}
})
return blobs, nil
})
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
return
@ -302,19 +317,22 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
defer cancelAll()
log.Info("compact start")
deltaPaths, insertPaths, err := composePaths(t.plan.GetSegmentBinlogs())
if err != nil {
log.Warn("compact wrong, failed to composePaths", zap.Error(err))
// Decompress compaction binlogs first
if err := binlog.DecompressCompactionBinlogs(t.plan.SegmentBinlogs); err != nil {
log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
return nil, err
}
// Unable to deal with all empty segments cases, so return error
isEmpty := true
for _, paths := range insertPaths {
if len(paths) > 0 {
isEmpty = false
break
isEmpty := func() bool {
for _, seg := range t.plan.GetSegmentBinlogs() {
for _, field := range seg.GetFieldBinlogs() {
if len(field.GetBinlogs()) > 0 {
return false
}
}
}
}
return true
}()
if isEmpty {
log.Warn("compact wrong, all segments' binlogs are empty")
return nil, errors.New("illegal compaction plan")
@ -328,13 +346,15 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
break
}
}
if len(insertPaths) <= 1 || len(insertPaths) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
if len(t.plan.GetSegmentBinlogs()) <= 1 ||
len(t.plan.GetSegmentBinlogs()) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
// sort merge is not applicable if there is only one segment or too many segments
sortMergeAppicable = false
}
}
var res []*datapb.CompactionSegment
var err error
if sortMergeAppicable {
log.Info("compact by merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
@ -344,7 +364,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
return nil, err
}
} else {
res, err = t.mergeSplit(ctxTimeout, insertPaths, deltaPaths)
res, err = t.mergeSplit(ctxTimeout)
if err != nil {
log.Warn("compact wrong, failed to mergeSplit", zap.Error(err))
return nil, err

View File

@ -56,7 +56,6 @@ type MixCompactionTaskSuite struct {
segWriter *SegmentWriter
task *mixCompactionTask
plan *datapb.CompactionPlan
}
func (s *MixCompactionTaskSuite) SetupSuite() {
@ -70,7 +69,7 @@ func (s *MixCompactionTaskSuite) SetupTest() {
paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
s.plan = &datapb.CompactionPlan{
plan := &datapb.CompactionPlan{
PlanID: 999,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{{
SegmentID: 100,
@ -86,15 +85,14 @@ func (s *MixCompactionTaskSuite) SetupTest() {
MaxSize: 64 * 1024 * 1024,
}
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.plan)
s.task.plan = s.plan
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
}
func (s *MixCompactionTaskSuite) SetupBM25() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.meta = genTestCollectionMetaWithBM25()
s.plan = &datapb.CompactionPlan{
plan := &datapb.CompactionPlan{
PlanID: 999,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{{
SegmentID: 100,
@ -110,8 +108,7 @@ func (s *MixCompactionTaskSuite) SetupBM25() {
MaxSize: 64 * 1024 * 1024,
}
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.plan)
s.task.plan = s.plan
s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
}
func (s *MixCompactionTaskSuite) SetupSubTest() {
@ -163,7 +160,7 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
Deltalogs: []*datapb.FieldBinlog{
@ -200,7 +197,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
})
@ -214,7 +211,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
NumOfRows: 0,
}, pkoracle.NewBloomFilterSet(), nil)
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: seg.SegmentID(),
})
@ -248,7 +245,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
})
@ -262,7 +259,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
NumOfRows: 0,
}, pkoracle.NewBloomFilterSet(), nil)
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: seg.SegmentID(),
})
@ -309,7 +306,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltaPath}).
Return([][]byte{blob.GetValue()}, nil).Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
IsSorted: true,
@ -357,7 +354,18 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
s.task.partitionID = PartitionID
s.task.maxRows = 1000
compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: lo.Keys(kvs)}, nil)
fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(kvs))
for k := range kvs {
fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
Binlogs: []*datapb.Binlog{
{
LogPath: k,
},
},
})
}
s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
compactionSegments, err := s.task.mergeSplit(s.task.ctx)
s.NoError(err)
s.Equal(1, len(compactionSegments))
s.EqualValues(0, compactionSegments[0].GetNumOfRows())
@ -383,16 +391,10 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter)
insertPaths := lo.Keys(kvs)
s.Require().NoError(err)
for _, test := range tests {
s.Run(test.description, func() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn(
func(ctx context.Context, paths []string) ([][]byte, error) {
s.Require().Equal(len(paths), len(kvs))
return lo.Values(kvs), nil
})
deletePaths := make(map[int64][]string, 0)
if len(test.deletions) > 0 {
blob, err := getInt64DeltaBlobs(
s.segWriter.segmentID,
@ -402,15 +404,49 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
s.Require().NoError(err)
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"foo"}).
Return([][]byte{blob.GetValue()}, nil).Once()
deletePaths[s.segWriter.segmentID] = []string{"foo"}
s.task.plan.SegmentBinlogs[0].Deltalogs = []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
LogPath: "foo",
},
},
},
}
}
insertPaths := lo.Keys(kvs)
insertBytes := func() [][]byte {
res := make([][]byte, 0, len(insertPaths))
for _, path := range insertPaths {
res = append(res, kvs[path])
}
return res
}()
s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn(
func(ctx context.Context, paths []string) ([][]byte, error) {
s.Require().Equal(len(paths), len(kvs))
return insertBytes, nil
})
fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(insertPaths))
for _, k := range insertPaths {
fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
Binlogs: []*datapb.Binlog{
{
LogPath: k,
},
},
})
}
s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
s.task.collectionID = CollectionID
s.task.partitionID = PartitionID
s.task.maxRows = 1000
res, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: insertPaths}, deletePaths)
res, err := s.task.mergeSplit(s.task.ctx)
s.NoError(err)
s.EqualValues(test.expectedRes, len(res))
s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
@ -570,14 +606,14 @@ func (s *MixCompactionTaskSuite) TestCompactFail() {
})
s.Run("Test compact invalid empty segment binlogs", func() {
s.plan.SegmentBinlogs = nil
s.task.plan.SegmentBinlogs = nil
_, err := s.task.Compact()
s.Error(err)
})
s.Run("Test compact failed maxSize zero", func() {
s.plan.MaxSize = 0
s.task.plan.MaxSize = 0
_, err := s.task.Compact()
s.Error(err)
})
@ -876,7 +912,7 @@ func BenchmarkMixCompactor(b *testing.B) {
return len(left) == 0 && len(right) == 0
})).Return(lo.Values(kvs), nil).Once()
s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: segID,
FieldBinlogs: lo.Values(fBinlogs),
})

View File

@ -413,7 +413,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
return &storage.Blob{Key: paths[i], Value: v}
})
rr, err := storage.NewCompositeBinlogRecordReader(blobs)
rr, err := storage.NewCompositeBinlogRecordReader(st.req.Schema, storage.MakeBlobsReader(blobs))
if err != nil {
log.Warn("downloadData wrong, failed to new insert binlogs reader", zap.Error(err))
return nil, err

View File

@ -33,14 +33,14 @@ import (
func generateTestSchema() *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
{FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
{FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
{FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true},
{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
{FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},

View File

@ -48,7 +48,7 @@ type CompositeBinlogRecordReader struct {
brs []*BinlogReader
rrs []array.RecordReader
schema map[FieldID]schemapb.DataType
schema *schemapb.CollectionSchema
index map[FieldID]int16
r *compositeRecord
}
@ -71,7 +71,6 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
if crr.rrs == nil {
crr.rrs = make([]array.RecordReader, len(blobs))
crr.brs = make([]*BinlogReader, len(blobs))
crr.schema = make(map[FieldID]schemapb.DataType)
crr.index = make(map[FieldID]int16, len(blobs))
}
@ -81,8 +80,6 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
return err
}
// TODO: assert schema being the same in every blobs
crr.schema[reader.FieldID] = reader.PayloadDataType
er, err := reader.NextEventReader()
if err != nil {
return err
@ -169,7 +166,7 @@ func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
return InvalidUniqueID, InvalidUniqueID
}
func NewCompositeBinlogRecordReader(blobs []*Blob) (*CompositeBinlogRecordReader, error) {
func MakeBlobsReader(blobs []*Blob) ChunkedBlobsReader {
blobMap := make(map[FieldID][]*Blob)
for _, blob := range blobs {
colId, _ := parseBlobKey(blob.Key)
@ -190,88 +187,105 @@ func NewCompositeBinlogRecordReader(blobs []*Blob) (*CompositeBinlogRecordReader
sortedBlobs = append(sortedBlobs, blobsForField)
}
chunkPos := 0
return func() ([]*Blob, error) {
if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
return nil, io.EOF
}
blobs := make([]*Blob, len(sortedBlobs))
for fieldPos := range blobs {
blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
}
chunkPos++
return blobs, nil
}
}
func NewCompositeBinlogRecordReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*CompositeBinlogRecordReader, error) {
return &CompositeBinlogRecordReader{
BlobsReader: func() ([]*Blob, error) {
if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
return nil, io.EOF
}
blobs := make([]*Blob, len(sortedBlobs))
for fieldPos := range blobs {
blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
}
chunkPos++
return blobs, nil
},
schema: schema,
BlobsReader: blobsReader,
}, nil
}
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
reader, err := NewCompositeBinlogRecordReader(blobs)
func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema) error {
pkField := func() *schemapb.FieldSchema {
for _, field := range fieldSchema {
if field.GetIsPrimaryKey() {
return field
}
}
return nil
}()
if pkField == nil {
return merr.WrapErrServiceInternal("no primary key field found")
}
for i := 0; i < r.Len(); i++ {
value := v[i]
if value == nil {
value = &Value{}
value.Value = make(map[FieldID]interface{}, len(fieldSchema))
v[i] = value
}
m := value.Value.(map[FieldID]interface{})
for _, f := range fieldSchema {
j := f.FieldID
dt := f.DataType
if r.Column(j).IsNull(i) {
m[j] = nil
} else {
d, ok := serdeMap[dt].deserialize(r.Column(j), i)
if ok {
m[j] = d // TODO: avoid memory copy here.
} else {
return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
}
}
}
rowID, ok := m[common.RowIDField].(int64)
if !ok {
return merr.WrapErrIoKeyNotFound("no row id column found")
}
value.ID = rowID
value.Timestamp = m[common.TimeStampField].(int64)
pk, err := GenPrimaryKeyByRawData(m[pkField.FieldID], pkField.DataType)
if err != nil {
return err
}
value.PK = pk
value.IsDeleted = false
value.Value = m
}
return nil
}
func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*DeserializeReader[*Value], error) {
reader, err := NewCompositeBinlogRecordReader(schema, blobsReader)
if err != nil {
return nil, err
}
return NewDeserializeReader(reader, func(r Record, v []*Value) error {
schema := reader.schema
// Note: the return value `Value` is reused.
for i := 0; i < r.Len(); i++ {
value := v[i]
if value == nil {
value = &Value{}
value.Value = make(map[FieldID]interface{}, len(schema))
v[i] = value
}
m := value.Value.(map[FieldID]interface{})
for j, dt := range schema {
if r.Column(j).IsNull(i) {
m[j] = nil
} else {
d, ok := serdeMap[dt].deserialize(r.Column(j), i)
if ok {
m[j] = d // TODO: avoid memory copy here.
} else {
return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
}
}
}
rowID, ok := m[common.RowIDField].(int64)
if !ok {
return merr.WrapErrIoKeyNotFound("no row id column found")
}
value.ID = rowID
value.Timestamp = m[common.TimeStampField].(int64)
pk, err := GenPrimaryKeyByRawData(m[PKfieldID], schema[PKfieldID])
if err != nil {
return err
}
value.PK = pk
value.IsDeleted = false
value.Value = m
}
return nil
return ValueDeserializer(r, v, schema.Fields)
}), nil
}
func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
reader, err := NewCompositeBinlogRecordReader(blobs)
reader, err := NewCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs))
if err != nil {
return nil, err
}
return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
var fid FieldID // The only fid from delete file
for k := range reader.schema {
fid = k
break
}
for i := 0; i < r.Len(); i++ {
if v[i] == nil {
v[i] = &DeleteLog{}
}
a := r.Column(fid).(*array.String)
// retrieve the only field
a := r.(*compositeRecord).recs[0].(*array.String)
strVal := a.Value(i)
if err := v[i].Parse(strVal); err != nil {
return err

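For callers that already hold all blobs in memory, the new MakeBlobsReader helper shown above wraps them into the same chunked interface: blobs are grouped by field ID, and each call yields one blob per field until the chunks run out. The simplified, self-contained sketch below mirrors that behavior; makeBlobsReader and the "<fieldID>/<logID>" key format are illustrative assumptions, since the real implementation parses full binlog paths via parseBlobKey.

package main

import (
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
)

type Blob struct {
	Key   string
	Value []byte
}

type ChunkedBlobsReader func() ([]*Blob, error)

// makeBlobsReader is a simplified stand-in for storage.MakeBlobsReader: it
// groups the flat blob list by field ID and hands out one chunk per call,
// one blob per field, assuming every field has the same number of log files
// (the same assumption the original closure makes when it checks chunkPos
// against the first field's blob count).
func makeBlobsReader(blobs []*Blob) ChunkedBlobsReader {
	byField := make(map[int][]*Blob)
	for _, b := range blobs {
		// Assumed key layout "<fieldID>/<logID>" for this sketch.
		fieldID, _ := strconv.Atoi(strings.SplitN(b.Key, "/", 2)[0])
		byField[fieldID] = append(byField[fieldID], b)
	}
	fields := make([]int, 0, len(byField))
	for f := range byField {
		fields = append(fields, f)
	}
	sort.Ints(fields)

	chunkPos := 0
	return func() ([]*Blob, error) {
		if len(fields) == 0 || chunkPos >= len(byField[fields[0]]) {
			return nil, io.EOF
		}
		chunk := make([]*Blob, len(fields))
		for i, f := range fields {
			chunk[i] = byField[f][chunkPos]
		}
		chunkPos++
		return chunk, nil
	}
}

func main() {
	blobs := []*Blob{
		{Key: "0/1"}, {Key: "0/2"}, // field 0, two log files
		{Key: "1/1"}, {Key: "1/2"}, // field 1, two log files
	}
	reader := makeBlobsReader(blobs)
	for {
		chunk, err := reader()
		if err == io.EOF {
			break
		}
		keys := make([]string, len(chunk))
		for i, b := range chunk {
			keys[i] = b.Key
		}
		fmt.Println("chunk:", keys) // one blob per field per chunk
	}
}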
View File

@ -43,7 +43,9 @@ import (
func TestBinlogDeserializeReader(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
return nil, io.EOF
})
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
@ -54,7 +56,7 @@ func TestBinlogDeserializeReader(t *testing.T) {
size := 3
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
@ -117,7 +119,9 @@ func TestBinlogStreamWriter(t *testing.T) {
func TestBinlogSerializeWriter(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
return nil, io.EOF
})
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()
@ -128,7 +132,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
size := 16
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
@ -170,13 +174,13 @@ func TestBinlogSerializeWriter(t *testing.T) {
newblobs[i] = blob
i++
}
// Both field pk and field 17 are with datatype string and auto id
// Both field pk and field 13 are with datatype int64 and auto id
// in test data. Field pk uses delta byte array encoding, while
// field 17 uses dict encoding.
assert.Less(t, writers[16].buf.Len(), writers[17].buf.Len())
// field 13 uses dict encoding.
assert.Less(t, writers[0].buf.Len(), writers[13].buf.Len())
// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
reader, err = NewBinlogDeserializeReader(newblobs, common.RowIDField)
reader, err = NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(newblobs))
assert.NoError(t, err)
defer reader.Close()
for i := 1; i <= size; i++ {
@ -299,7 +303,7 @@ func TestNull(t *testing.T) {
blobs[i] = blob
i++
}
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
err = reader.Next()

View File

@ -31,7 +31,7 @@ func TestPackedSerde(t *testing.T) {
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()

View File

@ -127,7 +127,7 @@ func BenchmarkDeserializeReader(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(b, err)
defer reader.Close()
for i := 0; i < len; i++ {

View File

@ -29,11 +29,11 @@ func TestSort(t *testing.T) {
getReaders := func() []RecordReader {
blobs, err := generateTestDataWithSeed(10, 3)
assert.NoError(t, err)
reader10, err := NewCompositeBinlogRecordReader(blobs)
reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
blobs, err = generateTestDataWithSeed(20, 3)
assert.NoError(t, err)
reader20, err := NewCompositeBinlogRecordReader(blobs)
reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
rr := []RecordReader{reader20, reader10}
return rr
@ -80,11 +80,11 @@ func TestMergeSort(t *testing.T) {
getReaders := func() []RecordReader {
blobs, err := generateTestDataWithSeed(10, 3)
assert.NoError(t, err)
reader10, err := NewCompositeBinlogRecordReader(blobs)
reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
blobs, err = generateTestDataWithSeed(20, 3)
assert.NoError(t, err)
reader20, err := NewCompositeBinlogRecordReader(blobs)
reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
rr := []RecordReader{reader20, reader10}
return rr
@ -132,11 +132,11 @@ func BenchmarkSort(b *testing.B) {
batch := 500000
blobs, err := generateTestDataWithSeed(batch, batch)
assert.NoError(b, err)
reader10, err := NewCompositeBinlogRecordReader(blobs)
reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(b, err)
blobs, err = generateTestDataWithSeed(batch*2+1, batch)
assert.NoError(b, err)
reader20, err := NewCompositeBinlogRecordReader(blobs)
reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(b, err)
rr := []RecordReader{reader20, reader10}