mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [cherry-pick] add log in mixCompactionTask and set fail/timeout task to clean (#35967)
#35966 master pr :#35970 Signed-off-by: wayblink <anyang.wang@zilliz.com>
This commit is contained in:
parent
cc414d53b7
commit
922f54967d
@ -164,6 +164,11 @@ func (t *l0CompactionTask) processCompleted() bool {
|
||||
|
||||
func (t *l0CompactionTask) processTimeout() bool {
|
||||
t.resetSegmentCompacting()
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
|
||||
if err != nil {
|
||||
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@ -178,6 +183,12 @@ func (t *l0CompactionTask) processFailed() bool {
|
||||
}
|
||||
|
||||
t.resetSegmentCompacting()
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
|
||||
if err != nil {
|
||||
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
|
||||
return true
|
||||
}
|
||||
|
||||
@ -202,14 +202,14 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
Deltalogs: deltaLogs,
|
||||
}}
|
||||
}).Twice()
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()
|
||||
|
||||
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
got := t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_failed, t.State)
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.State)
|
||||
})
|
||||
s.Run("test pipelining saveTaskMeta failed", func() {
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
|
||||
@ -374,7 +374,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
|
||||
got = t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_timeout, t.GetState())
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
|
||||
})
|
||||
|
||||
s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() {
|
||||
@ -466,12 +466,12 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
}, nil).Once()
|
||||
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
|
||||
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
|
||||
|
||||
got := t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
|
||||
})
|
||||
s.Run("test executing with result failed save compaction meta failed", func() {
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
|
||||
@ -494,7 +494,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_timeout)
|
||||
t.NodeID = 100
|
||||
s.Require().True(t.GetNodeID() > 0)
|
||||
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
|
||||
s.Require().False(isCompacting)
|
||||
s.ElementsMatch(segIDs, t.GetInputSegments())
|
||||
@ -568,6 +568,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
|
||||
t.NodeID = 100
|
||||
s.Require().True(t.GetNodeID() > 0)
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
|
||||
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
|
||||
s.ElementsMatch(segIDs, t.GetInputSegments())
|
||||
@ -575,12 +576,13 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
|
||||
got := t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
|
||||
})
|
||||
s.Run("test process failed failed", func() {
|
||||
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
|
||||
t.NodeID = 100
|
||||
s.Require().True(t.GetNodeID() > 0)
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
|
||||
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
|
||||
s.ElementsMatch(segIDs, t.GetInputSegments())
|
||||
@ -588,7 +590,7 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
|
||||
|
||||
got := t.Process()
|
||||
s.True(got)
|
||||
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
|
||||
})
|
||||
|
||||
s.Run("test unkonwn task", func() {
|
||||
|
||||
@ -28,11 +28,12 @@ type mixCompactionTask struct {
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) processPipelining() bool {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("nodeID", t.GetNodeID()))
|
||||
if t.NeedReAssignNodeID() {
|
||||
log.Info("mixCompactionTask need assign nodeID")
|
||||
return false
|
||||
}
|
||||
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("nodeID", t.GetNodeID()))
|
||||
var err error
|
||||
t.plan, err = t.BuildCompactionRequest()
|
||||
if err != nil {
|
||||
@ -47,16 +48,22 @@ func (t *mixCompactionTask) processPipelining() bool {
|
||||
|
||||
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
|
||||
if err != nil {
|
||||
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
|
||||
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
|
||||
return false
|
||||
}
|
||||
log.Warn("mixCompactionTask notify compaction tasks to DataNode")
|
||||
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
|
||||
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
|
||||
if err != nil {
|
||||
log.Warn("mixCompactionTask update task state failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) processMetaSaved() bool {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
|
||||
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
|
||||
return false
|
||||
@ -66,11 +73,13 @@ func (t *mixCompactionTask) processMetaSaved() bool {
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) processExecuting() bool {
|
||||
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
|
||||
if err != nil || result == nil {
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
|
||||
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)); err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
}
|
||||
}
|
||||
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
|
||||
return false
|
||||
@ -78,6 +87,7 @@ func (t *mixCompactionTask) processExecuting() bool {
|
||||
switch result.GetState() {
|
||||
case datapb.CompactionTaskState_executing:
|
||||
if t.checkTimeout() {
|
||||
log.Info("mixCompactionTask timeout", zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()), zap.Int64("startTime", t.GetStartTime()))
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
|
||||
if err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
@ -115,6 +125,7 @@ func (t *mixCompactionTask) processExecuting() bool {
|
||||
}
|
||||
return t.processMetaSaved()
|
||||
case datapb.CompactionTaskState_failed:
|
||||
log.Info("mixCompactionTask fail in datanode")
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
|
||||
if err != nil {
|
||||
log.Warn("fail to updateAndSaveTaskMeta")
|
||||
@ -133,7 +144,7 @@ func (t *mixCompactionTask) SaveTaskMeta() error {
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) saveSegmentMeta() error {
|
||||
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
// Also prepare metric updates.
|
||||
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
|
||||
if err != nil {
|
||||
@ -149,21 +160,28 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
|
||||
// Note: return True means exit this state machine.
|
||||
// ONLY return True for processCompleted or processFailed
|
||||
func (t *mixCompactionTask) Process() bool {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
lastState := t.GetState().String()
|
||||
processResult := true
|
||||
switch t.GetState() {
|
||||
case datapb.CompactionTaskState_pipelining:
|
||||
return t.processPipelining()
|
||||
processResult = t.processPipelining()
|
||||
case datapb.CompactionTaskState_executing:
|
||||
return t.processExecuting()
|
||||
processResult = t.processExecuting()
|
||||
case datapb.CompactionTaskState_timeout:
|
||||
return t.processTimeout()
|
||||
processResult = t.processTimeout()
|
||||
case datapb.CompactionTaskState_meta_saved:
|
||||
return t.processMetaSaved()
|
||||
processResult = t.processMetaSaved()
|
||||
case datapb.CompactionTaskState_completed:
|
||||
return t.processCompleted()
|
||||
processResult = t.processCompleted()
|
||||
case datapb.CompactionTaskState_failed:
|
||||
return t.processFailed()
|
||||
processResult = t.processFailed()
|
||||
}
|
||||
return true
|
||||
currentState := t.GetState().String()
|
||||
if currentState != lastState {
|
||||
log.Info("mix compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState))
|
||||
}
|
||||
return processResult
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) GetResult() *datapb.CompactionPlanResult {
|
||||
@ -189,15 +207,16 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool {
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) processCompleted() bool {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||
PlanID: t.GetPlanID(),
|
||||
}); err != nil {
|
||||
log.Warn("mixCompactionTask processCompleted unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()))
|
||||
log.Warn("mixCompactionTask processCompleted unable to drop compaction plan")
|
||||
}
|
||||
|
||||
t.resetSegmentCompacting()
|
||||
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
|
||||
log.Info("mixCompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
|
||||
log.Info("mixCompactionTask processCompleted done")
|
||||
|
||||
return true
|
||||
}
|
||||
@ -208,6 +227,11 @@ func (t *mixCompactionTask) resetSegmentCompacting() {
|
||||
|
||||
func (t *mixCompactionTask) processTimeout() bool {
|
||||
t.resetSegmentCompacting()
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
|
||||
if err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@ -241,14 +265,20 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) processFailed() bool {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
|
||||
PlanID: t.GetPlanID(),
|
||||
}); err != nil {
|
||||
log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
|
||||
log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err))
|
||||
}
|
||||
|
||||
log.Info("mixCompactionTask processFailed done", zap.Int64("planID", t.GetPlanID()))
|
||||
log.Info("mixCompactionTask processFailed done")
|
||||
t.resetSegmentCompacting()
|
||||
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
|
||||
if err != nil {
|
||||
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@ -256,10 +286,6 @@ func (t *mixCompactionTask) checkTimeout() bool {
|
||||
if t.GetTimeoutInSeconds() > 0 {
|
||||
diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
|
||||
if diff > float64(t.GetTimeoutInSeconds()) {
|
||||
log.Warn("compaction timeout",
|
||||
zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
|
||||
zap.Int64("startTime", t.GetStartTime()),
|
||||
)
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -329,6 +355,7 @@ func (t *mixCompactionTask) CleanLogPath() {
|
||||
}
|
||||
|
||||
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
|
||||
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
|
||||
plan := &datapb.CompactionPlan{
|
||||
PlanID: t.GetPlanID(),
|
||||
StartTime: t.GetStartTime(),
|
||||
@ -341,7 +368,6 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
|
||||
SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
|
||||
MaxSize: t.GetMaxSize(),
|
||||
}
|
||||
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
|
||||
|
||||
segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
|
||||
for _, segID := range t.GetInputSegments() {
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
@ -70,3 +72,45 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_MixSegmentNotFound() {
|
||||
s.ErrorIs(err, merr.ErrSegmentNotFound)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *CompactionTaskSuite) TestCompactionTimeout() {
|
||||
channel := "Ch-1"
|
||||
binLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: segID,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
InsertChannel: channel,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: binLogs,
|
||||
}}
|
||||
}).Times(2)
|
||||
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
|
||||
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything)
|
||||
task := &mixCompactionTask{
|
||||
CompactionTask: &datapb.CompactionTask{
|
||||
PlanID: 1,
|
||||
TriggerID: 19530,
|
||||
CollectionID: 1,
|
||||
PartitionID: 10,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
NodeID: 1,
|
||||
State: datapb.CompactionTaskState_executing,
|
||||
InputSegments: []int64{200, 201},
|
||||
TimeoutInSeconds: 1,
|
||||
},
|
||||
meta: s.mockMeta,
|
||||
sessions: s.mockSessMgr,
|
||||
}
|
||||
plan, err := task.BuildCompactionRequest()
|
||||
task.plan = plan
|
||||
s.Require().NoError(err)
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
s.mockSessMgr.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
|
||||
State: datapb.CompactionTaskState_executing,
|
||||
}, nil)
|
||||
end := task.processExecuting()
|
||||
s.Equal(true, end)
|
||||
s.Equal(datapb.CompactionTaskState_cleaned, task.State)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user