mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
When l0 compaction is executing, do not mark the stats task as failed; keep it in the init state to allow retry. issue: https://github.com/milvus-io/milvus/issues/43039 pr: https://github.com/milvus-io/milvus/pull/43061 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
cf15decf42
commit
7fe1826eb9
@ -321,7 +321,7 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
|
||||
}
|
||||
if err = jm.mt.statsTaskMeta.AddStatsTask(t); err != nil {
|
||||
if errors.Is(err, merr.ErrTaskDuplicate) {
|
||||
log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID),
|
||||
log.Ctx(jm.ctx).WithRateGroup("job_manager", 1, 60).RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID),
|
||||
zap.Int64("collectionID", originSegment.GetCollectionID()),
|
||||
zap.Int64("segmentID", originSegment.GetID()))
|
||||
return nil
|
||||
|
||||
@ -143,6 +143,7 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta
|
||||
if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo {
|
||||
log.Warn("segment is not exist or is compacting, skip stats",
|
||||
zap.Bool("exist", exist), zap.Bool("canDo", canDo))
|
||||
// Fail stats task if segment is compacting, it's ok because the segment will be dropped after the compaction.
|
||||
st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy")
|
||||
st.SetStartTime(time.Now())
|
||||
return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
|
||||
@ -151,10 +152,10 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta
|
||||
if !compactionHandler.checkAndSetSegmentStating(st.req.GetInsertChannel(), st.segmentID) {
|
||||
log.Warn("segment is contains by l0 compaction, skip stats", zap.Int64("taskID", st.taskID),
|
||||
zap.Int64("segmentID", st.segmentID))
|
||||
st.SetState(indexpb.JobState_JobStateFailed, "segment is contains by l0 compaction")
|
||||
// reset compacting
|
||||
meta.SetSegmentsCompacting(ctx, []UniqueID{st.segmentID}, false)
|
||||
st.SetStartTime(time.Now())
|
||||
// Return err and keep task state as init to trigger retry.
|
||||
return errors.New("segment is contains by l0 compaction")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user