enhance: [cp24]Add ut for l0CompactionTask processExecuting (#34801)

See also: #34796
pr: #34800

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2024-07-19 16:31:41 +08:00 committed by GitHub
parent 34873a7f76
commit ff4bd2c9d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 225 additions and 28 deletions

View File

@ -106,10 +106,11 @@ func (t *l0CompactionTask) processExecuting() bool {
}
switch result.GetState() {
case datapb.CompactionTaskState_executing:
// will L0Compaction be timeouted?
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
log.Warn("l0CompactionTask failed to set task timeout state", zap.Error(err))
return false
}
return t.processTimeout()
@ -122,12 +123,13 @@ func (t *l0CompactionTask) processExecuting() bool {
}
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
return t.processFailed()
@ -311,10 +313,13 @@ func (t *l0CompactionTask) processMetaSaved() bool {
}
func (t *l0CompactionTask) processCompleted() bool {
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}
t.resetSegmentCompacting()
@ -349,11 +354,15 @@ func (t *l0CompactionTask) processFailed() bool {
func (t *l0CompactionTask) checkTimeout() bool {
if t.GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
start := time.Unix(t.GetStartTime(), 0)
diff := time.Since(start).Seconds()
if diff > float64(t.GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int64("taskID", t.GetTriggerID()),
zap.Int64("planID", t.GetPlanID()),
zap.Int64("nodeID", t.GetNodeID()),
zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
zap.Int64("startTime", t.GetStartTime()),
zap.Time("startTime", start),
)
return true
}

View File

@ -2,17 +2,40 @@ package datacoord
import (
"context"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() {
func TestL0CompactionTaskSuite(t *testing.T) {
suite.Run(t, new(L0CompactionTaskSuite))
}
type L0CompactionTaskSuite struct {
suite.Suite
mockMeta *MockCompactionMeta
mockSessMgr *MockSessionManager
}
func (s *L0CompactionTaskSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockSessMgr = NewMockSessionManager(s.T())
}
func (s *L0CompactionTaskSuite) SetupSubTest() {
s.SetupTest()
}
func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() {
channel := "Ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
@ -69,7 +92,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() {
s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs)
}
func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
channel := "Ch-1"
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
return nil
@ -94,7 +117,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() {
s.ErrorIs(err, merr.ErrSegmentNotFound)
}
func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
channel := "Ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo {
@ -125,7 +148,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() {
s.Error(err)
}
func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask {
func (s *L0CompactionTaskSuite) generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask {
return &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
@ -137,16 +160,14 @@ func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask {
State: state,
InputSegments: []int64{100, 101},
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
}
}
func (s *CompactionTaskSuite) SetupSubTest() {
s.SetupTest()
}
func (s *CompactionTaskSuite) TestProcessStateTrans() {
func (s *L0CompactionTaskSuite) TestStateTrans() {
s.Run("test pipelining needReassignNodeID", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = NullNodeID
got := t.Process()
s.False(got)
@ -155,12 +176,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
})
s.Run("test pipelining BuildCompactionRequest failed", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
t.meta = s.mockMeta
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
@ -183,7 +203,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return()
t.sessions = s.mockSessMgr
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
got := t.Process()
@ -192,12 +211,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
})
s.Run("test pipelining Compaction failed", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
t.meta = s.mockMeta
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
@ -219,7 +237,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
}).Twice()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
t.sessions = s.mockSessMgr
s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
s.Require().EqualValues(t.NodeID, nodeID)
return errors.New("mock error")
@ -232,12 +249,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
})
s.Run("test pipelining success", func() {
t := generateTestL0Task(datapb.CompactionTaskState_pipelining)
t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining)
t.NodeID = 100
channel := "ch-1"
deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
t.meta = s.mockMeta
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
@ -259,7 +275,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
}).Twice()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
t.sessions = s.mockSessMgr
s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
s.Require().EqualValues(t.NodeID, nodeID)
return nil
@ -267,6 +282,179 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() {
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.State)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
})
// stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound
s.Run("test executing GetCompactionPlanResult fail NodeNotFound", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(t.NodeID)).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_pipelining, t.GetState())
s.EqualValues(NullNodeID, t.GetNodeID())
})
// stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound
s.Run("test executing GetCompactionPlanResult fail mock error", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12)
for i := 0; i < 12; i++ {
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
s.EqualValues(100, t.GetNodeID())
}
})
s.Run("test executing with result executing", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_executing,
}, nil).Twice()
got := t.Process()
s.False(got)
// test timeout
t.StartTime = time.Now().Add(-time.Hour).Unix()
t.TimeoutInSeconds = 10
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).
RunAndReturn(func(inputs []int64, compacting bool) {
s.ElementsMatch(inputs, t.GetInputSegments())
s.False(compacting)
}).Once()
got = t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_timeout, t.GetState())
})
s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_executing,
}, nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
t.StartTime = time.Now().Add(-time.Hour).Unix()
t.TimeoutInSeconds = 10
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
})
s.Run("test executing with result completed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_completed, t.GetState())
})
s.Run("test executing with result completed save segment meta failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
})
s.Run("test executing with result completed save compaction meta failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_completed,
}, nil).Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
})
s.Run("test executing with result failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
})
s.Run("test executing with result failed save compaction meta failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).
Return(&datapb.CompactionPlanResult{
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once()
got := t.Process()
s.False(got)
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
})
}