From 43c99a2c49847aafcd457cb0fff36c44d279ff48 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 4 Jun 2025 22:46:32 +0800 Subject: [PATCH] fix: Only mark segment compacting for sort stats task (#42516) issue: #42506 --------- Signed-off-by: Cai Zhang --- .../datacoord/compaction_policy_clustering.go | 2 +- .../datacoord/compaction_policy_single.go | 2 +- internal/datacoord/compaction_trigger.go | 6 +- internal/datacoord/meta.go | 2 +- internal/datacoord/task_stats.go | 60 ++++++++++++------- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go index 437700b10f..a44c90678a 100644 --- a/internal/datacoord/compaction_policy_clustering.go +++ b/internal/datacoord/compaction_policy_clustering.go @@ -121,7 +121,7 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && - isFlush(segment) && + isFlushed(segment) && !segment.isCompacting && // not compacting now !segment.GetIsImporting() && // not importing now segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index 2a78c5a572..a3fa6852e2 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -88,7 +88,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context, partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && - isFlush(segment) && + isFlushed(segment) && !segment.isCompacting && // not compacting now !segment.GetIsImporting() && // not importing now segment.GetLevel() == datapb.SegmentLevel_L2 && // only support L2 for now diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index a43c78e656..714d96f251 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -544,7 +544,7 @@ func (t *compactionTrigger) getCandidates(signal *compactionSignal) ([]chanPartS filters := []SegmentFilter{ SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && - isFlush(segment) && + isFlushed(segment) && !segment.isCompacting && // not compacting now !segment.GetIsImporting() && // not importing now segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments @@ -766,6 +766,10 @@ func (t *compactionTrigger) ShouldRebuildSegmentIndex(segment *SegmentInfo) bool return false } +func isFlushed(segment *SegmentInfo) bool { + return segment.GetState() == commonpb.SegmentState_Flushed +} + func isFlush(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index c7efc802f7..0658cc2885 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1936,7 +1936,7 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo { allSegs := m.SelectSegments(m.ctx, SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && - isFlush(segment) && // sealed segment + isFlushed(segment) && // sealed segment !segment.isCompacting && // not compacting now !segment.GetIsImporting() // not importing now })) diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 6766dcb4b0..e4bc1db690 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -127,8 +127,10 @@ func (st *statsTask) UpdateTaskVersion(nodeID int64) error { func (st *statsTask) resetTask(ctx context.Context, reason string) { // reset isCompacting - st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) - st.meta.SetSegmentStating(st.GetSegmentID(), false) + if st.GetSubJobType() == indexpb.StatsSubJob_Sort { + st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) + st.meta.SetSegmentStating(st.GetSegmentID(), false) + } // reset state to init st.UpdateStateWithMeta(indexpb.JobState_JobStateInit, reason) @@ -153,24 +155,26 @@ func (st *statsTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) { ) // Check segment compaction state - if exist, canCompact := st.meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}); !exist || !canCompact { - log.Warn("segment is not exist or is compacting, skip stats and remove stats task", - zap.Bool("exist", exist), zap.Bool("canCompact", canCompact)) + if st.GetSubJobType() == indexpb.StatsSubJob_Sort { + if exist, canCompact := st.meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}); !exist || !canCompact { + log.Warn("segment is not exist or is compacting, skip stats and remove stats task", + zap.Bool("exist", exist), zap.Bool("canCompact", canCompact)) - if err := st.meta.statsTaskMeta.DropStatsTask(ctx, st.GetTaskID()); err != nil { - log.Warn("remove stats task failed, will retry later", zap.Error(err)) + if err := st.meta.statsTaskMeta.DropStatsTask(ctx, st.GetTaskID()); err != nil { + log.Warn("remove stats task failed, will retry later", zap.Error(err)) + return + } + st.SetState(indexpb.JobState_JobStateNone, "segment is not exist or is compacting") return } - st.SetState(indexpb.JobState_JobStateNone, "segment is not exist or is compacting") - return - } - // Check if segment is part of L0 compaction - if !st.compactionInspector.checkAndSetSegmentStating(st.GetInsertChannel(), st.GetSegmentID()) { - log.Warn("segment is contained by L0 compaction, skipping stats task") - // Reset isCompacting flag - st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) - return + // Check if segment is part of L0 compaction + if !st.compactionInspector.checkAndSetSegmentStating(st.GetInsertChannel(), st.GetSegmentID()) { + log.Warn("segment is contained by L0 compaction, skipping stats task") + // Reset isCompacting flag + st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) + return + } } var err error @@ -383,14 +387,16 @@ func (st *statsTask) prepareJobRequest(ctx context.Context, segment *SegmentInfo } func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResult) error { + var err error switch st.GetSubJobType() { case indexpb.StatsSubJob_Sort: // first update segment, failed state cannot generate new segment - metricMutation, err := st.meta.SaveStatsResultSegment(st.GetSegmentID(), result) + var metricMutation *segMetricMutation + metricMutation, err = st.meta.SaveStatsResultSegment(st.GetSegmentID(), result) if err != nil { log.Ctx(ctx).Warn("save sort stats result failed", zap.Int64("taskID", st.GetTaskID()), zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err)) - return err + break } metricMutation.commit() @@ -399,25 +405,33 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul default: } case indexpb.StatsSubJob_TextIndexJob: - err := st.meta.UpdateSegment(st.GetSegmentID(), SetTextIndexLogs(result.GetTextStatsLogs())) + err = st.meta.UpdateSegment(st.GetSegmentID(), SetTextIndexLogs(result.GetTextStatsLogs())) if err != nil { log.Ctx(ctx).Warn("save text index stats result failed", zap.Int64("taskID", st.GetTaskID()), zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err)) - return err + break } case indexpb.StatsSubJob_JsonKeyIndexJob: - err := st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs())) + err = st.meta.UpdateSegment(st.GetSegmentID(), SetJsonKeyIndexLogs(result.GetJsonKeyStatsLogs())) if err != nil { log.Ctx(ctx).Warn("save json key index stats result failed", zap.Int64("taskId", st.GetTaskID()), zap.Int64("segmentID", st.GetSegmentID()), zap.Error(err)) - return err + break } case indexpb.StatsSubJob_BM25Job: // bm25 logs are generated during with segment flush. } + // if segment is not found, it means the segment is already dropped, + // so we can ignore the error and mark task as finished. + if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) { + return err + } // Reset isCompacting flag after stats task is finished - st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) + if st.GetSubJobType() == indexpb.StatsSubJob_Sort { + st.meta.SetSegmentsCompacting(ctx, []UniqueID{st.GetSegmentID()}, false) + st.meta.SetSegmentStating(st.GetSegmentID(), false) + } log.Ctx(ctx).Info("SetJobInfo for stats task success", zap.Int64("taskID", st.GetTaskID()), zap.Int64("oldSegmentID", st.GetSegmentID()), zap.Int64("targetSegmentID", st.GetTargetSegmentID()), zap.String("subJobType", st.GetSubJobType().String()), zap.String("state", st.GetState().String()))