mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
fix: L2 segments remain as L2 even after sort compaction (#43237)
issue: #43186 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
002a325f0f
commit
c54a04c71c
@ -1660,8 +1660,10 @@ func (m *meta) completeMixCompactionMutation(
|
|||||||
log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
|
log = log.With(zap.Int64s("compactFrom", compactFromSegIDs))
|
||||||
|
|
||||||
resultInvisible := false
|
resultInvisible := false
|
||||||
|
targetSegmentLevel := datapb.SegmentLevel_L1
|
||||||
if t.GetType() == datapb.CompactionType_SortCompaction {
|
if t.GetType() == datapb.CompactionType_SortCompaction {
|
||||||
resultInvisible = compactFromSegInfos[0].GetIsInvisible()
|
resultInvisible = compactFromSegInfos[0].GetIsInvisible()
|
||||||
|
targetSegmentLevel = compactFromSegInfos[0].GetLevel()
|
||||||
if !compactFromSegInfos[0].GetCreatedByCompaction() {
|
if !compactFromSegInfos[0].GetCreatedByCompaction() {
|
||||||
resultInvisible = false
|
resultInvisible = false
|
||||||
}
|
}
|
||||||
@ -1687,7 +1689,7 @@ func (m *meta) completeMixCompactionMutation(
|
|||||||
CreatedByCompaction: true,
|
CreatedByCompaction: true,
|
||||||
CompactionFrom: compactFromSegIDs,
|
CompactionFrom: compactFromSegIDs,
|
||||||
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
|
LastExpireTime: tsoutil.ComposeTSByTime(time.Unix(t.GetStartTime(), 0), 0),
|
||||||
Level: datapb.SegmentLevel_L1,
|
Level: targetSegmentLevel,
|
||||||
StorageVersion: compactToSegment.GetStorageVersion(),
|
StorageVersion: compactToSegment.GetStorageVersion(),
|
||||||
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
|
StartPosition: getMinPosition(lo.Map(compactFromSegInfos, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
|
||||||
return info.GetStartPosition()
|
return info.GetStartPosition()
|
||||||
|
|||||||
@ -425,6 +425,87 @@ func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
|
|||||||
droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
|
droppedCount := mutation.stateChange[datapb.SegmentLevel_L1.String()][commonpb.SegmentState_Dropped.String()][getSortStatus(false)]
|
||||||
suite.EqualValues(2, droppedCount)
|
suite.EqualValues(2, droppedCount)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
suite.Run("test L2 sort", func() {
|
||||||
|
getLatestSegments := func() *SegmentsInfo {
|
||||||
|
latestSegments := NewSegmentsInfo()
|
||||||
|
for segID, segment := range map[UniqueID]*SegmentInfo{
|
||||||
|
1: {SegmentInfo: &datapb.SegmentInfo{
|
||||||
|
ID: 1,
|
||||||
|
CollectionID: 100,
|
||||||
|
PartitionID: 10,
|
||||||
|
State: commonpb.SegmentState_Flushed,
|
||||||
|
Level: datapb.SegmentLevel_L2,
|
||||||
|
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
|
||||||
|
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
|
||||||
|
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
|
||||||
|
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
|
||||||
|
NumOfRows: 2,
|
||||||
|
}},
|
||||||
|
} {
|
||||||
|
latestSegments.SetSegment(segID, segment)
|
||||||
|
}
|
||||||
|
|
||||||
|
return latestSegments
|
||||||
|
}
|
||||||
|
|
||||||
|
latestSegments := getLatestSegments()
|
||||||
|
compactToSeg := &datapb.CompactionSegment{
|
||||||
|
SegmentID: 2,
|
||||||
|
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
|
||||||
|
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
|
||||||
|
NumOfRows: 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &datapb.CompactionPlanResult{
|
||||||
|
Segments: []*datapb.CompactionSegment{compactToSeg},
|
||||||
|
}
|
||||||
|
task := &datapb.CompactionTask{
|
||||||
|
InputSegments: []UniqueID{1},
|
||||||
|
Type: datapb.CompactionType_SortCompaction,
|
||||||
|
}
|
||||||
|
m := &meta{
|
||||||
|
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
|
||||||
|
segments: latestSegments,
|
||||||
|
chunkManager: mockChMgr,
|
||||||
|
}
|
||||||
|
|
||||||
|
infos, mutation, err := m.CompleteCompactionMutation(context.TODO(), task, result)
|
||||||
|
assert.NoError(suite.T(), err)
|
||||||
|
suite.Equal(1, len(infos))
|
||||||
|
info := infos[0]
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.NotNil(info)
|
||||||
|
suite.NotNil(mutation)
|
||||||
|
|
||||||
|
// check newSegment
|
||||||
|
suite.EqualValues(2, info.GetID())
|
||||||
|
suite.Equal(datapb.SegmentLevel_L2, info.GetLevel())
|
||||||
|
suite.Equal(commonpb.SegmentState_Flushed, info.GetState())
|
||||||
|
|
||||||
|
binlogs := info.GetBinlogs()
|
||||||
|
for _, fbinlog := range binlogs {
|
||||||
|
for _, blog := range fbinlog.GetBinlogs() {
|
||||||
|
suite.Empty(blog.GetLogPath())
|
||||||
|
suite.EqualValues(50000, blog.GetLogID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
statslogs := info.GetStatslogs()
|
||||||
|
for _, fbinlog := range statslogs {
|
||||||
|
for _, blog := range fbinlog.GetBinlogs() {
|
||||||
|
suite.Empty(blog.GetLogPath())
|
||||||
|
suite.EqualValues(50001, blog.GetLogID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check compactFrom segments
|
||||||
|
for _, segID := range []int64{1} {
|
||||||
|
seg := m.GetSegment(context.TODO(), segID)
|
||||||
|
suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
|
||||||
|
suite.NotEmpty(seg.GetDroppedAt())
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *MetaBasicSuite) TestSetSegment() {
|
func (suite *MetaBasicSuite) TestSetSegment() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user