fix: Only mark segment compacting for sort stats task (#42516)

issue: #42506

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-06-04 22:46:32 +08:00 committed by GitHub
parent a3612d2728
commit 43c99a2c49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 45 additions and 27 deletions

View File

@@ -121,7 +121,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
-isFlush(segment) &&
+isFlushed(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments

View File

@@ -88,7 +88,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context,
partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
-isFlush(segment) &&
+isFlushed(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now

View File

@@ -544,7 +544,7 @@ func (t *compactionTrigger) getCandidates(signal *compactionSignal) ([]chanPartS
filters := []SegmentFilter{
SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
-isFlush(segment) &&
+isFlushed(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
@@ -766,6 +766,10 @@ func (t *compactionTrigger) ShouldRebuildSegmentIndex(segment *SegmentInfo) bool
return false
}
func isFlushed(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed
}
func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}

View File

@@ -1936,7 +1936,7 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID
func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo {
allSegs := m.SelectSegments(m.ctx, SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
-isFlush(segment) && // sealed segment
+isFlushed(segment) && // sealed segment
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() // not importing now
}))

View File

@@ -127,8 +127,10 @@ func (st *statsTask) UpdateTaskVersion(nodeID int64) error {
func (st *statsTask) resetTask(ctx context.Context, reason string) {
// reset isCompacting
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
st.meta.SetSegmentStating(st.GetSegmentID(), false)
if st.GetSubJobType() == indexpb.StatsSubJob_Sort {
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
st.meta.SetSegmentStating(st.GetSegmentID(), false)
}
// reset state to init
st.UpdateStateWithMeta(indexpb.JobState_JobStateInit, reason)
@@ -153,24 +155,26 @@ func (st *statsTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
)
// Check segment compaction state
if exist, canCompact := st.meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}); !exist || !canCompact {
log.Warn("segment is not exist or is compacting, skip stats and remove stats task",
zap.Bool("exist", exist), zap.Bool("canCompact", canCompact))
if st.GetSubJobType() == indexpb.StatsSubJob_Sort {
if exist, canCompact := st.meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}); !exist || !canCompact {
log.Warn("segment is not exist or is compacting, skip stats and remove stats task",
zap.Bool("exist", exist), zap.Bool("canCompact", canCompact))
if err := st.meta.statsTaskMeta.DropStatsTask(ctx, st.GetTaskID()); err != nil {
log.Warn("remove stats task failed, will retry later", zap.Error(err))
if err := st.meta.statsTaskMeta.DropStatsTask(ctx, st.GetTaskID()); err != nil {
log.Warn("remove stats task failed, will retry later", zap.Error(err))
return
}
st.SetState(indexpb.JobState_JobStateNone, "segment is not exist or is compacting")
return
}
st.SetState(indexpb.JobState_JobStateNone, "segment is not exist or is compacting")
return
}
// Check if segment is part of L0 compaction
if !st.compactionInspector.checkAndSetSegmentStating(st.GetInsertChannel(), st.GetSegmentID()) {
log.Warn("segment is contained by L0 compaction, skipping stats task")
// Reset isCompacting flag
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
return
// Check if segment is part of L0 compaction
if !st.compactionInspector.checkAndSetSegmentStating(st.GetInsertChannel(), st.GetSegmentID()) {
log.Warn("segment is contained by L0 compaction, skipping stats task")
// Reset isCompacting flag
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
return
}
}
var err error
@@ -383,14 +387,16 @@ func (st *statsTask) prepareJobRequest(ctx context.Context, segment *SegmentInfo
}
func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResult) error {
var err error
switch st.GetSubJobType() {
case indexpb.StatsSubJob_Sort:
// first update segment, failed state cannot generate new segment
metricMutation, err := st.meta.SaveStatsResultSegment(st.GetSegmentID(), result)
var metricMutation *segMetricMutation
metricMutation, err = st.meta.SaveStatsResultSegment(st.GetSegmentID(), result)
if err != nil {
log.Ctx(ctx).Warn("save sort stats result failed", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
return err
break
}
metricMutation.commit()
@@ -399,25 +405,33 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul
default:
}
case indexpb.StatsSubJob_TextIndexJob:
err := st.meta.UpdateSegment(st.GetSegmentID(), SetTextIndexLogs(result.GetTextStatsLogs()))
err = st.meta.UpdateSegment(st.GetSegmentID(), SetTextIndexLogs(result.GetTextStatsLogs()))
if err != nil {
log.Ctx(ctx).Warn("save text index stats result failed", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
return err
break
}
case indexpb.StatsSubJob_JsonKeyIndexJob:
err := st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs()))
err = st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs()))
if err != nil {
log.Ctx(ctx).Warn("save json key index stats result failed", zap.Int64("taskId", st.GetTaskID()),
zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err))
return err
break
}
case indexpb.StatsSubJob_BM25Job:
// bm25 logs are generated during with segment flush.
}
// if segment is not found, it means the segment is already dropped,
// so we can ignore the error and mark task as finished.
if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) {
return err
}
// Reset isCompacting flag after stats task is finished
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
if st.GetSubJobType() == indexpb.StatsSubJob_Sort {
st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false)
st.meta.SetSegmentStating(st.GetSegmentID(), false)
}
log.Ctx(ctx).Info("SetJobInfo for stats task success", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("oldSegmentID", st.GetSegmentID()), zap.Int64("targetSegmentID", st.GetTargetSegmentID()),
zap.String("subJobType", st.GetSubJobType().String()), zap.String("state", st.GetState().String()))