From 6f85b49a4fec5322277fc7bd7c7ce9f1a6df2dee Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Thu, 24 Feb 2022 11:27:53 +0800
Subject: [PATCH] Add more compaction test cases (#15715)

Test merge compaction with 2 segments that share the same PK and only 1
valid deletion. This test verifies the compaction behavior of DataNode
when duplicate PKs appear in different segments.

Signed-off-by: yangxuan
---
 internal/datanode/compactor_test.go | 81 +++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index f1617c0993..54d401b162 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -640,6 +640,87 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		assert.NoError(t, err)
 		assert.Equal(t, int64(4), updates.GetNumRows())
 	})
+
+	t.Run("Test typeII compact 2 segments with the same pk", func(t *testing.T) {
+		// Test merge compaction: two segments with the same pk, one deletion of pk=1.
+		// The merged segment 19530 should contain only 2 rows, both with pk=2.
+		// The pk=1 rows of both segments are compacted away.
+		var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201
+
+		alloc := NewAllocatorFactory(1)
+		rc := &RootCoordFactory{}
+		dc := &DataCoordFactory{}
+		mockfm := &mockFlushManager{}
+		mockKv := memkv.NewMemoryKV()
+		mockbIO := &binlogIO{mockKv, alloc}
+		replica, err := newReplica(context.TODO(), rc, collID)
+		require.NoError(t, err)
+
+		replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
+		replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{1})
+		require.True(t, replica.hasSegment(segID1, true))
+		require.True(t, replica.hasSegment(segID2, true))
+
+		meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
+		// the same pks for segmentI and segmentII
+		iData1 := genInsertDataWithPKs([2]int64{1, 2})
+		iData2 := genInsertDataWithPKs([2]int64{1, 2})
+
+		dData1 := &DeleteData{
+			Pks:      []UniqueID{1},
+			Tss:      []Timestamp{20000},
+			RowCount: 1,
+		}
+		// empty dData2
+		dData2 := &DeleteData{
+			Pks:      []UniqueID{},
+			Tss:      []Timestamp{},
+			RowCount: 0,
+		}
+
+		cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
+		require.NoError(t, err)
+		require.Equal(t, 11, len(cpaths1.inPaths))
+
+		cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
+		require.NoError(t, err)
+		require.Equal(t, 11, len(cpaths2.inPaths))
+
+		plan := &datapb.CompactionPlan{
+			PlanID: 20080,
+			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+				{
+					SegmentID:           segID1,
+					FieldBinlogs:        cpaths1.inPaths,
+					Field2StatslogPaths: cpaths1.statsPaths,
+					Deltalogs:           cpaths1.deltaInfo,
+				},
+				{
+					SegmentID:           segID2,
+					FieldBinlogs:        cpaths2.inPaths,
+					Field2StatslogPaths: cpaths2.statsPaths,
+					Deltalogs:           cpaths2.deltaInfo,
+				},
+			},
+			StartTime:        0,
+			TimeoutInSeconds: 1,
+			Type:             datapb.CompactionType_MergeCompaction,
+			Timetravel:       40000,
+			Channel:          "channelname",
+		}
+
+		alloc.random = false // generated ID = 19530
+		task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
+		err = task.compact()
+		assert.NoError(t, err)
+
+		assert.False(t, replica.hasSegment(segID1, true))
+		assert.False(t, replica.hasSegment(segID2, true))
+		assert.True(t, replica.hasSegment(19530, true))
+		updates, err := replica.getSegmentStatisticsUpdates(19530)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(2), updates.GetNumRows())
+	})
 }
 
 type mockFlushManager struct {
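
Why the final assertions hold: the behavior this test encodes is that merge
(typeII) compaction pools the deltalogs of every input segment into one
pk -> timestamp delete view, applies each deletion whose timestamp is at or
below the plan's Timetravel (here ts=20000 <= 40000, the "1 valid deletion"),
and otherwise keeps rows even when the same pk appears in several segments.
What follows is a minimal, self-contained Go sketch of that rule, not the
Milvus implementation; the row type, mergeRows, and the plain delete map are
illustrative stand-ins invented for this example.

	package main

	import "fmt"

	// row is a simplified stand-in for a binlog row: a primary key
	// plus the segment it came from.
	type row struct {
		pk    int64
		segID int64
	}

	// mergeRows keeps a row only if its pk was never deleted at a
	// timestamp <= timetravel. Deletions apply across all input
	// segments, while rows sharing a pk across segments are not
	// deduplicated against each other.
	func mergeRows(rows []row, deletes map[int64]uint64, timetravel uint64) []row {
		merged := make([]row, 0, len(rows))
		for _, r := range rows {
			if ts, deleted := deletes[r.pk]; deleted && ts <= timetravel {
				continue // drop this pk in every segment that holds it
			}
			merged = append(merged, r)
		}
		return merged
	}

	func main() {
		// Two segments (200 and 201), each with rows pk=1 and pk=2,
		// mirroring iData1/iData2 in the test above.
		rows := []row{{1, 200}, {2, 200}, {1, 201}, {2, 201}}
		// One deletion of pk=1 at ts=20000 (dData1); with timetravel
		// 40000 the deletion is valid and hits both segments.
		deletes := map[int64]uint64{1: 20000}

		merged := mergeRows(rows, deletes, 40000)
		fmt.Println(merged) // [{2 200} {2 201}]: 2 rows left, both pk=2
	}

Running the sketch leaves exactly two rows, both pk=2, which matches the
test's assert.Equal(t, int64(2), updates.GetNumRows()) on merged segment
19530: the single deletion removes pk=1 from both segments, and the
duplicate pk=2 rows both survive the merge.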