From afef5fed60993e4c0fe38656d33bfb8232304e3e Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 10 Jan 2025 14:07:05 +0800 Subject: [PATCH] fix: Clustering compaction ignoring deltalogs (#39133) See also: #39131 pr: #39132 Signed-off-by: yangxuan --- .../compaction/clustering_compactor.go | 3 ++- .../compaction/clustering_compactor_test.go | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 94a6004028..62ace26650 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -473,7 +473,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, for _, segment := range inputSegments { segmentClone := &datapb.CompactionSegmentBinlogs{ SegmentID: segment.SegmentID, - // only FieldBinlogs needed + // only FieldBinlogs and deltalogs needed + Deltalogs: segment.Deltalogs, FieldBinlogs: segment.FieldBinlogs, } future := t.mappingPool.Submit(func() (any, error) { diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 1833f87871..9a79db3208 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -170,6 +170,15 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() { } func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { + dblobs, err := getInt64DeltaBlobs( + 1, + []int64{100}, + []uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)}, + ) + s.Require().NoError(err) + s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}). + Return([][]byte{dblobs.GetValue()}, nil).Once() + schema := genCollectionSchema() var segmentID int64 = 1001 segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{}) @@ -193,6 +202,9 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { { SegmentID: segmentID, FieldBinlogs: lo.Values(fBinlogs), + Deltalogs: []*datapb.FieldBinlog{ + {Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "1"}}}, + }, }, } @@ -236,6 +248,12 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() { s.Equal(2, totalBinlogNum/len(schema.GetFields())) s.Equal(1, statsBinlogNum) s.Equal(totalRowNum, statsRowNum) + + s.EqualValues(10239, + lo.SumBy(compactionResult.GetSegments(), func(seg *datapb.CompactionSegment) int64 { + return seg.GetNumOfRows() + }), + ) } func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {