diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go index 6c5faa64f5..67c5818473 100644 --- a/internal/datacoord/job_manager.go +++ b/internal/datacoord/job_manager.go @@ -321,7 +321,7 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6 } if err = jm.mt.statsTaskMeta.AddStatsTask(t); err != nil { if errors.Is(err, merr.ErrTaskDuplicate) { - log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID), + log.Ctx(jm.ctx).WithRateGroup("job_manager", 1, 60).RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID), zap.Int64("collectionID", originSegment.GetCollectionID()), zap.Int64("segmentID", originSegment.GetID())) return nil diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 0f27a09b0b..429f40e601 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -143,6 +143,7 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo { log.Warn("segment is not exist or is compacting, skip stats", zap.Bool("exist", exist), zap.Bool("canDo", canDo)) + // Fail stats task if segment is compacting, it's ok because the segment will be dropped after the compaction. st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy") st.SetStartTime(time.Now()) return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo) @@ -151,10 +152,10 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta if !compactionHandler.checkAndSetSegmentStating(st.req.GetInsertChannel(), st.segmentID) { log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID)) - st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction") // reset compacting meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false) st.SetStartTime(time.Now()) + // Return err and keep task state as init to trigger retry. return errors.New("segment is contains by l0 compaction") } }