fix: Restore the compacting state for stats task during recovery (#39459)

issue: #39333

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
cai.zhang 2025-02-05 17:11:12 +08:00 committed by GitHub
parent f007465942
commit 1d54ff157f
23 changed files with 714 additions and 74 deletions
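For orientation, the following is an editorial sketch (not part of the patch) of the recovery rule this commit introduces: when the task scheduler reloads an in-progress or retrying stats task from meta after a restart, it must re-acquire both the segment's compacting flag and its stating flag, and it gives the task up if the segment is gone, already compacting, or still covered by a running L0 compaction. All type and function names below (statsJob, segmentMeta, recoverStatsJob) are hypothetical stand-ins, not the real Milvus types.

package main

import "fmt"

type statsJob struct {
	taskID    int64
	segmentID int64
	channel   string
	state     string // "InProgress", "Retry", ...
}

type segmentMeta struct {
	compacting map[int64]bool // segmentID -> already owned by a compaction
	stating    map[int64]bool // segmentID -> stats task running
	l0Covers   func(channel string, segmentID int64) bool
}

// recoverStatsJob decides whether a stats job found in meta after a restart may
// keep running. It must re-acquire both the compacting flag and the stating
// flag; otherwise the job is dropped (or marked failed) instead of silently
// racing with a compaction.
func recoverStatsJob(m *segmentMeta, j statsJob) bool {
	if j.state != "InProgress" && j.state != "Retry" {
		return true // nothing to restore
	}
	if m.compacting[j.segmentID] {
		return false // another compaction already owns the segment
	}
	if m.l0Covers(j.channel, j.segmentID) {
		return false // a running L0 compaction still references this segment
	}
	m.compacting[j.segmentID] = true
	m.stating[j.segmentID] = true
	return true
}

func main() {
	m := &segmentMeta{
		compacting: map[int64]bool{},
		stating:    map[int64]bool{},
		l0Covers:   func(string, int64) bool { return false },
	}
	fmt.Println(recoverStatsJob(m, statsJob{taskID: 900, segmentID: 1000, state: "InProgress"}))
}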

View File

@ -59,6 +59,8 @@ type compactionPlanContext interface {
getCompactionTasksNumBySignalID(signalID int64) int
getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo
removeTasksByChannel(channel string)
setTaskScheduler(scheduler *taskScheduler)
checkAndSetSegmentStating(channel string, segmentID int64) bool
}
var (
@ -230,6 +232,21 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {
return ret
}
func (c *compactionPlanHandler) checkAndSetSegmentStating(channel string, segmentID int64) bool {
c.executingGuard.Lock()
defer c.executingGuard.Unlock()
for _, t := range c.executingTasks {
if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction {
if t.GetTaskProto().GetChannel() == channel && t.CheckCompactionContainsSegment(segmentID) {
return false
}
}
}
c.meta.SetSegmentStating(segmentID, true)
return true
}
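As an aside, here is a minimal self-contained sketch of the check-and-set pattern above, with hypothetical guard/l0Task types: the scan of executing L0 tasks and the setting of the stating flag are serialized under one lock, which is what keeps a stats task and an L0 compaction from claiming the same segment concurrently.

package main

import (
	"fmt"
	"sync"
)

type l0Task struct {
	channel  string
	segments map[int64]bool
}

type guard struct {
	mu      sync.Mutex
	l0Tasks []l0Task
	stating map[int64]bool
}

// checkAndSetStating mirrors the shape of checkAndSetSegmentStating: refuse if
// any executing L0 task on the same channel still covers the segment, otherwise
// mark the segment as stating while still holding the lock.
func (g *guard) checkAndSetStating(channel string, segmentID int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	for _, t := range g.l0Tasks {
		if t.channel == channel && t.segments[segmentID] {
			return false // an executing L0 compaction still covers this segment
		}
	}
	g.stating[segmentID] = true
	return true
}

func main() {
	g := &guard{stating: map[int64]bool{}}
	fmt.Println(g.checkAndSetStating("ch1", 42)) // true: no L0 task covers segment 42
}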
func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.queueTasks.ForEach(func(ct CompactionTask) {
@ -248,22 +265,21 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
}
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
allocator allocator.Allocator, handler Handler,
) *compactionPlanHandler {
// Higher capacity gives better priority ordering but consumes more memory.
// TODO[GOOSE]: Higher capacity also makes tasks wait longer, which needs to be addressed.
capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt()
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
queueTasks: *NewCompactionQueue(capacity, getPrioritizer()),
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
handler: handler,
}
}
@ -277,6 +293,10 @@ func (c *compactionPlanHandler) checkSchedule() {
c.schedule(assigner)
}
func (c *compactionPlanHandler) setTaskScheduler(scheduler *taskScheduler) {
c.analyzeScheduler = scheduler
}
func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask {
selected := make([]CompactionTask, 0)
if c.queueTasks.Len() == 0 {
@ -364,6 +384,13 @@ func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask
}
c.executingGuard.Lock()
// Do not move this check logic outside the lock; it needs to remain mutually exclusive with the stats task.
if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction {
if !t.PreparePlan() {
c.executingGuard.Unlock()
continue
}
}
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
@ -373,7 +400,6 @@ func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask
}
func (c *compactionPlanHandler) start() {
c.loadMeta()
c.stopWg.Add(2)
go c.loopSchedule()
go c.loopClean()

View File

@ -43,6 +43,9 @@ type CompactionTask interface {
SetNodeID(UniqueID) error
NeedReAssignNodeID() bool
SaveTaskMeta() error
PreparePlan() bool
CheckCompactionContainsSegment(segmentID int64) bool
}
type compactionTaskOpt func(task *datapb.CompactionTask)

View File

@ -177,6 +177,14 @@ func (t *clusteringCompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *clusteringCompactionTask) PreparePlan() bool {
return true
}
func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {

View File

@ -85,7 +85,7 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockSessionMgr = session.NewMockDataNodeManager(s.T())
scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil)
scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil, nil)
s.analyzeScheduler = scheduler
}

View File

@ -223,6 +223,54 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac
return taskClone
}
func (t *l0CompactionTask) selectSealedSegment() ([]int64, []*datapb.CompactionSegmentBinlogs) {
taskProto := t.taskProto.Load().(*datapb.CompactionTask)
// Select sealed L1 segments for LevelZero compaction that meet the condition:
// dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == taskProto.GetChannel() &&
isFlushState(info.GetState()) &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
}))
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Field2StatslogPaths: info.GetStatslogs(),
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
IsSorted: info.GetIsSorted(),
}
})
sealedSegmentIDs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) int64 {
return info.GetID()
})
return sealedSegmentIDs, sealedSegBinlogs
}
func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
for _, sealedSegmentID := range sealedSegmentIDs {
if sealedSegmentID == segmentID {
return true
}
}
return false
}
func (t *l0CompactionTask) PreparePlan() bool {
sealedSegmentIDs, _ := t.selectSealedSegment()
exist, hasStating := t.meta.CheckSegmentsStating(context.TODO(), sealedSegmentIDs)
return exist && !hasStating
}
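A compact sketch of the complementary gate on the compaction side, using simplified stand-in types: PreparePlan only lets an L0 delete compaction start when every sealed segment it will touch exists and none of them is currently running a stats task, i.e. the exist && !hasStating rule above.

package main

import "fmt"

type segState struct {
	exists  bool
	stating bool
}

// checkSegmentsStating reports whether all segments exist and whether any of
// them currently has a stats task running (a simplified stand-in for
// meta.CheckSegmentsStating).
func checkSegmentsStating(states map[int64]segState, ids []int64) (exist bool, hasStating bool) {
	exist = true
	for _, id := range ids {
		s, ok := states[id]
		if !ok || !s.exists {
			return false, hasStating
		}
		if s.stating {
			hasStating = true
		}
	}
	return exist, hasStating
}

func main() {
	states := map[int64]segState{
		1: {exists: true},
		2: {exists: true, stating: true},
	}
	exist, hasStating := checkSegmentsStating(states, []int64{1, 2})
	fmt.Println(exist && !hasStating) // false: segment 2 is being stated, so PreparePlan would back off
}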
func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.AllocN(1)
if err != nil {
@ -259,35 +307,13 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
})
}
// Select sealed L1 segments for LevelZero compaction that meets the condition:
// dmlPos < triggerInfo.pos
sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp()
}))
if len(sealedSegments) == 0 {
sealedSegmentIDs, sealedSegBinlogs := t.selectSealedSegment()
if len(sealedSegmentIDs) == 0 {
// TODO fast finish l0 segment, just drop l0 segment
log.Info("l0Compaction available non-L0 Segments is empty ")
return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos())
}
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Field2StatslogPaths: info.GetStatslogs(),
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
IsSorted: info.GetIsSorted(),
}
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", taskProto.GetPos()),

View File

@ -290,6 +290,14 @@ func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) {
t.taskProto.Store(task)
}
func (t *mixCompactionTask) PreparePlan() bool {
return true
}
func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool {
return false
}
func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID()))
beginLogID, _, err := t.allocator.AllocN(1)

View File

@ -60,7 +60,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockCm = NewMockChannelManager(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.cluster = NewMockCluster(s.T())
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
s.mockHandler = NewNMockHandler(s.T())
s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
}
@ -130,6 +130,22 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
// dataNode 101's paralleTasks has 1 task running, not L0 task
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -236,6 +252,22 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
}
func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -312,6 +344,23 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's paralleTasks has running L0 tasks
// nothing of the same channel will be able to schedule
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
tests := []struct {
description string
tasks []CompactionTask
@ -576,6 +625,22 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
}
return ret
})
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{
{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
isStating: false,
},
})
s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false)
for _, t := range inTasks {
s.handler.submitTask(t)
@ -595,7 +660,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
t1 := newMixCompactionTask(&datapb.CompactionTask{
TriggerID: 1,
@ -621,7 +686,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil)
task := &datapb.CompactionTask{
TriggerID: 1,

View File

@ -58,6 +58,9 @@ func (h *spyCompactionHandler) getCompactionInfo(ctx context.Context, signalID i
return nil
}
func (h *spyCompactionHandler) setTaskScheduler(scheduler *taskScheduler) {
}
var _ compactionPlanContext = (*spyCompactionHandler)(nil)
func (h *spyCompactionHandler) removeTasksByChannel(channel string) {}
@ -72,6 +75,10 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er
return err
}
func (h *spyCompactionHandler) checkAndSetSegmentStating(channel string, segmentID int64) bool {
return false
}
// isFull return true if the task pool is full
func (h *spyCompactionHandler) isFull() bool {
return false

View File

@ -253,7 +253,7 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
}
if err = jm.mt.statsTaskMeta.AddStatsTask(t); err != nil {
if errors.Is(err, merr.ErrTaskDuplicate) {
log.Info("stats task already exists", zap.Int64("taskID", taskID),
log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID),
zap.Int64("collectionID", originSegment.GetCollectionID()),
zap.Int64("segmentID", originSegment.GetID()))
return nil

View File

@ -65,6 +65,8 @@ type CompactionMeta interface {
CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []int64) (bool, bool)
CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error)
CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error
CheckSegmentsStating(ctx context.Context, segmentID []UniqueID) (bool, bool)
SetSegmentStating(segmentID UniqueID, stating bool)
SaveCompactionTask(ctx context.Context, task *datapb.CompactionTask) error
DropCompactionTask(ctx context.Context, task *datapb.CompactionTask) error
@ -1448,6 +1450,31 @@ func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
m.segments.SetFlushTime(segmentID, t)
}
func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) {
m.RLock()
defer m.RUnlock()
exist = true
for _, segmentID := range segmentIDs {
seg := m.segments.GetSegment(segmentID)
if seg != nil {
if seg.isStating {
hasStating = true
}
} else {
exist = false
break
}
}
return exist, hasStating
}
func (m *meta) SetSegmentStating(segmentID UniqueID, stating bool) {
m.Lock()
defer m.Unlock()
m.segments.SetIsStating(segmentID, stating)
}
// SetSegmentCompacting sets compaction state for segment
func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.Lock()

View File

@ -79,6 +79,63 @@ func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) RunAndReturn(ru
return _c
}
// CheckSegmentsStating provides a mock function with given fields: ctx, segmentID
func (_m *MockCompactionMeta) CheckSegmentsStating(ctx context.Context, segmentID []int64) (bool, bool) {
ret := _m.Called(ctx, segmentID)
if len(ret) == 0 {
panic("no return value specified for CheckSegmentsStating")
}
var r0 bool
var r1 bool
if rf, ok := ret.Get(0).(func(context.Context, []int64) (bool, bool)); ok {
return rf(ctx, segmentID)
}
if rf, ok := ret.Get(0).(func(context.Context, []int64) bool); ok {
r0 = rf(ctx, segmentID)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(context.Context, []int64) bool); ok {
r1 = rf(ctx, segmentID)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockCompactionMeta_CheckSegmentsStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckSegmentsStating'
type MockCompactionMeta_CheckSegmentsStating_Call struct {
*mock.Call
}
// CheckSegmentsStating is a helper method to define mock.On call
// - ctx context.Context
// - segmentID []int64
func (_e *MockCompactionMeta_Expecter) CheckSegmentsStating(ctx interface{}, segmentID interface{}) *MockCompactionMeta_CheckSegmentsStating_Call {
return &MockCompactionMeta_CheckSegmentsStating_Call{Call: _e.mock.On("CheckSegmentsStating", ctx, segmentID)}
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Run(run func(ctx context.Context, segmentID []int64)) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Return(_a0 bool, _a1 bool) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCompactionMeta_CheckSegmentsStating_Call) RunAndReturn(run func(context.Context, []int64) (bool, bool)) *MockCompactionMeta_CheckSegmentsStating_Call {
_c.Call.Return(run)
return _c
}
// CleanPartitionStatsInfo provides a mock function with given fields: ctx, info
func (_m *MockCompactionMeta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error {
ret := _m.Called(ctx, info)
@ -735,6 +792,40 @@ func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(context.
return _c
}
// SetSegmentStating provides a mock function with given fields: segmentID, stating
func (_m *MockCompactionMeta) SetSegmentStating(segmentID int64, stating bool) {
_m.Called(segmentID, stating)
}
// MockCompactionMeta_SetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentStating'
type MockCompactionMeta_SetSegmentStating_Call struct {
*mock.Call
}
// SetSegmentStating is a helper method to define mock.On call
// - segmentID int64
// - stating bool
func (_e *MockCompactionMeta_Expecter) SetSegmentStating(segmentID interface{}, stating interface{}) *MockCompactionMeta_SetSegmentStating_Call {
return &MockCompactionMeta_SetSegmentStating_Call{Call: _e.mock.On("SetSegmentStating", segmentID, stating)}
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) Run(run func(segmentID int64, stating bool)) *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(bool))
})
return _c
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) Return() *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionMeta_SetSegmentStating_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentStating_Call {
_c.Call.Return(run)
return _c
}
// SetSegmentsCompacting provides a mock function with given fields: ctx, segmentID, compacting
func (_m *MockCompactionMeta) SetSegmentsCompacting(ctx context.Context, segmentID []int64, compacting bool) {
_m.Called(ctx, segmentID, compacting)

View File

@ -22,6 +22,53 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte
return &MockCompactionPlanContext_Expecter{mock: &_m.Mock}
}
// checkAndSetSegmentStating provides a mock function with given fields: channel, segmentID
func (_m *MockCompactionPlanContext) checkAndSetSegmentStating(channel string, segmentID int64) bool {
ret := _m.Called(channel, segmentID)
if len(ret) == 0 {
panic("no return value specified for checkAndSetSegmentStating")
}
var r0 bool
if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
r0 = rf(channel, segmentID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockCompactionPlanContext_checkAndSetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'checkAndSetSegmentStating'
type MockCompactionPlanContext_checkAndSetSegmentStating_Call struct {
*mock.Call
}
// checkAndSetSegmentStating is a helper method to define mock.On call
// - channel string
// - segmentID int64
func (_e *MockCompactionPlanContext_Expecter) checkAndSetSegmentStating(channel interface{}, segmentID interface{}) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
return &MockCompactionPlanContext_checkAndSetSegmentStating_Call{Call: _e.mock.On("checkAndSetSegmentStating", channel, segmentID)}
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Run(run func(channel string, segmentID int64)) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(int64))
})
return _c
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Return(_a0 bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) RunAndReturn(run func(string, int64) bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call {
_c.Call.Return(run)
return _c
}
// enqueueCompaction provides a mock function with given fields: task
func (_m *MockCompactionPlanContext) enqueueCompaction(task *datapb.CompactionTask) error {
ret := _m.Called(task)
@ -241,6 +288,39 @@ func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) RunAndReturn(run
return _c
}
// setTaskScheduler provides a mock function with given fields: scheduler
func (_m *MockCompactionPlanContext) setTaskScheduler(scheduler *taskScheduler) {
_m.Called(scheduler)
}
// MockCompactionPlanContext_setTaskScheduler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'setTaskScheduler'
type MockCompactionPlanContext_setTaskScheduler_Call struct {
*mock.Call
}
// setTaskScheduler is a helper method to define mock.On call
// - scheduler *taskScheduler
func (_e *MockCompactionPlanContext_Expecter) setTaskScheduler(scheduler interface{}) *MockCompactionPlanContext_setTaskScheduler_Call {
return &MockCompactionPlanContext_setTaskScheduler_Call{Call: _e.mock.On("setTaskScheduler", scheduler)}
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Run(run func(scheduler *taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*taskScheduler))
})
return _c
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Return() *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Return()
return _c
}
func (_c *MockCompactionPlanContext_setTaskScheduler_Call) RunAndReturn(run func(*taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call {
_c.Call.Return(run)
return _c
}
// start provides a mock function with given fields:
func (_m *MockCompactionPlanContext) start() {
_m.Called()

View File

@ -56,6 +56,9 @@ type SegmentInfo struct {
size atomic.Int64
deltaRowcount atomic.Int64
lastWrittenTime time.Time
// It is used only to ensure mutual exclusion between L0 compaction and stats tasks
isStating bool
}
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
@ -278,6 +281,13 @@ func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
}
}
// SetIsStating sets stating status for segment
func (s *SegmentsInfo) SetIsStating(segmentID UniqueID, isStating bool) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetIsStating(isStating))
}
}
func (s *SegmentInfo) IsDeltaLogExists(logID int64) bool {
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
@ -465,6 +475,13 @@ func SetIsCompacting(isCompacting bool) SegmentInfoOption {
}
}
// SetIsStating is the option to set the stating state for segment info
func SetIsStating(isStating bool) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.isStating = isStating
}
}
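For illustration, a minimal sketch (with hypothetical segInfo/segOption names) of the copy-on-write option pattern SetIsStating plugs into: flipping the flag produces a shadow clone that replaces the stored pointer, so concurrent readers never observe a half-updated SegmentInfo.

package main

import "fmt"

type segInfo struct {
	id           int64
	isStating    bool
	isCompacting bool
}

type segOption func(*segInfo)

func setIsStating(v bool) segOption {
	return func(s *segInfo) { s.isStating = v }
}

// shadowClone copies the struct, applies the options to the copy, and returns
// the copy, leaving the original value untouched.
func (s *segInfo) shadowClone(opts ...segOption) *segInfo {
	clone := *s
	for _, opt := range opts {
		opt(&clone)
	}
	return &clone
}

func main() {
	segments := map[int64]*segInfo{1000: {id: 1000}}
	if seg, ok := segments[1000]; ok {
		segments[1000] = seg.shadowClone(setIsStating(true))
	}
	fmt.Println(segments[1000].isStating) // true; the previously stored value was not mutated
}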
// SetLevel is the option to set level for segment info
func SetLevel(level datapb.SegmentLevel) SegmentInfoOption {
return func(segment *SegmentInfo) {

View File

@ -382,15 +382,15 @@ func (s *Server) initDataCoord() error {
}
log.Info("init service discovery done")
s.initCompaction()
log.Info("init compaction done")
s.initTaskScheduler(storageCli)
log.Info("init task scheduler done")
s.initJobManager()
log.Info("init statsJobManager done")
s.initCompaction()
log.Info("init compaction done")
if err = s.initSegmentManager(); err != nil {
return err
}
@ -685,7 +685,8 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
func (s *Server) initTaskScheduler(manager storage.ChunkManager) {
if s.taskScheduler == nil {
s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator)
s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator, s.compactionHandler)
s.compactionHandler.setTaskScheduler(s.taskScheduler)
}
}
@ -702,7 +703,9 @@ func (s *Server) initIndexNodeManager() {
}
func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler)
cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler)
cph.loadMeta()
s.compactionHandler = cph
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}
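A small sketch of the two-phase wiring this hunk adopts, with stand-in types: the compaction handler is built first and loads its meta eagerly, the task scheduler is then constructed with a reference to the handler, and finally the scheduler is injected back via setTaskScheduler. The setter breaks what would otherwise be a construction-time circular dependency between the two components.

package main

import "fmt"

type taskScheduler struct {
	compactionHandler *compactionHandler
}

type compactionHandler struct {
	scheduler  *taskScheduler
	metaLoaded bool
}

func newCompactionHandler() *compactionHandler {
	h := &compactionHandler{}
	h.metaLoaded = true // stands in for cph.loadMeta()
	return h
}

func newTaskScheduler(h *compactionHandler) *taskScheduler {
	return &taskScheduler{compactionHandler: h}
}

func (h *compactionHandler) setTaskScheduler(s *taskScheduler) { h.scheduler = s }

func main() {
	handler := newCompactionHandler()      // phase 1: compaction handler, meta loaded eagerly
	scheduler := newTaskScheduler(handler) // phase 2: scheduler holds the handler
	handler.setTaskScheduler(scheduler)    // phase 3: close the loop with a setter
	fmt.Println(handler.metaLoaded, handler.scheduler != nil)
}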

View File

@ -1706,7 +1706,7 @@ func TestGetCompactionState(t *testing.T) {
{State: datapb.CompactionTaskState_timeout},
{State: datapb.CompactionTaskState_timeout},
})
mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil, nil)
mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil)
svr.compactionHandler = mockHandler
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1})
assert.NoError(t, err)

View File

@ -100,10 +100,10 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error {
defer stm.Unlock()
for _, st := range stm.tasks {
if st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() == t.GetSubJobType() {
if st.GetTaskID() == t.GetTaskID() || (st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() == t.GetSubJobType() && st.GetState() != indexpb.JobState_JobStateFailed) {
msg := fmt.Sprintf("stats task already exist in meta of segment %d with subJobType: %s",
t.GetSegmentID(), t.GetSubJobType().String())
log.Warn(msg)
log.RatedWarn(10, msg, zap.Int64("taskID", t.GetTaskID()), zap.Int64("exist taskID", st.GetTaskID()))
return merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeStatsJob.String(), msg)
}
}

View File

@ -118,7 +118,7 @@ func (at *analyzeTask) GetFailReason() string {
return at.taskInfo.GetFailReason()
}
func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
if err := meta.analyzeMeta.UpdateVersion(at.GetTaskID(), nodeID); err != nil {
return err
}
@ -227,7 +227,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
return true
}
func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{

View File

@ -120,7 +120,7 @@ func (it *indexBuildTask) GetFailReason() string {
return it.taskInfo.FailReason
}
func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
if err := meta.indexMeta.UpdateVersion(it.taskID, nodeID); err != nil {
return err
}
@ -260,11 +260,13 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()))
zap.Int64("segID", segment.GetID()),
zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()),
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()))
return true
}
func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{

View File

@ -62,6 +62,7 @@ type taskScheduler struct {
indexEngineVersionManager IndexEngineVersionManager
handler Handler
allocator allocator.Allocator
compactionHandler compactionPlanContext
taskStats *expirable.LRU[UniqueID, Task]
}
@ -73,6 +74,7 @@ func newTaskScheduler(
indexEngineVersionManager IndexEngineVersionManager,
handler Handler,
allocator allocator.Allocator,
compactionHandler compactionPlanContext,
) *taskScheduler {
ctx, cancel := context.WithCancel(ctx)
@ -92,6 +94,7 @@ func newTaskScheduler(
indexEngineVersionManager: indexEngineVersionManager,
allocator: allocator,
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
compactionHandler: compactionHandler,
}
ts.reloadFromMeta()
return ts
@ -153,6 +156,33 @@ func (s *taskScheduler) reloadFromMeta() {
allStatsTasks := s.meta.statsTaskMeta.GetAllTasks()
for taskID, t := range allStatsTasks {
if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed {
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is compacting"
} else {
if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) {
s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is l0 compacting"
}
}
}
}
s.enqueue(&statsTask{
taskID: taskID,
segmentID: t.GetSegmentID(),
@ -389,14 +419,14 @@ func (s *taskScheduler) processInit(task Task) bool {
log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID))
// 2. update version
if err := task.UpdateVersion(s.ctx, nodeID, s.meta); err != nil {
if err := task.UpdateVersion(s.ctx, nodeID, s.meta, s.compactionHandler); err != nil {
log.Ctx(s.ctx).Warn("update task version failed", zap.Int64("taskID", task.GetTaskID()), zap.Error(err))
return false
}
log.Ctx(s.ctx).Info("update task version success", zap.Int64("taskID", task.GetTaskID()))
// 3. assign task to indexNode
success := task.AssignTask(s.ctx, client)
success := task.AssignTask(s.ctx, client, s.meta)
if !success {
log.Ctx(s.ctx).Warn("assign task to client failed", zap.Int64("taskID", task.GetTaskID()),
zap.String("new state", task.GetState().String()), zap.String("fail reason", task.GetFailReason()))
@ -465,6 +495,7 @@ func (s *taskScheduler) processInProgress(task Task) bool {
if exist {
task.QueryResult(s.ctx, client)
if task.GetState() == indexpb.JobState_JobStateFinished || task.GetState() == indexpb.JobState_JobStateFailed {
task.ResetTask(s.meta)
return s.processFinished(task)
}
return true

View File

@ -53,6 +53,7 @@ var (
buildID = UniqueID(600)
nodeID = UniqueID(700)
partitionKeyID = UniqueID(800)
statsTaskID = UniqueID(900)
)
func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
@ -852,7 +853,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("root")
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
s.Equal(9, len(scheduler.tasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[1].GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[2].GetState())
@ -999,7 +1000,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
}))
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
mt.segments.DropSegment(1000)
scheduler.scheduleDuration = s.duration
@ -1059,7 +1060,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
},
}, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil)
// remove task in meta
err := scheduler.meta.analyzeMeta.DropAnalyzeTask(context.TODO(), 1)
@ -1340,7 +1341,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
cm.EXPECT().RootPath().Return("ut-index")
handler := NewNMockHandler(s.T())
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False")
@ -1614,7 +1615,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil)
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
waitTaskDoneFunc := func(sche *taskScheduler) {
for {
@ -1853,7 +1854,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
handler_isolation := NewNMockHandler(s.T())
handler_isolation.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(isoCollInfo, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil, nil)
scheduler_isolation.Start()
s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() {
@ -1924,3 +1925,188 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
})
scheduler_isolation.Stop()
}
func (s *taskSchedulerSuite) Test_reload() {
s.Run("normal case", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
statsTaskID: {
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
s.NotNil(task)
})
s.Run("segment is compacting", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
statsTaskID: {
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.False(ok)
s.Nil(task)
})
s.Run("drop task failed", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
statsTaskID: {
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe()
mt.segments.segments[1000].isCompacting = true
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
s.Run("segment is in l0 compaction", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
statsTaskID: {
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe()
mt.segments.segments[1000].isCompacting = false
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.False(ok)
s.Nil(task)
})
s.Run("drop task failed", func() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
workerManager := session.NewMockWorkerManager(s.T())
handler := NewNMockHandler(s.T())
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
withStatsTaskMeta(&statsTaskMeta{
ctx: context.Background(),
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
statsTaskID: {
CollectionID: 10000,
PartitionID: 10001,
SegmentID: 1000,
InsertChannel: "",
TaskID: statsTaskID,
Version: 1,
NodeID: 1,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
TargetSegmentID: 2000,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
}))
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe()
mt.segments.segments[1000].isCompacting = false
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
}

View File

@ -79,6 +79,7 @@ func (st *statsTask) ResetTask(mt *meta) {
// reset isCompacting
mt.SetSegmentsCompacting(context.TODO(), []UniqueID{st.segmentID}, false)
mt.SetSegmentStating(st.segmentID, false)
}
func (st *statsTask) SetQueueTime(t time.Time) {
@ -127,15 +128,26 @@ func (st *statsTask) GetFailReason() string {
return st.taskInfo.GetFailReason()
}
func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error {
// mark compacting
if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo {
log.Warn("segment is not exist or is compacting, skip stats",
zap.Bool("exist", exist), zap.Bool("canDo", canDo))
st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy")
st.SetStartTime(time.Now())
return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
}
if !compactionHandler.checkAndSetSegmentStating(st.req.GetInsertChannel(), st.segmentID) {
log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID),
zap.Int64("segmentID", st.segmentID))
st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction")
//reset compacting
meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false)
st.SetStartTime(time.Now())
return fmt.Errorf("segment is contains by l0 compaction")
}
if err := meta.statsTaskMeta.UpdateVersion(st.taskID, nodeID); err != nil {
return err
}
@ -194,8 +206,6 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
PartitionID: segment.GetPartitionID(),
InsertChannel: segment.GetInsertChannel(),
SegmentID: segment.GetID(),
InsertLogs: segment.GetBinlogs(),
DeltaLogs: segment.GetDeltalogs(),
StorageConfig: createStorageConfig(),
Schema: collInfo.Schema,
SubJobType: st.subJobType,
@ -211,7 +221,19 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
return true
}
func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
segment := meta.GetHealthySegment(ctx, st.segmentID)
if segment == nil {
log.Ctx(ctx).Warn("segment is node healthy, skip stats")
// need to set retry and reset compacting
st.SetState(indexpb.JobState_JobStateRetry, "segment is not healthy")
return false
}
// Set InsertLogs and DeltaLogs just before execution, after any L0 compaction containing this segment has completed
st.req.InsertLogs = segment.GetBinlogs()
st.req.DeltaLogs = segment.GetDeltalogs()
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{

View File

@ -166,21 +166,33 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
s.Run("segment is compacting", func() {
s.mt.segments.segments[s.segID].isCompacting = true
s.Error(st.UpdateVersion(context.Background(), 1, s.mt))
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, nil))
})
s.Run("segment is in l0 compaction", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false)
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
s.False(s.mt.segments.segments[s.segID].isCompacting)
})
s.Run("normal case", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil).Once()
s.NoError(st.UpdateVersion(context.Background(), 1, s.mt))
s.NoError(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
})
s.Run("failed case", func() {
s.mt.segments.segments[s.segID].isCompacting = false
compactionHandler := NewMockCompactionPlanContext(s.T())
compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(fmt.Errorf("error")).Once()
s.Error(st.UpdateVersion(context.Background(), 1, s.mt))
s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler))
})
})
@ -365,7 +377,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Reason: "mock error",
}, nil)
s.False(st.AssignTask(context.Background(), in))
mt := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
st.segmentID: {
SegmentInfo: &datapb.SegmentInfo{
ID: st.segmentID,
State: commonpb.SegmentState_Flushed,
},
},
},
},
}
s.False(st.AssignTask(context.Background(), in, mt))
})
s.Run("assign success", func() {
@ -375,7 +400,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Reason: "",
}, nil)
s.True(st.AssignTask(context.Background(), in))
mt := &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
st.segmentID: {
SegmentInfo: &datapb.SegmentInfo{
ID: st.segmentID,
State: commonpb.SegmentState_Flushed,
},
},
},
},
}
s.True(st.AssignTask(context.Background(), in, mt))
})
})

View File

@ -33,9 +33,9 @@ type Task interface {
SetState(state indexpb.JobState, failReason string)
GetState() indexpb.JobState
GetFailReason() string
UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error
UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error
UpdateMetaBuildingState(meta *meta) error
AssignTask(ctx context.Context, client types.IndexNodeClient) bool
AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool
QueryResult(ctx context.Context, client types.IndexNodeClient)
DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool
SetJobInfo(meta *meta) error