mirror of https://gitee.com/milvus-io/milvus.git
Add more compaction test cases (#15715)
Test merge compaction with 2 segments sharing the same PKs and only 1 valid deletion. This test verifies the DataNode's compaction behavior for duplicate PKs that live in different segments.

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
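The scenario is easy to state in isolation. Below is a minimal, self-contained sketch of the rule the new case asserts, using simplified stand-in types (`deletion` and `mergeCompact` are hypothetical, not the DataNode implementation): a deletion whose timestamp falls at or before the plan's timetravel point removes every row with the matching PK from all input segments.

package main

import "fmt"

// Hypothetical stand-ins for the test's per-segment PK lists and its
// DeleteData entries; the real types live in Milvus's internal/datanode.
type deletion struct {
	pk int64
	ts uint64
}

// mergeCompact sketches the rule the test asserts: a deletion applies to
// every row with a matching PK across all input segments, provided its
// timestamp falls at or before the plan's timetravel point.
func mergeCompact(segments [][]int64, dels []deletion, timetravel uint64) []int64 {
	deleted := make(map[int64]bool)
	for _, d := range dels {
		if d.ts <= timetravel {
			deleted[d.pk] = true // this deletion is visible to the compaction
		}
	}
	var merged []int64
	for _, seg := range segments {
		for _, pk := range seg {
			if !deleted[pk] {
				merged = append(merged, pk)
			}
		}
	}
	return merged
}

func main() {
	// Mirrors the test: two segments with the same PKs {1, 2},
	// one deletion of pk=1 at ts=20000, timetravel=40000.
	merged := mergeCompact(
		[][]int64{{1, 2}, {1, 2}},
		[]deletion{{pk: 1, ts: 20000}},
		40000,
	)
	fmt.Println(merged) // prints [2 2]: both pk=1 rows are gone, 2 rows remain
}

Running the sketch leaves exactly two rows, both pk=2, which is what the test below asserts via `updates.GetNumRows()`.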
parent c4bd942543
commit 6f85b49a4f
@@ -640,6 +640,87 @@ func TestCompactorInterfaceMethods(t *testing.T) {
		assert.NoError(t, err)
		assert.Equal(t, int64(4), updates.GetNumRows())
	})

	t.Run("Test typeII compact 2 segments with the same pk", func(t *testing.T) {
		// Test merge compactions, two segments with the same pk, one deletion pk=1
		// The merged segment 19530 should only contain 2 rows and both pk=2
		// Both pk=1 rows of the two segments are compacted away.
		var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201

		alloc := NewAllocatorFactory(1)
		rc := &RootCoordFactory{}
		dc := &DataCoordFactory{}
		mockfm := &mockFlushManager{}
		mockKv := memkv.NewMemoryKV()
		mockbIO := &binlogIO{mockKv, alloc}
		replica, err := newReplica(context.TODO(), rc, collID)
		require.NoError(t, err)

		replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
		replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{1})
		require.True(t, replica.hasSegment(segID1, true))
		require.True(t, replica.hasSegment(segID2, true))

		meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
		// the same pk for segment I and segment II
		iData1 := genInsertDataWithPKs([2]int64{1, 2})
		iData2 := genInsertDataWithPKs([2]int64{1, 2})

		dData1 := &DeleteData{
			Pks:      []UniqueID{1},
			Tss:      []Timestamp{20000},
			RowCount: 1,
		}
		// empty dData2
		dData2 := &DeleteData{
			Pks:      []UniqueID{},
			Tss:      []Timestamp{},
			RowCount: 0,
		}

		cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
		require.NoError(t, err)
		require.Equal(t, 11, len(cpaths1.inPaths))

		cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
		require.NoError(t, err)
		require.Equal(t, 11, len(cpaths2.inPaths))

		plan := &datapb.CompactionPlan{
			PlanID: 20080,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					SegmentID:           segID1,
					FieldBinlogs:        cpaths1.inPaths,
					Field2StatslogPaths: cpaths1.statsPaths,
					Deltalogs:           cpaths1.deltaInfo,
				},
				{
					SegmentID:           segID2,
					FieldBinlogs:        cpaths2.inPaths,
					Field2StatslogPaths: cpaths2.statsPaths,
					Deltalogs:           cpaths2.deltaInfo,
				},
			},
			StartTime:        0,
			TimeoutInSeconds: 1,
			Type:             datapb.CompactionType_MergeCompaction,
			Timetravel:       40000,
			Channel:          "channelname",
		}

		alloc.random = false // generated ID = 19530
		task := newCompactionTask(context.TODO(), mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
		err = task.compact()
		assert.NoError(t, err)

		assert.False(t, replica.hasSegment(segID1, true))
		assert.False(t, replica.hasSegment(segID2, true))
		assert.True(t, replica.hasSegment(19530, true))
		updates, err := replica.getSegmentStatisticsUpdates(19530)
		assert.NoError(t, err)
		assert.Equal(t, int64(2), updates.GetNumRows())
	})
}

type mockFlushManager struct {
	…
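A note on the two knobs that make the assertions deterministic: `alloc.random = false` pins the mock allocator so the merged segment always receives ID 19530, and `Timetravel: 40000` keeps the pk=1 deletion (ts=20000) visible to the compaction. Although that deletion was uploaded only alongside segID1, the merged segment ends up with just the two pk=2 rows, confirming that merge compaction applies deletions by PK across both input segments, which is the duplicate-PK behavior this case is meant to pin down.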