From 6dc7d2041f72ac8a9cbab7c8c5dffded9d9bd711 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 6 Sep 2024 20:09:05 +0800 Subject: [PATCH] fix: Set an empty segment if compaction deleted all inserts (#36045) See also: #36038 pr: #36044 --------- Signed-off-by: yangxuan --- .../datanode/compaction/mix_compactor_test.go | 21 ++++++++++++------- .../datanode/compaction/segment_writer.go | 21 ++++++++++++++++++- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 1b7dadf303..13e02ca60a 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -221,6 +221,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() { s.task.currentTs = currTs s.task.plan.CollectionTtl = int64(collTTL) s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil) + s.mockAlloc.EXPECT().AllocOne().Return(19531, nil) kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter) s.Require().NoError(err) @@ -237,7 +238,12 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() { compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil) s.NoError(err) - s.Equal(0, len(compactionSegments)) + s.Equal(1, len(compactionSegments)) + s.EqualValues(0, compactionSegments[0].GetNumOfRows()) + s.EqualValues(19531, compactionSegments[0].GetSegmentID()) + s.Empty(compactionSegments[0].GetDeltalogs()) + s.Empty(compactionSegments[0].GetInsertLogs()) + s.Empty(compactionSegments[0].GetField2StatslogPaths()) } func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { @@ -247,10 +253,11 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { description string deletions map[interface{}]uint64 expectedRes int + leftNumRows int }{ - {"no deletion", nil, 1}, - {"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1}, - {"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0}, + {"no deletion", nil, 1, 1}, + {"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1}, + {"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0}, } s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil) @@ -258,7 +265,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { s.Require().NoError(err) for _, test := range tests { s.Run(test.description, func() { - if test.expectedRes > 0 { + if test.leftNumRows > 0 { s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(77777, 99999, nil).Once() } s.mockAlloc.EXPECT().AllocOne().Return(888888, nil).Maybe() @@ -275,9 +282,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() { res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions) s.NoError(err) s.EqualValues(test.expectedRes, len(res)) - if test.expectedRes > 0 { - s.EqualValues(1, res[0].GetNumOfRows()) - } + s.EqualValues(test.leftNumRows, res[0].GetNumOfRows()) }) } } diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index 414220a086..a5cd9548ef 100644 --- a/internal/datanode/compaction/segment_writer.go +++ b/internal/datanode/compaction/segment_writer.go @@ -47,6 +47,7 @@ type MultiSegmentWriter struct { // segID -> fieldID -> binlogs res []*datapb.CompactionSegment + // DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord } func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator allocator.Allocator, plan *datapb.CompactionPlan, maxRows int64, partitionID, collectionID int64) *MultiSegmentWriter { @@ -175,9 +176,27 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error { return writer.Write(v) } -// Could return an empty list if every insert of the segment is deleted +func (w *MultiSegmentWriter) appendEmptySegment() error { + writer, err := w.getWriter() + if err != nil { + return err + } + + w.res = append(w.res, &datapb.CompactionSegment{ + SegmentID: writer.GetSegmentID(), + NumOfRows: 0, + Channel: w.channel, + }) + return nil +} + +// DONOT return an empty list if every insert of the segment is deleted, +// append an empty segment instead func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) { if w.current == -1 { + if err := w.appendEmptySegment(); err != nil { + return nil, err + } return w.res, nil }