diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 98f2df1f6c..a72f5b3e9c 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -59,6 +59,8 @@ type compactionPlanContext interface { getCompactionTasksNumBySignalID(signalID int64) int getCompactionInfo(ctx context.Context, signalID int64) *compactionInfo removeTasksByChannel(channel string) + setTaskScheduler(scheduler *taskScheduler) + checkAndSetSegmentStating(channel string, segmentID int64) bool } var ( @@ -230,6 +232,21 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo { return ret } +func (c *compactionPlanHandler) checkAndSetSegmentStating(channel string, segmentID int64) bool { + c.executingGuard.Lock() + defer c.executingGuard.Unlock() + + for _, t := range c.executingTasks { + if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction { + if t.GetTaskProto().GetChannel() == channel && t.CheckCompactionContainsSegment(segmentID) { + return false + } + } + } + c.meta.SetSegmentStating(segmentID, true) + return true +} + func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int { cnt := 0 c.queueTasks.ForEach(func(ct CompactionTask) { @@ -248,22 +265,21 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) } func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, meta CompactionMeta, - allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler, + allocator allocator.Allocator, handler Handler, ) *compactionPlanHandler { // Higher capacity will have better ordering in priority, but consumes more memory. // TODO[GOOSE]: Higher capacity makes tasks waiting longer, which need to be get rid of. capacity := paramtable.Get().DataCoordCfg.CompactionTaskQueueCapacity.GetAsInt() return &compactionPlanHandler{ - queueTasks: *NewCompactionQueue(capacity, getPrioritizer()), - meta: meta, - sessions: sessions, - allocator: allocator, - stopCh: make(chan struct{}), - cluster: cluster, - executingTasks: make(map[int64]CompactionTask), - cleaningTasks: make(map[int64]CompactionTask), - analyzeScheduler: analyzeScheduler, - handler: handler, + queueTasks: *NewCompactionQueue(capacity, getPrioritizer()), + meta: meta, + sessions: sessions, + allocator: allocator, + stopCh: make(chan struct{}), + cluster: cluster, + executingTasks: make(map[int64]CompactionTask), + cleaningTasks: make(map[int64]CompactionTask), + handler: handler, } } @@ -277,6 +293,10 @@ func (c *compactionPlanHandler) checkSchedule() { c.schedule(assigner) } +func (c *compactionPlanHandler) setTaskScheduler(scheduler *taskScheduler) { + c.analyzeScheduler = scheduler +} + func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask { selected := make([]CompactionTask, 0) if c.queueTasks.Len() == 0 { @@ -364,6 +384,13 @@ func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask } c.executingGuard.Lock() + // Do not move this check logic outside the lock; it needs to remain mutually exclusive with the stats task. 
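+ // For Level0DeleteCompaction tasks, PreparePlan re-selects the sealed target segments under this lock and skips scheduling when any of them no longer exists or is already claimed by a stats task (isStating).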
+ if t.GetTaskProto().GetType() == datapb.CompactionType_Level0DeleteCompaction { + if !t.PreparePlan() { + c.executingGuard.Unlock() + continue + } + } c.executingTasks[t.GetTaskProto().GetPlanID()] = t c.executingGuard.Unlock() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec() @@ -373,7 +400,6 @@ func (c *compactionPlanHandler) schedule(assigner NodeAssigner) []CompactionTask } func (c *compactionPlanHandler) start() { - c.loadMeta() c.stopWg.Add(2) go c.loopSchedule() go c.loopClean() diff --git a/internal/datacoord/compaction_task.go b/internal/datacoord/compaction_task.go index a5b8020856..6b8e79a84d 100644 --- a/internal/datacoord/compaction_task.go +++ b/internal/datacoord/compaction_task.go @@ -43,6 +43,9 @@ type CompactionTask interface { SetNodeID(UniqueID) error NeedReAssignNodeID() bool SaveTaskMeta() error + + PreparePlan() bool + CheckCompactionContainsSegment(segmentID int64) bool } type compactionTaskOpt func(task *datapb.CompactionTask) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 282fd98942..d6769aa7aa 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -177,6 +177,14 @@ func (t *clusteringCompactionTask) Clean() bool { return t.doClean() == nil } +func (t *clusteringCompactionTask) PreparePlan() bool { + return true +} + +func (t *clusteringCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { + return false +} + func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.AllocN(1) if err != nil { diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 49c413a7cd..cd87008be2 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -85,7 +85,7 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() { s.mockSessionMgr = session.NewMockDataNodeManager(s.T()) - scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil) + scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil, nil, nil) s.analyzeScheduler = scheduler } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index adab10e017..7dbb42e16d 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -223,6 +223,54 @@ func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compac return taskClone } +func (t *l0CompactionTask) selectSealedSegment() ([]int64, []*datapb.CompactionSegmentBinlogs) { + taskProto := t.taskProto.Load().(*datapb.CompactionTask) + // Select sealed L1 segments for LevelZero compaction that meets the condition: + // dmlPos < triggerInfo.pos + sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) && + info.GetInsertChannel() == taskProto.GetChannel() && + isFlushState(info.GetState()) && + !info.GetIsImporting() && + info.GetLevel() != datapb.SegmentLevel_L0 && + info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp() + })) + + 
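+ // Return both the full binlog descriptors (used by BuildCompactionRequest) and the bare segment IDs (used by PreparePlan and CheckCompactionContainsSegment).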
sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { + return &datapb.CompactionSegmentBinlogs{ + SegmentID: info.GetID(), + Field2StatslogPaths: info.GetStatslogs(), + InsertChannel: info.GetInsertChannel(), + Level: info.GetLevel(), + CollectionID: info.GetCollectionID(), + PartitionID: info.GetPartitionID(), + IsSorted: info.GetIsSorted(), + } + }) + + sealedSegmentIDs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) int64 { + return info.GetID() + }) + + return sealedSegmentIDs, sealedSegBinlogs +} + +func (t *l0CompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { + sealedSegmentIDs, _ := t.selectSealedSegment() + for _, sealedSegmentID := range sealedSegmentIDs { + if sealedSegmentID == segmentID { + return true + } + } + return false +} + +func (t *l0CompactionTask) PreparePlan() bool { + sealedSegmentIDs, _ := t.selectSealedSegment() + exist, hasStating := t.meta.CheckSegmentsStating(context.TODO(), sealedSegmentIDs) + return exist && !hasStating +} + func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { beginLogID, _, err := t.allocator.AllocN(1) if err != nil { @@ -259,35 +307,13 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err }) } - // Select sealed L1 segments for LevelZero compaction that meets the condition: - // dmlPos < triggerInfo.pos - sealedSegments := t.meta.SelectSegments(context.TODO(), WithCollection(taskProto.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { - return (taskProto.GetPartitionID() == common.AllPartitionsID || info.GetPartitionID() == taskProto.GetPartitionID()) && - info.GetInsertChannel() == plan.GetChannel() && - isFlushState(info.GetState()) && - !info.GetIsImporting() && - info.GetLevel() != datapb.SegmentLevel_L0 && - info.GetStartPosition().GetTimestamp() < taskProto.GetPos().GetTimestamp() - })) - - if len(sealedSegments) == 0 { + sealedSegmentIDs, sealedSegBinlogs := t.selectSealedSegment() + if len(sealedSegmentIDs) == 0 { // TODO fast finish l0 segment, just drop l0 segment log.Info("l0Compaction available non-L0 Segments is empty ") return nil, errors.Errorf("Selected zero L1/L2 segments for the position=%v", taskProto.GetPos()) } - sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { - return &datapb.CompactionSegmentBinlogs{ - SegmentID: info.GetID(), - Field2StatslogPaths: info.GetStatslogs(), - InsertChannel: info.GetInsertChannel(), - Level: info.GetLevel(), - CollectionID: info.GetCollectionID(), - PartitionID: info.GetPartitionID(), - IsSorted: info.GetIsSorted(), - } - }) - plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) 
log.Info("l0CompactionTask refreshed level zero compaction plan", zap.Any("target position", taskProto.GetPos()), diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 6f4ffa15ba..c8ff5f45cd 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -290,6 +290,14 @@ func (t *mixCompactionTask) SetTask(task *datapb.CompactionTask) { t.taskProto.Store(task) } +func (t *mixCompactionTask) PreparePlan() bool { + return true +} + +func (t *mixCompactionTask) CheckCompactionContainsSegment(segmentID int64) bool { + return false +} + func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) { log := log.With(zap.Int64("triggerID", t.GetTaskProto().GetTriggerID()), zap.Int64("PlanID", t.GetTaskProto().GetPlanID()), zap.Int64("collectionID", t.GetTaskProto().GetCollectionID())) beginLogID, _, err := t.allocator.AllocN(1) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 31bf39a484..e67965c0bb 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -60,7 +60,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockCm = NewMockChannelManager(s.T()) s.mockSessMgr = session.NewMockDataNodeManager(s.T()) s.cluster = NewMockCluster(s.T()) - s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil) + s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil) s.mockHandler = NewNMockHandler(s.T()) s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe() } @@ -130,6 +130,22 @@ func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() { func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { // dataNode 101's paralleTasks has 1 task running, not L0 task + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 3, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + isStating: false, + }, + }) + s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false) tests := []struct { description string tasks []CompactionTask @@ -236,6 +252,22 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { } func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 3, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + isStating: false, + }, + }) + s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false) tests := []struct { description string tasks []CompactionTask @@ -312,6 +344,23 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { // dataNode 102's paralleTasks has running L0 tasks // nothing of the same channel will be able to schedule + + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + 
PartitionID: 3, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + isStating: false, + }, + }) + s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false) tests := []struct { description string tasks []CompactionTask @@ -576,6 +625,22 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { } return ret }) + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything, mock.Anything).Return([]*SegmentInfo{ + { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 3, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + isStating: false, + }, + }) + s.mockMeta.EXPECT().CheckSegmentsStating(mock.Anything, mock.Anything).Return(true, false) for _, t := range inTasks { s.handler.submitTask(t) @@ -595,7 +660,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() { paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1") defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity") - s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil) + s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil) t1 := newMixCompactionTask(&datapb.CompactionTask{ TriggerID: 1, @@ -621,7 +686,7 @@ func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() { func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { s.SetupTest() s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything, mock.Anything).Return(true, true).Maybe() - handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil) + handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil) task := &datapb.CompactionTask{ TriggerID: 1, diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index b0f2197b6e..723cc85b72 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -58,6 +58,9 @@ func (h *spyCompactionHandler) getCompactionInfo(ctx context.Context, signalID i return nil } +func (h *spyCompactionHandler) setTaskScheduler(scheduler *taskScheduler) { +} + var _ compactionPlanContext = (*spyCompactionHandler)(nil) func (h *spyCompactionHandler) removeTasksByChannel(channel string) {} @@ -72,6 +75,10 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er return err } +func (h *spyCompactionHandler) checkAndSetSegmentStating(channel string, segmentID int64) bool { + return false +} + // isFull return true if the task pool is full func (h *spyCompactionHandler) isFull() bool { return false diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go index 96c910c49b..827db4ca37 100644 --- a/internal/datacoord/job_manager.go +++ b/internal/datacoord/job_manager.go @@ -253,7 +253,7 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6 } if err = jm.mt.statsTaskMeta.AddStatsTask(t); err != nil { if errors.Is(err, merr.ErrTaskDuplicate) { - log.Info("stats task already exists", zap.Int64("taskID", taskID), + log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID), zap.Int64("collectionID", originSegment.GetCollectionID()), zap.Int64("segmentID", originSegment.GetID())) return nil diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 
12df98eabf..186b62de9e 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -65,6 +65,8 @@ type CompactionMeta interface { CheckAndSetSegmentsCompacting(ctx context.Context, segmentIDs []int64) (bool, bool) CompleteCompactionMutation(ctx context.Context, t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error + CheckSegmentsStating(ctx context.Context, segmentID []UniqueID) (bool, bool) + SetSegmentStating(segmentID UniqueID, stating bool) SaveCompactionTask(ctx context.Context, task *datapb.CompactionTask) error DropCompactionTask(ctx context.Context, task *datapb.CompactionTask) error @@ -1448,6 +1450,31 @@ func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) { m.segments.SetFlushTime(segmentID, t) } +func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) { + m.RLock() + defer m.RUnlock() + exist = true + for _, segmentID := range segmentIDs { + seg := m.segments.GetSegment(segmentID) + if seg != nil { + if seg.isStating { + hasStating = true + } + } else { + exist = false + break + } + } + return exist, hasStating +} + +func (m *meta) SetSegmentStating(segmentID UniqueID, stating bool) { + m.Lock() + defer m.Unlock() + + m.segments.SetIsStating(segmentID, stating) +} + // SetSegmentCompacting sets compaction state for segment func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { m.Lock() diff --git a/internal/datacoord/mock_compaction_meta.go b/internal/datacoord/mock_compaction_meta.go index a809b5022b..59a519d107 100644 --- a/internal/datacoord/mock_compaction_meta.go +++ b/internal/datacoord/mock_compaction_meta.go @@ -79,6 +79,63 @@ func (_c *MockCompactionMeta_CheckAndSetSegmentsCompacting_Call) RunAndReturn(ru return _c } +// CheckSegmentsStating provides a mock function with given fields: ctx, segmentID +func (_m *MockCompactionMeta) CheckSegmentsStating(ctx context.Context, segmentID []int64) (bool, bool) { + ret := _m.Called(ctx, segmentID) + + if len(ret) == 0 { + panic("no return value specified for CheckSegmentsStating") + } + + var r0 bool + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, []int64) (bool, bool)); ok { + return rf(ctx, segmentID) + } + if rf, ok := ret.Get(0).(func(context.Context, []int64) bool); ok { + r0 = rf(ctx, segmentID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, []int64) bool); ok { + r1 = rf(ctx, segmentID) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockCompactionMeta_CheckSegmentsStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckSegmentsStating' +type MockCompactionMeta_CheckSegmentsStating_Call struct { + *mock.Call +} + +// CheckSegmentsStating is a helper method to define mock.On call +// - ctx context.Context +// - segmentID []int64 +func (_e *MockCompactionMeta_Expecter) CheckSegmentsStating(ctx interface{}, segmentID interface{}) *MockCompactionMeta_CheckSegmentsStating_Call { + return &MockCompactionMeta_CheckSegmentsStating_Call{Call: _e.mock.On("CheckSegmentsStating", ctx, segmentID)} +} + +func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Run(run func(ctx context.Context, segmentID []int64)) *MockCompactionMeta_CheckSegmentsStating_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int64)) + }) + return _c +} + 
+func (_c *MockCompactionMeta_CheckSegmentsStating_Call) Return(_a0 bool, _a1 bool) *MockCompactionMeta_CheckSegmentsStating_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCompactionMeta_CheckSegmentsStating_Call) RunAndReturn(run func(context.Context, []int64) (bool, bool)) *MockCompactionMeta_CheckSegmentsStating_Call { + _c.Call.Return(run) + return _c +} + // CleanPartitionStatsInfo provides a mock function with given fields: ctx, info func (_m *MockCompactionMeta) CleanPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error { ret := _m.Called(ctx, info) @@ -735,6 +792,40 @@ func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(context. return _c } +// SetSegmentStating provides a mock function with given fields: segmentID, stating +func (_m *MockCompactionMeta) SetSegmentStating(segmentID int64, stating bool) { + _m.Called(segmentID, stating) +} + +// MockCompactionMeta_SetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetSegmentStating' +type MockCompactionMeta_SetSegmentStating_Call struct { + *mock.Call +} + +// SetSegmentStating is a helper method to define mock.On call +// - segmentID int64 +// - stating bool +func (_e *MockCompactionMeta_Expecter) SetSegmentStating(segmentID interface{}, stating interface{}) *MockCompactionMeta_SetSegmentStating_Call { + return &MockCompactionMeta_SetSegmentStating_Call{Call: _e.mock.On("SetSegmentStating", segmentID, stating)} +} + +func (_c *MockCompactionMeta_SetSegmentStating_Call) Run(run func(segmentID int64, stating bool)) *MockCompactionMeta_SetSegmentStating_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(bool)) + }) + return _c +} + +func (_c *MockCompactionMeta_SetSegmentStating_Call) Return() *MockCompactionMeta_SetSegmentStating_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCompactionMeta_SetSegmentStating_Call) RunAndReturn(run func(int64, bool)) *MockCompactionMeta_SetSegmentStating_Call { + _c.Call.Return(run) + return _c +} + // SetSegmentsCompacting provides a mock function with given fields: ctx, segmentID, compacting func (_m *MockCompactionMeta) SetSegmentsCompacting(ctx context.Context, segmentID []int64, compacting bool) { _m.Called(ctx, segmentID, compacting) diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go index fc6b8ce246..ce73b0cdef 100644 --- a/internal/datacoord/mock_compaction_plan_context.go +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -22,6 +22,53 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte return &MockCompactionPlanContext_Expecter{mock: &_m.Mock} } +// checkAndSetSegmentStating provides a mock function with given fields: channel, segmentID +func (_m *MockCompactionPlanContext) checkAndSetSegmentStating(channel string, segmentID int64) bool { + ret := _m.Called(channel, segmentID) + + if len(ret) == 0 { + panic("no return value specified for checkAndSetSegmentStating") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(string, int64) bool); ok { + r0 = rf(channel, segmentID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockCompactionPlanContext_checkAndSetSegmentStating_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'checkAndSetSegmentStating' +type MockCompactionPlanContext_checkAndSetSegmentStating_Call struct { + *mock.Call +} + +// checkAndSetSegmentStating is a 
helper method to define mock.On call +// - channel string +// - segmentID int64 +func (_e *MockCompactionPlanContext_Expecter) checkAndSetSegmentStating(channel interface{}, segmentID interface{}) *MockCompactionPlanContext_checkAndSetSegmentStating_Call { + return &MockCompactionPlanContext_checkAndSetSegmentStating_Call{Call: _e.mock.On("checkAndSetSegmentStating", channel, segmentID)} +} + +func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Run(run func(channel string, segmentID int64)) *MockCompactionPlanContext_checkAndSetSegmentStating_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int64)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) Return(_a0 bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactionPlanContext_checkAndSetSegmentStating_Call) RunAndReturn(run func(string, int64) bool) *MockCompactionPlanContext_checkAndSetSegmentStating_Call { + _c.Call.Return(run) + return _c +} + // enqueueCompaction provides a mock function with given fields: task func (_m *MockCompactionPlanContext) enqueueCompaction(task *datapb.CompactionTask) error { ret := _m.Called(task) @@ -241,6 +288,39 @@ func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) RunAndReturn(run return _c } +// setTaskScheduler provides a mock function with given fields: scheduler +func (_m *MockCompactionPlanContext) setTaskScheduler(scheduler *taskScheduler) { + _m.Called(scheduler) +} + +// MockCompactionPlanContext_setTaskScheduler_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'setTaskScheduler' +type MockCompactionPlanContext_setTaskScheduler_Call struct { + *mock.Call +} + +// setTaskScheduler is a helper method to define mock.On call +// - scheduler *taskScheduler +func (_e *MockCompactionPlanContext_Expecter) setTaskScheduler(scheduler interface{}) *MockCompactionPlanContext_setTaskScheduler_Call { + return &MockCompactionPlanContext_setTaskScheduler_Call{Call: _e.mock.On("setTaskScheduler", scheduler)} +} + +func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Run(run func(scheduler *taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*taskScheduler)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_setTaskScheduler_Call) Return() *MockCompactionPlanContext_setTaskScheduler_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCompactionPlanContext_setTaskScheduler_Call) RunAndReturn(run func(*taskScheduler)) *MockCompactionPlanContext_setTaskScheduler_Call { + _c.Call.Return(run) + return _c +} + // start provides a mock function with given fields: func (_m *MockCompactionPlanContext) start() { _m.Called() diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index dcd08e8cce..9b5825f116 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -56,6 +56,9 @@ type SegmentInfo struct { size atomic.Int64 deltaRowcount atomic.Int64 lastWrittenTime time.Time + + // It is only to ensure mutual exclusion between L0 compacting and stats tasks + isStating bool } // NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo` @@ -278,6 +281,13 @@ func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) { } } +// SetIsStating sets stating status for segment +func (s *SegmentsInfo) SetIsStating(segmentID UniqueID, 
isStating bool) { + if segment, ok := s.segments[segmentID]; ok { + s.segments[segmentID] = segment.ShadowClone(SetIsStating(isStating)) + } +} + func (s *SegmentInfo) IsDeltaLogExists(logID int64) bool { for _, deltaLogs := range s.GetDeltalogs() { for _, l := range deltaLogs.GetBinlogs() { @@ -465,6 +475,13 @@ func SetIsCompacting(isCompacting bool) SegmentInfoOption { } } +// SetIsStating is the option to set stats state for segment info +func SetIsStating(isStating bool) SegmentInfoOption { + return func(segment *SegmentInfo) { + segment.isStating = isStating + } +} + // SetLevel is the option to set level for segment info func SetLevel(level datapb.SegmentLevel) SegmentInfoOption { return func(segment *SegmentInfo) { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 24a1f4de64..648b8796a9 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -382,15 +382,15 @@ func (s *Server) initDataCoord() error { } log.Info("init service discovery done") + s.initCompaction() + log.Info("init compaction done") + s.initTaskScheduler(storageCli) log.Info("init task scheduler done") s.initJobManager() log.Info("init statsJobManager done") - s.initCompaction() - log.Info("init compaction done") - if err = s.initSegmentManager(); err != nil { return err } @@ -685,7 +685,8 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error { func (s *Server) initTaskScheduler(manager storage.ChunkManager) { if s.taskScheduler == nil { - s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator) + s.taskScheduler = newTaskScheduler(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler, s.allocator, s.compactionHandler) + s.compactionHandler.setTaskScheduler(s.taskScheduler) } } @@ -702,7 +703,9 @@ func (s *Server) initIndexNodeManager() { } func (s *Server) initCompaction() { - s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.taskScheduler, s.handler) + cph := newCompactionPlanHandler(s.cluster, s.sessionManager, s.meta, s.allocator, s.handler) + cph.loadMeta() + s.compactionHandler = cph s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta) s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 4e4cde4967..8fca418b0e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1706,7 +1706,7 @@ func TestGetCompactionState(t *testing.T) { {State: datapb.CompactionTaskState_timeout}, {State: datapb.CompactionTaskState_timeout}, }) - mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil, nil) + mockHandler := newCompactionPlanHandler(nil, nil, mockMeta, nil, nil) svr.compactionHandler = mockHandler resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1}) assert.NoError(t, err) diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go index 0eff0e5e6e..d5de7063d7 100644 --- a/internal/datacoord/stats_task_meta.go +++ b/internal/datacoord/stats_task_meta.go @@ -100,10 +100,10 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error { defer stm.Unlock() for _, st := range stm.tasks { - if st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() 
== t.GetSubJobType() { + if st.GetTaskID() == t.GetTaskID() || (st.GetSegmentID() == t.GetSegmentID() && st.GetSubJobType() == t.GetSubJobType() && st.GetState() != indexpb.JobState_JobStateFailed) { msg := fmt.Sprintf("stats task already exist in meta of segment %d with subJobType: %s", t.GetSegmentID(), t.GetSubJobType().String()) - log.Warn(msg) + log.RatedWarn(10, msg, zap.Int64("taskID", t.GetTaskID()), zap.Int64("exist taskID", st.GetTaskID())) return merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeStatsJob.String(), msg) } } diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go index 97082bcaa8..0a1cac94f5 100644 --- a/internal/datacoord/task_analyze.go +++ b/internal/datacoord/task_analyze.go @@ -118,7 +118,7 @@ func (at *analyzeTask) GetFailReason() string { return at.taskInfo.GetFailReason() } -func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error { +func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error { if err := meta.analyzeMeta.UpdateVersion(at.GetTaskID(), nodeID); err != nil { return err } @@ -227,7 +227,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) return true } -func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool { +func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool { ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index db88d1f98a..a333302642 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -120,7 +120,7 @@ func (it *indexBuildTask) GetFailReason() string { return it.taskInfo.FailReason } -func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error { +func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error { if err := meta.indexMeta.UpdateVersion(it.taskID, nodeID); err != nil { return err } @@ -260,11 +260,13 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule } log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()), - zap.Int64("segID", segment.GetID())) + zap.Int64("segID", segment.GetID()), + zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()), + zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion())) return true } -func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool { +func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool { ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index e264b80342..e04d6d0ceb 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -62,6 +62,7 @@ type taskScheduler struct { indexEngineVersionManager IndexEngineVersionManager handler Handler allocator allocator.Allocator + compactionHandler compactionPlanContext taskStats *expirable.LRU[UniqueID, Task] } @@ -73,6 +74,7 @@ func newTaskScheduler( 
indexEngineVersionManager IndexEngineVersionManager, handler Handler, allocator allocator.Allocator, + compactionHandler compactionPlanContext, ) *taskScheduler { ctx, cancel := context.WithCancel(ctx) @@ -92,6 +94,7 @@ func newTaskScheduler( indexEngineVersionManager: indexEngineVersionManager, allocator: allocator, taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15), + compactionHandler: compactionHandler, } ts.reloadFromMeta() return ts @@ -153,6 +156,33 @@ func (s *taskScheduler) reloadFromMeta() { allStatsTasks := s.meta.statsTaskMeta.GetAllTasks() for taskID, t := range allStatsTasks { if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed { + if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry { + exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}) + if !exist || !canDo { + log.Ctx(s.ctx).Warn("segment does not exist or is compacting, skip stats; this should not have happened, try to remove the stats task", + zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo)) + err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID()) + if err == nil { + continue + } + log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err)) + t.State = indexpb.JobState_JobStateFailed + t.FailReason = "segment does not exist or is compacting" + } else { + if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) { + s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false) + err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID()) + if err == nil { + continue + } + log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err)) + t.State = indexpb.JobState_JobStateFailed + t.FailReason = "segment does not exist or is in l0 compaction" + } + } + } + } s.enqueue(&statsTask{ taskID: taskID, segmentID: t.GetSegmentID(), @@ -389,14 +419,14 @@ func (s *taskScheduler) processInit(task Task) bool { log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID)) // 2. update version - if err := task.UpdateVersion(s.ctx, nodeID, s.meta); err != nil { + if err := task.UpdateVersion(s.ctx, nodeID, s.meta, s.compactionHandler); err != nil { log.Ctx(s.ctx).Warn("update task version failed", zap.Int64("taskID", task.GetTaskID()), zap.Error(err)) return false } log.Ctx(s.ctx).Info("update task version success", zap.Int64("taskID", task.GetTaskID())) // 3.
assign task to indexNode - success := task.AssignTask(s.ctx, client) + success := task.AssignTask(s.ctx, client, s.meta) if !success { log.Ctx(s.ctx).Warn("assign task to client failed", zap.Int64("taskID", task.GetTaskID()), zap.String("new state", task.GetState().String()), zap.String("fail reason", task.GetFailReason())) @@ -465,6 +495,7 @@ func (s *taskScheduler) processInProgress(task Task) bool { if exist { task.QueryResult(s.ctx, client) if task.GetState() == indexpb.JobState_JobStateFinished || task.GetState() == indexpb.JobState_JobStateFailed { + task.ResetTask(s.meta) return s.processFinished(task) } return true diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index d7122c8078..7d49f232b5 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -53,6 +53,7 @@ var ( buildID = UniqueID(600) nodeID = UniqueID(700) partitionKeyID = UniqueID(800) + statsTaskID = UniqueID(900) ) func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { @@ -852,7 +853,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { cm := mocks.NewChunkManager(s.T()) cm.EXPECT().RootPath().Return("root") - scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil) + scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil) s.Equal(9, len(scheduler.tasks)) s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[1].GetState()) s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[2].GetState()) @@ -999,7 +1000,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { })) handler := NewNMockHandler(s.T()) - scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil) + scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil) mt.segments.DropSegment(1000) scheduler.scheduleDuration = s.duration @@ -1059,7 +1060,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { }, }, nil) - scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil) + scheduler := newTaskScheduler(ctx, mt, workerManager, nil, nil, handler, nil, nil) // remove task in meta err := scheduler.meta.analyzeMeta.DropAnalyzeTask(context.TODO(), 1) @@ -1340,7 +1341,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { cm.EXPECT().RootPath().Return("ut-index") handler := NewNMockHandler(s.T()) - scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil) + scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil) paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True") defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False") @@ -1614,7 +1615,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") - scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil) + scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil) waitTaskDoneFunc := func(sche *taskScheduler) { for { @@ -1853,7 +1854,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { handler_isolation := NewNMockHandler(s.T()) handler_isolation.EXPECT().GetCollection(mock.Anything, 
mock.Anything).Return(isoCollInfo, nil) - scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil) + scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil, nil) scheduler_isolation.Start() s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() { @@ -1924,3 +1925,188 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { }) scheduler_isolation.Stop() } + +func (s *taskSchedulerSuite) Test_reload() { + s.Run("normal case", func() { + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + workerManager := session.NewMockWorkerManager(s.T()) + handler := NewNMockHandler(s.T()) + mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), + withStatsTaskMeta(&statsTaskMeta{ + ctx: context.Background(), + catalog: catalog, + tasks: map[int64]*indexpb.StatsTask{ + statsTaskID: { + CollectionID: 10000, + PartitionID: 10001, + SegmentID: 1000, + InsertChannel: "", + TaskID: statsTaskID, + Version: 1, + NodeID: 1, + State: indexpb.JobState_JobStateInProgress, + FailReason: "", + TargetSegmentID: 2000, + SubJobType: indexpb.StatsSubJob_Sort, + CanRecycle: false, + }, + }, + })) + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe() + scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) + s.NotNil(scheduler) + s.True(mt.segments.segments[1000].isCompacting) + task, ok := scheduler.tasks[statsTaskID] + s.True(ok) + s.NotNil(task) + }) + + s.Run("segment is compacting", func() { + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil) + workerManager := session.NewMockWorkerManager(s.T()) + handler := NewNMockHandler(s.T()) + mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), + withStatsTaskMeta(&statsTaskMeta{ + ctx: context.Background(), + catalog: catalog, + tasks: map[int64]*indexpb.StatsTask{ + statsTaskID: { + CollectionID: 10000, + PartitionID: 10001, + SegmentID: 1000, + InsertChannel: "", + TaskID: statsTaskID, + Version: 1, + NodeID: 1, + State: indexpb.JobState_JobStateInProgress, + FailReason: "", + TargetSegmentID: 2000, + SubJobType: indexpb.StatsSubJob_Sort, + CanRecycle: false, + }, + }, + })) + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe() + mt.segments.segments[1000].isCompacting = true + scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) + s.NotNil(scheduler) + s.True(mt.segments.segments[1000].isCompacting) + task, ok := scheduler.tasks[statsTaskID] + s.False(ok) + s.Nil(task) + }) + + s.Run("drop task failed", func() { + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error")) + workerManager := session.NewMockWorkerManager(s.T()) + handler := NewNMockHandler(s.T()) + mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), + withStatsTaskMeta(&statsTaskMeta{ + ctx: context.Background(), + catalog: catalog, + tasks: 
map[int64]*indexpb.StatsTask{ + statsTaskID: { + CollectionID: 10000, + PartitionID: 10001, + SegmentID: 1000, + InsertChannel: "", + TaskID: statsTaskID, + Version: 1, + NodeID: 1, + State: indexpb.JobState_JobStateInProgress, + FailReason: "", + TargetSegmentID: 2000, + SubJobType: indexpb.StatsSubJob_Sort, + CanRecycle: false, + }, + }, + })) + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true).Maybe() + mt.segments.segments[1000].isCompacting = true + scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) + s.NotNil(scheduler) + s.True(mt.segments.segments[1000].isCompacting) + task, ok := scheduler.tasks[statsTaskID] + s.True(ok) + s.Equal(indexpb.JobState_JobStateFailed, task.GetState()) + }) + + s.Run("segment is in l0 compaction", func() { + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil) + workerManager := session.NewMockWorkerManager(s.T()) + handler := NewNMockHandler(s.T()) + mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), + withStatsTaskMeta(&statsTaskMeta{ + ctx: context.Background(), + catalog: catalog, + tasks: map[int64]*indexpb.StatsTask{ + statsTaskID: { + CollectionID: 10000, + PartitionID: 10001, + SegmentID: 1000, + InsertChannel: "", + TaskID: statsTaskID, + Version: 1, + NodeID: 1, + State: indexpb.JobState_JobStateInProgress, + FailReason: "", + TargetSegmentID: 2000, + SubJobType: indexpb.StatsSubJob_Sort, + CanRecycle: false, + }, + }, + })) + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe() + mt.segments.segments[1000].isCompacting = false + scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) + s.NotNil(scheduler) + s.False(mt.segments.segments[1000].isCompacting) + task, ok := scheduler.tasks[statsTaskID] + s.False(ok) + s.Nil(task) + }) + + s.Run("drop task failed", func() { + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock error")) + workerManager := session.NewMockWorkerManager(s.T()) + handler := NewNMockHandler(s.T()) + mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), + withStatsTaskMeta(&statsTaskMeta{ + ctx: context.Background(), + catalog: catalog, + tasks: map[int64]*indexpb.StatsTask{ + statsTaskID: { + CollectionID: 10000, + PartitionID: 10001, + SegmentID: 1000, + InsertChannel: "", + TaskID: statsTaskID, + Version: 1, + NodeID: 1, + State: indexpb.JobState_JobStateInProgress, + FailReason: "", + TargetSegmentID: 2000, + SubJobType: indexpb.StatsSubJob_Sort, + CanRecycle: false, + }, + }, + })) + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false).Maybe() + mt.segments.segments[1000].isCompacting = false + scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) + s.NotNil(scheduler) + s.False(mt.segments.segments[1000].isCompacting) + task, ok := scheduler.tasks[statsTaskID] + s.True(ok) + s.Equal(indexpb.JobState_JobStateFailed, task.GetState()) + }) +} diff 
--git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 1b316ffcee..e67d3c954a 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -79,6 +79,7 @@ func (st *statsTask) ResetTask(mt *meta) { // reset isCompacting mt.SetSegmentsCompacting(context.TODO(), []UniqueID{st.segmentID}, false) + mt.SetSegmentStating(st.segmentID, false) } func (st *statsTask) SetQueueTime(t time.Time) { @@ -127,15 +128,26 @@ func (st *statsTask) GetFailReason() string { return st.taskInfo.GetFailReason() } -func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error { +func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error { // mark compacting if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo { log.Warn("segment is not exist or is compacting, skip stats", zap.Bool("exist", exist), zap.Bool("canDo", canDo)) - st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy") + st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy") + st.SetStartTime(time.Now()) return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo) } + if !compactionHandler.checkAndSetSegmentStating(st.req.GetInsertChannel(), st.segmentID) { + log.Warn("segment is contained in l0 compaction, skip stats", zap.Int64("taskID", st.taskID), + zap.Int64("segmentID", st.segmentID)) + st.SetState(indexpb.JobState_JobStateFailed, "segment is contained in l0 compaction") + // reset compacting + meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false) + st.SetStartTime(time.Now()) + return fmt.Errorf("segment is contained in l0 compaction") + } + if err := meta.statsTaskMeta.UpdateVersion(st.taskID, nodeID); err != nil { return err } @@ -194,8 +206,6 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo PartitionID: segment.GetPartitionID(), InsertChannel: segment.GetInsertChannel(), SegmentID: segment.GetID(), - InsertLogs: segment.GetBinlogs(), - DeltaLogs: segment.GetDeltalogs(), StorageConfig: createStorageConfig(), Schema: collInfo.Schema, SubJobType: st.subJobType, @@ -211,7 +221,19 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo return true } -func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool { +func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool { + segment := meta.GetHealthySegment(ctx, st.segmentID) + if segment == nil { + log.Ctx(ctx).Warn("segment is not healthy, skip stats") + // need to set retry and reset compacting + st.SetState(indexpb.JobState_JobStateRetry, "segment is not healthy") + return false + } + + // Set InsertLogs and DeltaLogs just before execution so they reflect the segment state after any L0 compaction containing this segment has completed + st.req.InsertLogs = segment.GetBinlogs() + st.req.DeltaLogs = segment.GetDeltalogs() + ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ diff --git a/internal/datacoord/task_stats_test.go b/internal/datacoord/task_stats_test.go index 93ba3b1d72..252402d87e 100644 --- a/internal/datacoord/task_stats_test.go +++ b/internal/datacoord/task_stats_test.go @@ -166,21 +166,33 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() { s.Run("segment is compacting", func() { s.mt.segments.segments[s.segID].isCompacting = true -
s.Error(st.UpdateVersion(context.Background(), 1, s.mt)) + s.Error(st.UpdateVersion(context.Background(), 1, s.mt, nil)) + }) + + s.Run("segment is in l0 compaction", func() { + s.mt.segments.segments[s.segID].isCompacting = false + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(false) + s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler)) + s.False(s.mt.segments.segments[s.segID].isCompacting) }) s.Run("normal case", func() { s.mt.segments.segments[s.segID].isCompacting = false + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true) catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil).Once() - s.NoError(st.UpdateVersion(context.Background(), 1, s.mt)) + s.NoError(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler)) }) s.Run("failed case", func() { s.mt.segments.segments[s.segID].isCompacting = false + compactionHandler := NewMockCompactionPlanContext(s.T()) + compactionHandler.EXPECT().checkAndSetSegmentStating(mock.Anything, mock.Anything).Return(true) catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(fmt.Errorf("error")).Once() - s.Error(st.UpdateVersion(context.Background(), 1, s.mt)) + s.Error(st.UpdateVersion(context.Background(), 1, s.mt, compactionHandler)) }) }) @@ -365,7 +377,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() { Reason: "mock error", }, nil) - s.False(st.AssignTask(context.Background(), in)) + mt := &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + st.segmentID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: st.segmentID, + State: commonpb.SegmentState_Flushed, + }, + }, + }, + }, + } + + s.False(st.AssignTask(context.Background(), in, mt)) }) s.Run("assign success", func() { @@ -375,7 +400,20 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() { Reason: "", }, nil) - s.True(st.AssignTask(context.Background(), in)) + mt := &meta{ + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + st.segmentID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: st.segmentID, + State: commonpb.SegmentState_Flushed, + }, + }, + }, + }, + } + + s.True(st.AssignTask(context.Background(), in, mt)) }) }) diff --git a/internal/datacoord/types.go b/internal/datacoord/types.go index 89215eb3cf..e3a14cba79 100644 --- a/internal/datacoord/types.go +++ b/internal/datacoord/types.go @@ -33,9 +33,9 @@ type Task interface { SetState(state indexpb.JobState, failReason string) GetState() indexpb.JobState GetFailReason() string - UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error + UpdateVersion(ctx context.Context, nodeID int64, meta *meta, compactionHandler compactionPlanContext) error UpdateMetaBuildingState(meta *meta) error - AssignTask(ctx context.Context, client types.IndexNodeClient) bool + AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool QueryResult(ctx context.Context, client types.IndexNodeClient) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool SetJobInfo(meta *meta) error