mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 18:18:30 +08:00
enhance: Skip submit empty l0 tasks in DC (#31280)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
c26c1b33c2
commit
0066c016b6
@ -332,6 +332,9 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
|
|||||||
info.GetLevel() != datapb.SegmentLevel_L0 &&
|
info.GetLevel() != datapb.SegmentLevel_L0 &&
|
||||||
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
|
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
|
||||||
})
|
})
|
||||||
|
if len(sealedSegments) == 0 {
|
||||||
|
return errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.triggerInfo.pos)
|
||||||
|
}
|
||||||
|
|
||||||
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
|
||||||
return &datapb.CompactionSegmentBinlogs{
|
return &datapb.CompactionSegmentBinlogs{
|
||||||
|
|||||||
@ -298,6 +298,49 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
|
|||||||
s.Error(err)
|
s.Error(err)
|
||||||
s.ErrorIs(err, merr.ErrSegmentNotFound)
|
s.ErrorIs(err, merr.ErrSegmentNotFound)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
s.Run("select zero segments", func() {
|
||||||
|
s.SetupTest()
|
||||||
|
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
|
||||||
|
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||||
|
ID: segID,
|
||||||
|
Level: datapb.SegmentLevel_L0,
|
||||||
|
InsertChannel: channel,
|
||||||
|
State: commonpb.SegmentState_Flushed,
|
||||||
|
Deltalogs: deltalogs,
|
||||||
|
}}
|
||||||
|
}).Times(2)
|
||||||
|
s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(nil).Once()
|
||||||
|
|
||||||
|
// 2 l0 segments
|
||||||
|
plan := &datapb.CompactionPlan{
|
||||||
|
PlanID: 1,
|
||||||
|
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||||
|
{
|
||||||
|
SegmentID: 100,
|
||||||
|
Level: datapb.SegmentLevel_L0,
|
||||||
|
InsertChannel: channel,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
SegmentID: 101,
|
||||||
|
Level: datapb.SegmentLevel_L0,
|
||||||
|
InsertChannel: channel,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Type: datapb.CompactionType_Level0DeleteCompaction,
|
||||||
|
}
|
||||||
|
|
||||||
|
task := &compactionTask{
|
||||||
|
triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
|
||||||
|
state: executing,
|
||||||
|
plan: plan,
|
||||||
|
dataNodeID: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
|
||||||
|
err := handler.RefreshPlan(task)
|
||||||
|
s.Error(err)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {
|
func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user