mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-01 16:45:36 +08:00
fix: [2.4] Handle the error of the compaction queue being full (#37990)
issue: #37988 master pr: #37989 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
88b731d393
commit
045cf56b6c
@ -347,7 +347,17 @@ func (c *compactionPlanHandler) loadMeta() {
|
||||
continue
|
||||
}
|
||||
if t.NeedReAssignNodeID() {
|
||||
c.submitTask(t)
|
||||
if err = c.submitTask(t); err != nil {
|
||||
log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it",
|
||||
zap.Int64("planID", task.GetPlanID()),
|
||||
zap.String("type", task.GetType().String()),
|
||||
zap.String("state", task.GetState().String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
// ignore the drop error
|
||||
c.meta.DropCompactionTask(task)
|
||||
continue
|
||||
}
|
||||
log.Info("compactionPlanHandler loadMeta submitTask",
|
||||
zap.Int64("planID", t.GetPlanID()),
|
||||
zap.Int64("triggerID", t.GetTriggerID()),
|
||||
@ -537,11 +547,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
|
||||
c.executingGuard.Unlock()
|
||||
}
|
||||
|
||||
func (c *compactionPlanHandler) submitTask(t CompactionTask) {
|
||||
func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
|
||||
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
|
||||
t.SetSpan(span)
|
||||
c.queueTasks.Enqueue(t)
|
||||
if err := c.queueTasks.Enqueue(t); err != nil {
|
||||
return err
|
||||
}
|
||||
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreTask used to restore Task from etcd
|
||||
@ -592,7 +605,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
|
||||
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
c.submitTask(t)
|
||||
if err = c.submitTask(t); err != nil {
|
||||
log.Warn("submit compaction task failed", zap.Error(err))
|
||||
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
|
||||
return err
|
||||
}
|
||||
log.Info("Compaction plan submitted")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
@ -709,6 +710,52 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
|
||||
s.Equal(1, info.failedCnt)
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
|
||||
s.SetupTest()
|
||||
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
|
||||
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")
|
||||
|
||||
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
|
||||
|
||||
t1 := &mixCompactionTask{
|
||||
CompactionTask: &datapb.CompactionTask{
|
||||
TriggerID: 1,
|
||||
PlanID: 1,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
State: datapb.CompactionTaskState_executing,
|
||||
},
|
||||
meta: s.mockMeta,
|
||||
sessions: s.mockSessMgr,
|
||||
}
|
||||
t1.plan = &datapb.CompactionPlan{
|
||||
PlanID: 1,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
}
|
||||
|
||||
s.NoError(s.handler.submitTask(t1))
|
||||
|
||||
t2 := &mixCompactionTask{
|
||||
CompactionTask: &datapb.CompactionTask{
|
||||
TriggerID: 1,
|
||||
PlanID: 2,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
State: datapb.CompactionTaskState_completed,
|
||||
},
|
||||
meta: s.mockMeta,
|
||||
sessions: s.mockSessMgr,
|
||||
}
|
||||
t2.plan = &datapb.CompactionPlan{
|
||||
PlanID: 2,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: "ch-01",
|
||||
}
|
||||
|
||||
s.Error(s.handler.submitTask(t2))
|
||||
}
|
||||
|
||||
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
|
||||
s.SetupTest()
|
||||
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user