From 6eb77ddc4d1f478a06c8bb00f3cf3db0a61e0d4f Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 13 Nov 2025 09:41:37 +0800
Subject: [PATCH] fix: [2.5]Fix target segment marked dropped for save stats result twice (#45480)

issue: #45477
master pr: #45478

---------

Signed-off-by: Cai Zhang
---
 ci/jenkins/PR.groovy                      |  2 +-
 internal/datacoord/compaction_trigger.go  |  5 ++++-
 internal/datacoord/meta.go                | 25 ++++++++++++-----------
 internal/datacoord/task_scheduler.go      |  4 +++-
 internal/datacoord/task_scheduler_test.go |  7 +------
 5 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/ci/jenkins/PR.groovy b/ci/jenkins/PR.groovy
index 4e6fdb29e8..d2db4e680a 100644
--- a/ci/jenkins/PR.groovy
+++ b/ci/jenkins/PR.groovy
@@ -89,7 +89,7 @@ pipeline {
         axes {
             axis {
                 name 'milvus_deployment_option'
-                values 'standalone', 'distributed', 'standalone-kafka-mmap'
+                values 'standalone', 'distributed'
             }
         }
         stages {
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 6a91119581..42568a144c 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -360,7 +360,10 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 		coll, err := t.getCollection(group.collectionID)
 		if err != nil {
 			log.Warn("get collection info failed, skip handling compaction", zap.Error(err))
-			return err
+			if signal.collectionID != 0 {
+				return err
+			}
+			continue
 		}
 
 		if !signal.isForce && !isCollectionAutoCompactionEnabled(coll) {
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 5af383c0b5..db3e536aa6 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -2117,8 +2117,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 	metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
 
 	oldSegment := m.segments.GetSegment(oldSegmentID)
-	if oldSegment == nil {
-		log.Warn("old segment is not found with stats task")
+	if oldSegment == nil || !isSegmentHealthy(oldSegment) {
+		log.Warn("old segment is not found or not healthy with stats task")
 		return nil, merr.WrapErrSegmentNotFound(oldSegmentID)
 	}
 
@@ -2135,16 +2135,17 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 	}
 
 	segmentInfo := &datapb.SegmentInfo{
-		CollectionID:          oldSegment.GetCollectionID(),
-		PartitionID:           oldSegment.GetPartitionID(),
-		InsertChannel:         oldSegment.GetInsertChannel(),
-		MaxRowNum:             oldSegment.GetMaxRowNum(),
-		LastExpireTime:        oldSegment.GetLastExpireTime(),
-		StartPosition:         oldSegment.GetStartPosition(),
-		DmlPosition:           oldSegment.GetDmlPosition(),
-		IsImporting:           oldSegment.GetIsImporting(),
-		StorageVersion:        oldSegment.GetStorageVersion(),
-		State:                 oldSegment.GetState(),
+		CollectionID:          oldSegment.GetCollectionID(),
+		PartitionID:           oldSegment.GetPartitionID(),
+		InsertChannel:         oldSegment.GetInsertChannel(),
+		MaxRowNum:             oldSegment.GetMaxRowNum(),
+		LastExpireTime:        oldSegment.GetLastExpireTime(),
+		StartPosition:         oldSegment.GetStartPosition(),
+		DmlPosition:           oldSegment.GetDmlPosition(),
+		IsImporting:           oldSegment.GetIsImporting(),
+		StorageVersion:        oldSegment.GetStorageVersion(),
+		// only flushed segment can do sort stats
+		State:                 commonpb.SegmentState_Flushed,
 		Level:                 oldSegment.GetLevel(),
 		LastLevel:             oldSegment.GetLastLevel(),
 		PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go
index 9e492fd721..ba97a3d868 100644
--- a/internal/datacoord/task_scheduler.go
+++ b/internal/datacoord/task_scheduler.go
@@ -581,7 +581,9 @@ func (s *taskScheduler) processFinished(task Task) bool {
 	client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
 	if exist {
 		if !task.DropTaskOnWorker(s.ctx, client) {
-			return false
+			log.Ctx(s.ctx).Warn("drop task on worker failed, but ignore it",
+				zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", task.GetNodeID()))
+			return true
 		}
 	}
 	log.Ctx(s.ctx).Info("task has been finished", zap.Int64("taskID", task.GetTaskID()),
diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go
index 6fe5e48ae3..db856edc52 100644
--- a/internal/datacoord/task_scheduler_test.go
+++ b/internal/datacoord/task_scheduler_test.go
@@ -1301,16 +1301,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
 		// set job info failed --> state: Finished
 		catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("set job info failed")).Once()
 
-		// set job success, drop job on task failed --> state: Finished
+		// set job success, drop job on task failed --> state: Finished (skip drop error, task success)
 		catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
 		workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
 		in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Status(errors.New("drop job failed")), nil).Once()
 
-		// drop job success --> no task
-		catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
-		workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
-		in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
-
 		for {
 			if scheduler.pendingTasks.TaskCount() == 0 {
 				taskNum := scheduler.runningTasks.Len()
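
For context, below is a minimal self-contained Go sketch of the idempotency guard this patch adds in SaveStatsResultSegment: reject a source segment that is already unhealthy (dropped) up front, and pin the target segment's state to Flushed instead of copying it from the source, so saving the same stats result twice cannot mark the target segment dropped. All names here (Segment, isHealthy, saveStatsResult) are hypothetical stand-ins for illustration, not the actual Milvus types.

package main

import (
	"errors"
	"fmt"
)

type State int

const (
	Flushed State = iota
	Dropped
)

// Segment is a hypothetical stand-in for the segment metadata record.
type Segment struct {
	ID    int64
	State State
}

var errSegmentNotFound = errors.New("segment not found or not healthy")

// isHealthy mirrors the role of the isSegmentHealthy check in the patch.
func isHealthy(s *Segment) bool { return s != nil && s.State != Dropped }

// saveStatsResult clones old into a new target segment and drops old.
// A second call must fail fast at the health check rather than copying
// the old segment's (now Dropped) state onto the target.
func saveStatsResult(old *Segment, targetID int64) (*Segment, error) {
	if !isHealthy(old) {
		return nil, errSegmentNotFound // guard: the duplicate call stops here
	}
	target := &Segment{ID: targetID, State: Flushed} // pin to Flushed, never inherit
	old.State = Dropped
	return target, nil
}

func main() {
	old := &Segment{ID: 1, State: Flushed}
	if t, err := saveStatsResult(old, 2); err == nil {
		fmt.Println("first call ok, target state:", t.State) // Flushed
	}
	if _, err := saveStatsResult(old, 2); err != nil {
		fmt.Println("second call rejected:", err) // idempotency guard fires
	}
}

The second call fails at the health check instead of re-deriving the target's state from a segment the first call already marked Dropped, which is the double-save scenario tracked in issue #45477.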