diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index 62ace26650..c8fc77e32c 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -621,8 +621,7 @@ func (t *clusteringCompactionTask) mappingSegment(
         blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
             return &storage.Blob{Key: paths[i], Value: v}
         })
-
-        pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
+        pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs))
         if err != nil {
             log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err))
             return err
@@ -1196,7 +1195,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
             return &storage.Blob{Key: paths[i], Value: v}
         })
 
-        pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
+        pkIter, err := storage.NewBinlogDeserializeReader(t.plan.Schema, storage.MakeBlobsReader(blobs))
         if err != nil {
             log.Warn("new insert binlogs Itr wrong", zap.Strings("path", paths), zap.Error(err))
             return nil, err
diff --git a/internal/datanode/compaction/compactor_common.go b/internal/datanode/compaction/compactor_common.go
index 0e34995891..031c390433 100644
--- a/internal/datanode/compaction/compactor_common.go
+++ b/internal/datanode/compaction/compactor_common.go
@@ -168,50 +168,6 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
     return pk2Ts, nil
 }
 
-func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
-    deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error,
-) {
-    if err := binlog.DecompressCompactionBinlogs(segments); err != nil {
-        log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
-        return nil, nil, err
-    }
-
-    deltaPaths = make(map[typeutil.UniqueID][]string)     // segmentID to deltalog paths
-    insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths
-    for _, s := range segments {
-        segId := s.GetSegmentID()
-        // Get the batch count of field binlog files from non-empty segment
-        // each segment might contain different batches
-        var binlogBatchCount int
-        for _, b := range s.GetFieldBinlogs() {
-            if b != nil {
-                binlogBatchCount = len(b.GetBinlogs())
-                break
-            }
-        }
-        if binlogBatchCount == 0 {
-            log.Warn("compacting empty segment", zap.Int64("segmentID", s.GetSegmentID()))
-            continue
-        }
-
-        for idx := 0; idx < binlogBatchCount; idx++ {
-            var batchPaths []string
-            for _, f := range s.GetFieldBinlogs() {
-                batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
-            }
-            insertPaths[segId] = append(insertPaths[segId], batchPaths...)
-        }
-
-        deltaPaths[s.GetSegmentID()] = []string{}
-        for _, d := range s.GetDeltalogs() {
-            for _, l := range d.GetBinlogs() {
-                deltaPaths[segId] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath())
-            }
-        }
-    }
-    return deltaPaths, insertPaths, nil
-}
-
 func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
     _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
     defer span.End()
diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go
index 1c51db02c2..2848bb3811 100644
--- a/internal/datanode/compaction/mix_compactor.go
+++ b/internal/datanode/compaction/mix_compactor.go
@@ -32,6 +32,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/allocator"
     "github.com/milvus-io/milvus/internal/flushcommon/io"
+    "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
     "github.com/milvus-io/milvus/pkg/log"
@@ -56,7 +57,6 @@ type mixCompactionTask struct {
     partitionID int64
     targetSize  int64
     maxRows     int64
-    pkID        int64
 
     bm25FieldIDs []int64
 
@@ -131,8 +131,6 @@ func (t *mixCompactionTask) preCompact() error {
 
 func (t *mixCompactionTask) mergeSplit(
     ctx context.Context,
-    insertPaths map[int64][]string,
-    deltaPaths map[int64][]string,
 ) ([]*datapb.CompactionSegment, error) {
     _ = t.tr.RecordSpan()
 
@@ -154,9 +152,9 @@ func (t *mixCompactionTask) mergeSplit(
         log.Warn("failed to get pk field from schema")
         return nil, err
     }
-    for segId, binlogPaths := range insertPaths {
-        deltaPaths := deltaPaths[segId]
-        del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField)
+
+    for _, seg := range t.plan.GetSegmentBinlogs() {
+        del, exp, err := t.writeSegment(ctx, seg, mWriter, pkField)
         if err != nil {
             return nil, err
         }
@@ -180,21 +178,15 @@ func (t *mixCompactionTask) mergeSplit(
 }
 
 func (t *mixCompactionTask) writeSegment(ctx context.Context,
-    binlogPaths []string,
-    deltaPaths []string,
+    seg *datapb.CompactionSegmentBinlogs,
     mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema,
 ) (deletedRowCount, expiredRowCount int64, err error) {
-    log := log.With(zap.Strings("paths", binlogPaths))
-    allValues, err := t.binlogIO.Download(ctx, binlogPaths)
-    if err != nil {
-        log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
-        return
+    deltaPaths := make([]string, 0)
+    for _, fieldBinlog := range seg.GetDeltalogs() {
+        for _, binlog := range fieldBinlog.GetBinlogs() {
+            deltaPaths = append(deltaPaths, binlog.GetLogPath())
+        }
     }
-
-    blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
-        return &storage.Blob{Key: binlogPaths[i], Value: v}
-    })
-
     delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
     if err != nil {
         log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
@@ -202,7 +194,30 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
     }
 
     entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
-    reader, err := storage.NewCompositeBinlogRecordReader(blobs)
+    itr := 0
+    binlogs := seg.GetFieldBinlogs()
+    reader, err := storage.NewCompositeBinlogRecordReader(t.plan.GetSchema(), func() ([]*storage.Blob, error) {
+        if len(binlogs) <= 0 {
+            return nil, sio.EOF
+        }
+        paths := make([]string, len(binlogs))
+        for i, fieldBinlog := range binlogs {
+            if itr >= len(fieldBinlog.GetBinlogs()) {
+                return nil, sio.EOF
+            }
+            paths[i] = fieldBinlog.GetBinlogs()[itr].GetLogPath()
+        }
+        itr++
+        values, err := t.binlogIO.Download(ctx, paths)
+        if err != nil {
+            log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
+            return nil, err
+        }
+        blobs := lo.Map(values, func(v []byte, i int) *storage.Blob {
+            return &storage.Blob{Key: paths[i], Value: v}
+        })
+        return blobs, nil
+    })
     if err != nil {
         log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
         return
@@ -302,19 +317,22 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
     defer cancelAll()
 
     log.Info("compact start")
-    deltaPaths, insertPaths, err := composePaths(t.plan.GetSegmentBinlogs())
-    if err != nil {
-        log.Warn("compact wrong, failed to composePaths", zap.Error(err))
+    // Decompress compaction binlogs first
+    if err := binlog.DecompressCompactionBinlogs(t.plan.SegmentBinlogs); err != nil {
+        log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
         return nil, err
     }
     // Unable to deal with all empty segments cases, so return error
-    isEmpty := true
-    for _, paths := range insertPaths {
-        if len(paths) > 0 {
-            isEmpty = false
-            break
+    isEmpty := func() bool {
+        for _, seg := range t.plan.GetSegmentBinlogs() {
+            for _, field := range seg.GetFieldBinlogs() {
+                if len(field.GetBinlogs()) > 0 {
+                    return false
+                }
+            }
         }
-    }
+        return true
+    }()
     if isEmpty {
         log.Warn("compact wrong, all segments' binlogs are empty")
         return nil, errors.New("illegal compaction plan")
@@ -328,13 +346,15 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
                 break
             }
         }
-        if len(insertPaths) <= 1 || len(insertPaths) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
+        if len(t.plan.GetSegmentBinlogs()) <= 1 ||
+            len(t.plan.GetSegmentBinlogs()) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
             // sort merge is not applicable if there is only one segment or too many segments
             sortMergeAppicable = false
         }
     }
 
     var res []*datapb.CompactionSegment
+    var err error
     if sortMergeAppicable {
         log.Info("compact by merge sort")
         res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
@@ -344,7 +364,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
             return nil, err
         }
     } else {
-        res, err = t.mergeSplit(ctxTimeout, insertPaths, deltaPaths)
+        res, err = t.mergeSplit(ctxTimeout)
         if err != nil {
             log.Warn("compact wrong, failed to mergeSplit", zap.Error(err))
             return nil, err
diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go
index 7c95868412..063f777eef 100644
--- a/internal/datanode/compaction/mix_compactor_test.go
+++ b/internal/datanode/compaction/mix_compactor_test.go
@@ -56,7 +56,6 @@ type MixCompactionTaskSuite struct {
     segWriter *SegmentWriter
 
     task *mixCompactionTask
-    plan *datapb.CompactionPlan
 }
 
 func (s *MixCompactionTaskSuite) SetupSuite() {
@@ -70,7 +69,7 @@ func (s *MixCompactionTaskSuite) SetupTest() {
 
     paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
 
-    s.plan = &datapb.CompactionPlan{
+    plan := &datapb.CompactionPlan{
         PlanID: 999,
         SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{{
             SegmentID: 100,
@@ -86,15 +85,14 @@ func (s *MixCompactionTaskSuite) SetupTest() {
         MaxSize: 64 * 1024 * 1024,
     }
 
-    s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.plan)
-    s.task.plan = s.plan
+    s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
 }
 
 func (s *MixCompactionTaskSuite) SetupBM25() {
     s.mockBinlogIO = io.NewMockBinlogIO(s.T())
     s.meta = genTestCollectionMetaWithBM25()
 
-    s.plan = &datapb.CompactionPlan{
+    plan := &datapb.CompactionPlan{
         PlanID: 999,
         SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{{
             SegmentID: 100,
@@ -110,8 +108,7 @@ func (s *MixCompactionTaskSuite) SetupBM25() {
         MaxSize: 64 * 1024 * 1024,
     }
 
-    s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.plan)
-    s.task.plan = s.plan
+    s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, plan)
 }
 
 func (s *MixCompactionTaskSuite) SetupSubTest() {
@@ -163,7 +160,7 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() {
             return len(left) == 0 && len(right) == 0
         })).Return(lo.Values(kvs), nil).Once()
 
-        s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+        s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
             SegmentID:    segID,
             FieldBinlogs: lo.Values(fBinlogs),
             Deltalogs: []*datapb.FieldBinlog{
@@ -200,7 +197,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
             return len(left) == 0 && len(right) == 0
         })).Return(lo.Values(kvs), nil).Once()
 
-        s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+        s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
             SegmentID:    segID,
             FieldBinlogs: lo.Values(fBinlogs),
         })
@@ -214,7 +211,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
         NumOfRows:     0,
     }, pkoracle.NewBloomFilterSet(), nil)
 
-    s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+    s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
         SegmentID: seg.SegmentID(),
     })
 
@@ -248,7 +245,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
             return len(left) == 0 && len(right) == 0
         })).Return(lo.Values(kvs), nil).Once()
 
-        s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+        s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
             SegmentID:    segID,
             FieldBinlogs: lo.Values(fBinlogs),
         })
@@ -262,7 +259,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOneWithBM25() {
         NumOfRows:     0,
     }, pkoracle.NewBloomFilterSet(), nil)
 
-    s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+    s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
         SegmentID: seg.SegmentID(),
     })
 
@@ -309,7 +306,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
     s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{deltaPath}).
         Return([][]byte{blob.GetValue()}, nil).Once()
 
-    s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+    s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
         SegmentID:    segID,
         FieldBinlogs: lo.Values(fBinlogs),
         IsSorted:     true,
@@ -357,7 +354,18 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
     s.task.partitionID = PartitionID
     s.task.maxRows = 1000
 
-    compactionSegments, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: lo.Keys(kvs)}, nil)
+    fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(kvs))
+    for k := range kvs {
+        fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
+            Binlogs: []*datapb.Binlog{
+                {
+                    LogPath: k,
+                },
+            },
+        })
+    }
+    s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
+    compactionSegments, err := s.task.mergeSplit(s.task.ctx)
     s.NoError(err)
     s.Equal(1, len(compactionSegments))
     s.EqualValues(0, compactionSegments[0].GetNumOfRows())
@@ -383,16 +391,10 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
     alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
     kvs, _, err := serializeWrite(context.TODO(), alloc, s.segWriter)
-    insertPaths := lo.Keys(kvs)
     s.Require().NoError(err)
+
     for _, test := range tests {
         s.Run(test.description, func() {
-            s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn(
-                func(ctx context.Context, paths []string) ([][]byte, error) {
-                    s.Require().Equal(len(paths), len(kvs))
-                    return lo.Values(kvs), nil
-                })
 
-            deletePaths := make(map[int64][]string, 0)
             if len(test.deletions) > 0 {
                 blob, err := getInt64DeltaBlobs(
                     s.segWriter.segmentID,
@@ -402,15 +404,49 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
                    lo.Keys(test.deletions),
                    lo.Values(test.deletions),
                )
                 s.Require().NoError(err)
                 s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"foo"}).
                     Return([][]byte{blob.GetValue()}, nil).Once()
-                deletePaths[s.segWriter.segmentID] = []string{"foo"}
+                s.task.plan.SegmentBinlogs[0].Deltalogs = []*datapb.FieldBinlog{
+                    {
+                        Binlogs: []*datapb.Binlog{
+                            {
+                                LogPath: "foo",
+                            },
+                        },
+                    },
+                }
             }
+            insertPaths := lo.Keys(kvs)
+            insertBytes := func() [][]byte {
+                res := make([][]byte, 0, len(insertPaths))
+                for _, path := range insertPaths {
+                    res = append(res, kvs[path])
+                }
+                return res
+            }()
+            s.mockBinlogIO.EXPECT().Download(mock.Anything, insertPaths).RunAndReturn(
+                func(ctx context.Context, paths []string) ([][]byte, error) {
+                    s.Require().Equal(len(paths), len(kvs))
+                    return insertBytes, nil
+                })
+            fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(insertPaths))
+            for _, k := range insertPaths {
+                fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
+                    Binlogs: []*datapb.Binlog{
+                        {
+                            LogPath: k,
+                        },
+                    },
+                })
+            }
+            s.task.plan.SegmentBinlogs[0].FieldBinlogs = fieldBinlogs
+
             s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
 
             s.task.collectionID = CollectionID
             s.task.partitionID = PartitionID
             s.task.maxRows = 1000
-            res, err := s.task.mergeSplit(s.task.ctx, map[int64][]string{s.segWriter.segmentID: insertPaths}, deletePaths)
+
+            res, err := s.task.mergeSplit(s.task.ctx)
             s.NoError(err)
             s.EqualValues(test.expectedRes, len(res))
             s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
@@ -570,14 +606,14 @@ func (s *MixCompactionTaskSuite) TestCompactFail() {
     })
 
     s.Run("Test compact invalid empty segment binlogs", func() {
-        s.plan.SegmentBinlogs = nil
+        s.task.plan.SegmentBinlogs = nil
 
         _, err := s.task.Compact()
         s.Error(err)
     })
 
     s.Run("Test compact failed maxSize zero", func() {
-        s.plan.MaxSize = 0
+        s.task.plan.MaxSize = 0
         _, err := s.task.Compact()
         s.Error(err)
     })
@@ -876,7 +912,7 @@ func BenchmarkMixCompactor(b *testing.B) {
             return len(left) == 0 && len(right) == 0
         })).Return(lo.Values(kvs), nil).Once()
 
-        s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
+        s.task.plan.SegmentBinlogs = append(s.task.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
             SegmentID:    segID,
             FieldBinlogs: lo.Values(fBinlogs),
         })
diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go
index 1b51da6a10..85bcffeb56 100644
--- a/internal/indexnode/task_stats.go
+++ b/internal/indexnode/task_stats.go
@@ -413,7 +413,7 @@ func (st *statsTask) sort(ctx context.Context) ([]*datapb.FieldBinlog, error) {
         return &storage.Blob{Key: paths[i], Value: v}
     })
 
-    rr, err := storage.NewCompositeBinlogRecordReader(blobs)
+    rr, err := storage.NewCompositeBinlogRecordReader(st.req.Schema, storage.MakeBlobsReader(blobs))
     if err != nil {
         log.Warn("downloadData wrong, failed to new insert binlogs reader", zap.Error(err))
         return nil, err
diff --git a/internal/storage/binlog_iterator_test.go b/internal/storage/binlog_iterator_test.go
index fb4f8600d2..bec802328f 100644
--- a/internal/storage/binlog_iterator_test.go
+++ b/internal/storage/binlog_iterator_test.go
@@ -33,14 +33,14 @@ import (
 func generateTestSchema() *schemapb.CollectionSchema {
     schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
         {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
-        {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
+        {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
         {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
         {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
         {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
         {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
         {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
         {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
-        {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true},
+        {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
         {FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
         {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
         {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index c4181e5cc4..9373843879 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -48,7 +48,7 @@ type CompositeBinlogRecordReader struct {
     brs []*BinlogReader
     rrs []array.RecordReader
 
-    schema map[FieldID]schemapb.DataType
+    schema *schemapb.CollectionSchema
     index  map[FieldID]int16
     r      *compositeRecord
 }
@@ -71,7 +71,6 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
     if crr.rrs == nil {
         crr.rrs = make([]array.RecordReader, len(blobs))
         crr.brs = make([]*BinlogReader, len(blobs))
-        crr.schema = make(map[FieldID]schemapb.DataType)
         crr.index = make(map[FieldID]int16, len(blobs))
     }
 
@@ -81,8 +80,6 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
             return err
         }
 
-        // TODO: assert schema being the same in every blobs
-        crr.schema[reader.FieldID] = reader.PayloadDataType
         er, err := reader.NextEventReader()
         if err != nil {
             return err
         }
@@ -169,7 +166,7 @@ func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
     return InvalidUniqueID, InvalidUniqueID
 }
 
-func NewCompositeBinlogRecordReader(blobs []*Blob) (*CompositeBinlogRecordReader, error) {
+func MakeBlobsReader(blobs []*Blob) ChunkedBlobsReader {
     blobMap := make(map[FieldID][]*Blob)
     for _, blob := range blobs {
         colId, _ := parseBlobKey(blob.Key)
@@ -190,88 +187,105 @@ func NewCompositeBinlogRecordReader(blobs []*Blob) (*CompositeBinlogRecordReader
         sortedBlobs = append(sortedBlobs, blobsForField)
     }
     chunkPos := 0
+    return func() ([]*Blob, error) {
+        if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
+            return nil, io.EOF
+        }
+        blobs := make([]*Blob, len(sortedBlobs))
+        for fieldPos := range blobs {
+            blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
+        }
+        chunkPos++
+        return blobs, nil
+    }
+}
+
+func NewCompositeBinlogRecordReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*CompositeBinlogRecordReader, error) {
     return &CompositeBinlogRecordReader{
-        BlobsReader: func() ([]*Blob, error) {
-            if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
-                return nil, io.EOF
-            }
-            blobs := make([]*Blob, len(sortedBlobs))
-            for fieldPos := range blobs {
-                blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
-            }
-            chunkPos++
-            return blobs, nil
-        },
+        schema:      schema,
+        BlobsReader: blobsReader,
     }, nil
 }
 
-func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
-    reader, err := NewCompositeBinlogRecordReader(blobs)
+func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema) error {
+    pkField := func() *schemapb.FieldSchema {
+        for _, field := range fieldSchema {
+            if field.GetIsPrimaryKey() {
+                return field
+            }
+        }
+        return nil
+    }()
+    if pkField == nil {
+        return merr.WrapErrServiceInternal("no primary key field found")
+    }
+
+    for i := 0; i < r.Len(); i++ {
+        value := v[i]
+        if value == nil {
+            value = &Value{}
+            value.Value = make(map[FieldID]interface{}, len(fieldSchema))
+            v[i] = value
+        }
+
+        m := value.Value.(map[FieldID]interface{})
+        for _, f := range fieldSchema {
+            j := f.FieldID
+            dt := f.DataType
+            if r.Column(j).IsNull(i) {
+                m[j] = nil
+            } else {
+                d, ok := serdeMap[dt].deserialize(r.Column(j), i)
+                if ok {
+                    m[j] = d // TODO: avoid memory copy here.
+                } else {
+                    return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
+                }
+            }
+        }
+
+        rowID, ok := m[common.RowIDField].(int64)
+        if !ok {
+            return merr.WrapErrIoKeyNotFound("no row id column found")
+        }
+        value.ID = rowID
+        value.Timestamp = m[common.TimeStampField].(int64)
+
+        pk, err := GenPrimaryKeyByRawData(m[pkField.FieldID], pkField.DataType)
+        if err != nil {
+            return err
+        }
+
+        value.PK = pk
+        value.IsDeleted = false
+        value.Value = m
+    }
+    return nil
+}
+
+func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*DeserializeReader[*Value], error) {
+    reader, err := NewCompositeBinlogRecordReader(schema, blobsReader)
     if err != nil {
         return nil, err
     }
     return NewDeserializeReader(reader, func(r Record, v []*Value) error {
-        schema := reader.schema
-        // Note: the return value `Value` is reused.
-        for i := 0; i < r.Len(); i++ {
-            value := v[i]
-            if value == nil {
-                value = &Value{}
-                value.Value = make(map[FieldID]interface{}, len(schema))
-                v[i] = value
-            }
-
-            m := value.Value.(map[FieldID]interface{})
-            for j, dt := range schema {
-                if r.Column(j).IsNull(i) {
-                    m[j] = nil
-                } else {
-                    d, ok := serdeMap[dt].deserialize(r.Column(j), i)
-                    if ok {
-                        m[j] = d // TODO: avoid memory copy here.
-                    } else {
-                        return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
-                    }
-                }
-            }
-
-            rowID, ok := m[common.RowIDField].(int64)
-            if !ok {
-                return merr.WrapErrIoKeyNotFound("no row id column found")
-            }
-            value.ID = rowID
-            value.Timestamp = m[common.TimeStampField].(int64)
-
-            pk, err := GenPrimaryKeyByRawData(m[PKfieldID], schema[PKfieldID])
-            if err != nil {
-                return err
-            }
-
-            value.PK = pk
-            value.IsDeleted = false
-            value.Value = m
-        }
-        return nil
+        return ValueDeserializer(r, v, schema.Fields)
     }), nil
 }
 
 func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
-    reader, err := NewCompositeBinlogRecordReader(blobs)
+    reader, err := NewCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs))
     if err != nil {
         return nil, err
     }
     return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
-        var fid FieldID // The only fid from delete file
-        for k := range reader.schema {
-            fid = k
-            break
-        }
         for i := 0; i < r.Len(); i++ {
             if v[i] == nil {
                 v[i] = &DeleteLog{}
             }
-            a := r.Column(fid).(*array.String)
+            // retrieve the only field
+            a := r.(*compositeRecord).recs[0].(*array.String)
             strVal := a.Value(i)
             if err := v[i].Parse(strVal); err != nil {
                 return err
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index bb252395bf..c48547b246 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -43,7 +43,9 @@ import (
 
 func TestBinlogDeserializeReader(t *testing.T) {
     t.Run("test empty data", func(t *testing.T) {
-        reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
+            return nil, io.EOF
+        })
         assert.NoError(t, err)
         defer reader.Close()
         err = reader.Next()
@@ -54,7 +56,7 @@ func TestBinlogDeserializeReader(t *testing.T) {
         size := 3
         blobs, err := generateTestData(size)
         assert.NoError(t, err)
-        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         defer reader.Close()
 
@@ -117,7 +119,9 @@ func TestBinlogStreamWriter(t *testing.T) {
 
 func TestBinlogSerializeWriter(t *testing.T) {
     t.Run("test empty data", func(t *testing.T) {
-        reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
+            return nil, io.EOF
+        })
         assert.NoError(t, err)
         defer reader.Close()
         err = reader.Next()
@@ -128,7 +132,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
         size := 16
         blobs, err := generateTestData(size)
         assert.NoError(t, err)
-        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         defer reader.Close()
 
@@ -170,13 +174,13 @@ func TestBinlogSerializeWriter(t *testing.T) {
             newblobs[i] = blob
             i++
         }
-        // Both field pk and field 17 are with datatype string and auto id
+        // Both field pk and field 13 are with datatype int64 and auto id
         // in test data. Field pk uses delta byte array encoding, while
-        // field 17 uses dict encoding.
-        assert.Less(t, writers[16].buf.Len(), writers[17].buf.Len())
+        // field 13 uses dict encoding.
+        assert.Less(t, writers[0].buf.Len(), writers[13].buf.Len())
         // assert.Equal(t, blobs[0].Value, newblobs[0].Value)
 
-        reader, err = NewBinlogDeserializeReader(newblobs, common.RowIDField)
+        reader, err = NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(newblobs))
         assert.NoError(t, err)
         defer reader.Close()
         for i := 1; i <= size; i++ {
@@ -299,7 +303,7 @@ func TestNull(t *testing.T) {
             blobs[i] = blob
             i++
         }
-        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         defer reader.Close()
         err = reader.Next()
diff --git a/internal/storage/serde_events_v2_test.go b/internal/storage/serde_events_v2_test.go
index 39ab4102b2..f63d35701a 100644
--- a/internal/storage/serde_events_v2_test.go
+++ b/internal/storage/serde_events_v2_test.go
@@ -31,7 +31,7 @@ func TestPackedSerde(t *testing.T) {
         blobs, err := generateTestData(size)
         assert.NoError(t, err)
 
-        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         defer reader.Close()
 
diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go
index df23ab229e..90dbf5b5c2 100644
--- a/internal/storage/serde_test.go
+++ b/internal/storage/serde_test.go
@@ -127,7 +127,7 @@ func BenchmarkDeserializeReader(b *testing.B) {
 
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
+        reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(b, err)
         defer reader.Close()
         for i := 0; i < len; i++ {
diff --git a/internal/storage/sort_test.go b/internal/storage/sort_test.go
index f93628b5a9..84806a667e 100644
--- a/internal/storage/sort_test.go
+++ b/internal/storage/sort_test.go
@@ -29,11 +29,11 @@ func TestSort(t *testing.T) {
     getReaders := func() []RecordReader {
         blobs, err := generateTestDataWithSeed(10, 3)
         assert.NoError(t, err)
-        reader10, err := NewCompositeBinlogRecordReader(blobs)
+        reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         blobs, err = generateTestDataWithSeed(20, 3)
         assert.NoError(t, err)
-        reader20, err := NewCompositeBinlogRecordReader(blobs)
+        reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         rr := []RecordReader{reader20, reader10}
         return rr
@@ -80,11 +80,11 @@ func TestMergeSort(t *testing.T) {
     getReaders := func() []RecordReader {
         blobs, err := generateTestDataWithSeed(10, 3)
         assert.NoError(t, err)
-        reader10, err := NewCompositeBinlogRecordReader(blobs)
+        reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         blobs, err = generateTestDataWithSeed(20, 3)
         assert.NoError(t, err)
-        reader20, err := NewCompositeBinlogRecordReader(blobs)
+        reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
         assert.NoError(t, err)
         rr := []RecordReader{reader20, reader10}
         return rr
@@ -132,11 +132,11 @@ func BenchmarkSort(b *testing.B) {
     batch := 500000
     blobs, err := generateTestDataWithSeed(batch, batch)
     assert.NoError(b, err)
-    reader10, err := NewCompositeBinlogRecordReader(blobs)
+    reader10, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
     assert.NoError(b, err)
    blobs, err = generateTestDataWithSeed(batch*2+1, batch)
     assert.NoError(b, err)
-    reader20, err := NewCompositeBinlogRecordReader(blobs)
+    reader20, err := NewCompositeBinlogRecordReader(generateTestSchema(), MakeBlobsReader(blobs))
     assert.NoError(b, err)
     rr := []RecordReader{reader20, reader10}