mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
fix: Handle the error of the compaction queue being full (#37989)
issue: #37988 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
fd94f12fb9
commit
5e152767a3
@ -350,7 +350,17 @@ func (c *compactionPlanHandler) loadMeta() {
|
||||
continue
|
||||
}
|
||||
if t.NeedReAssignNodeID() {
|
||||
c.submitTask(t)
|
||||
if err = c.submitTask(t); err != nil {
|
||||
log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it",
|
||||
zap.Int64("planID", task.GetPlanID()),
|
||||
zap.String("type", task.GetType().String()),
|
||||
zap.String("state", task.GetState().String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
// ignore the drop error
|
||||
c.meta.DropCompactionTask(task)
|
||||
continue
|
||||
}
|
||||
log.Info("compactionPlanHandler loadMeta submitTask",
|
||||
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
|
||||
zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()),
|
||||
@ -541,11 +551,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
|
||||
c.executingGuard.Unlock()
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) submitTask(t CompactionTask) {
|
||||
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
|
||||
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetTaskProto().GetType()))
|
||||
t.SetSpan(span)
|
||||
c.queueTasks.Enqueue(t)
|
||||
if err := c.queueTasks.Enqueue(t); err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreTask used to restore Task from etcd
|
||||
@ -596,7 +609,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
|
||||
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
c.submitTask(t)
|
||||
if err = c.submitTask(t); err != nil {
|
||||
log.Warn("submit compaction task failed", zap.Error(err))
|
||||
c.meta.SetSegmentsCompacting(t.GetTaskProto().GetInputSegments(), false)
|
||||
return err
|
||||
}
|
||||
log.Info("Compaction plan submitted")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -627,6 +627,44 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
|
||||
s.Equal(1, info.failedCnt)
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
|
||||
s.SetupTest()
|
||||
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
|
||||
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")
|
||||
|
||||
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
|
||||
|
||||
t1 := newMixCompactionTask(&datapb.CompactionTask{
|
||||
TriggerID: 1,
|
||||
PlanID: 1,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
State: datapb.CompactionTaskState_executing,
|
||||
}, nil, s.mockMeta, s.mockSessMgr)
|
||||
t1.plan = &datapb.CompactionPlan{
|
||||
PlanID: 1,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
}
|
||||
|
||||
s.NoError(s.handler.submitTask(t1))
|
||||
|
||||
t2 := newMixCompactionTask(&datapb.CompactionTask{
|
||||
TriggerID: 1,
|
||||
PlanID: 2,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
State: datapb.CompactionTaskState_completed,
|
||||
}, nil, s.mockMeta, s.mockSessMgr)
|
||||
t2.plan = &datapb.CompactionPlan{
|
||||
PlanID: 2,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
}
|
||||
|
||||
s.Error(s.handler.submitTask(t2))
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
||||
s.SetupTest()
|
||||
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user