mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: [2.5]Fix target segment marked dropped for save stats result twice (#45480)
issue: #45477 master pr: #45478 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
1d6786545b
commit
6eb77ddc4d
@ -89,7 +89,7 @@ pipeline {
|
|||||||
axes {
|
axes {
|
||||||
axis {
|
axis {
|
||||||
name 'milvus_deployment_option'
|
name 'milvus_deployment_option'
|
||||||
values 'standalone', 'distributed', 'standalone-kafka-mmap'
|
values 'standalone', 'distributed'
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stages {
|
stages {
|
||||||
|
|||||||
@ -360,7 +360,10 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
|
|||||||
coll, err := t.getCollection(group.collectionID)
|
coll, err := t.getCollection(group.collectionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("get collection info failed, skip handling compaction", zap.Error(err))
|
log.Warn("get collection info failed, skip handling compaction", zap.Error(err))
|
||||||
return err
|
if signal.collectionID != 0 {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !signal.isForce && !isCollectionAutoCompactionEnabled(coll) {
|
if !signal.isForce && !isCollectionAutoCompactionEnabled(coll) {
|
||||||
|
|||||||
@ -2117,8 +2117,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
|
|||||||
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
|
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
|
||||||
|
|
||||||
oldSegment := m.segments.GetSegment(oldSegmentID)
|
oldSegment := m.segments.GetSegment(oldSegmentID)
|
||||||
if oldSegment == nil {
|
if oldSegment == nil || !isSegmentHealthy(oldSegment) {
|
||||||
log.Warn("old segment is not found with stats task")
|
log.Warn("old segment is not found or not healthy with stats task")
|
||||||
return nil, merr.WrapErrSegmentNotFound(oldSegmentID)
|
return nil, merr.WrapErrSegmentNotFound(oldSegmentID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2135,16 +2135,17 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
|
|||||||
}
|
}
|
||||||
|
|
||||||
segmentInfo := &datapb.SegmentInfo{
|
segmentInfo := &datapb.SegmentInfo{
|
||||||
CollectionID: oldSegment.GetCollectionID(),
|
CollectionID: oldSegment.GetCollectionID(),
|
||||||
PartitionID: oldSegment.GetPartitionID(),
|
PartitionID: oldSegment.GetPartitionID(),
|
||||||
InsertChannel: oldSegment.GetInsertChannel(),
|
InsertChannel: oldSegment.GetInsertChannel(),
|
||||||
MaxRowNum: oldSegment.GetMaxRowNum(),
|
MaxRowNum: oldSegment.GetMaxRowNum(),
|
||||||
LastExpireTime: oldSegment.GetLastExpireTime(),
|
LastExpireTime: oldSegment.GetLastExpireTime(),
|
||||||
StartPosition: oldSegment.GetStartPosition(),
|
StartPosition: oldSegment.GetStartPosition(),
|
||||||
DmlPosition: oldSegment.GetDmlPosition(),
|
DmlPosition: oldSegment.GetDmlPosition(),
|
||||||
IsImporting: oldSegment.GetIsImporting(),
|
IsImporting: oldSegment.GetIsImporting(),
|
||||||
StorageVersion: oldSegment.GetStorageVersion(),
|
StorageVersion: oldSegment.GetStorageVersion(),
|
||||||
State: oldSegment.GetState(),
|
// only flushed segment can do sort stats
|
||||||
|
State: commonpb.SegmentState_Flushed,
|
||||||
Level: oldSegment.GetLevel(),
|
Level: oldSegment.GetLevel(),
|
||||||
LastLevel: oldSegment.GetLastLevel(),
|
LastLevel: oldSegment.GetLastLevel(),
|
||||||
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
|
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
|
||||||
|
|||||||
@ -581,7 +581,9 @@ func (s *taskScheduler) processFinished(task Task) bool {
|
|||||||
client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
|
client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
|
||||||
if exist {
|
if exist {
|
||||||
if !task.DropTaskOnWorker(s.ctx, client) {
|
if !task.DropTaskOnWorker(s.ctx, client) {
|
||||||
return false
|
log.Ctx(s.ctx).Warn("drop task on worker failed, but ignore it",
|
||||||
|
zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", task.GetNodeID()))
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Ctx(s.ctx).Info("task has been finished", zap.Int64("taskID", task.GetTaskID()),
|
log.Ctx(s.ctx).Info("task has been finished", zap.Int64("taskID", task.GetTaskID()),
|
||||||
|
|||||||
@ -1301,16 +1301,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
|
|||||||
// set job info failed --> state: Finished
|
// set job info failed --> state: Finished
|
||||||
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("set job info failed")).Once()
|
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("set job info failed")).Once()
|
||||||
|
|
||||||
// set job success, drop job on task failed --> state: Finished
|
// set job success, drop job on task failed --> state: Finished (skip drop error, task success)
|
||||||
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
|
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
|
||||||
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
|
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
|
||||||
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Status(errors.New("drop job failed")), nil).Once()
|
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Status(errors.New("drop job failed")), nil).Once()
|
||||||
|
|
||||||
// drop job success --> no task
|
|
||||||
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
|
|
||||||
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
|
|
||||||
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if scheduler.pendingTasks.TaskCount() == 0 {
|
if scheduler.pendingTasks.TaskCount() == 0 {
|
||||||
taskNum := scheduler.runningTasks.Len()
|
taskNum := scheduler.runningTasks.Len()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user