From f925fa7661c2c2ac67eca04817a4933453b97d28 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 8 Nov 2022 19:43:05 +0800 Subject: [PATCH] fix check segment stale task (#20401) Signed-off-by: Wei Liu Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 24 +++++++++++++++++---- internal/querycoordv2/task/task_test.go | 8 ++++--- internal/util/paramtable/component_param.go | 12 +---------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 4b4526bc80..df7c02dfc5 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -475,7 +476,13 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { return true } if task, ok := task.(*SegmentTask); ok { - segment := scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + taskType := GetTaskType(task) + var segment *datapb.SegmentInfo + if taskType == TaskTypeMove { + segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) + } else { + segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + } if segment == nil { continue } @@ -638,12 +645,21 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: - segment := scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + taskType := GetTaskType(task) + var segment *datapb.SegmentInfo + if taskType == TaskTypeMove { + segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) + } else { + segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + } if segment == nil { - log.Warn("task stale due tu the segment to load not exists in targets", - zap.Int64("segment", task.segmentID)) + log.Warn("task stale due to the segment to load not exists in targets", + zap.Int64("segment", task.segmentID), + zap.Int32("taskType", taskType), + ) return true } + replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node()) if replica == nil { log.Warn("task stale due to replica not found") diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 38f2dc38e3..215a62cdd2 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -636,10 +636,11 @@ func (suite *TaskSuite) TestMoveSegmentTask() { suite.cluster.EXPECT().ReleaseSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil) // Test move segment task - suite.dist.ChannelDistManager.Update(leader, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ + vchannel := &datapb.VchannelInfo{ CollectionID: suite.collection, ChannelName: channel.ChannelName, - })) + } + suite.dist.ChannelDistManager.Update(leader, meta.DmChannelFromVChannel(vchannel)) view := &meta.LeaderView{ ID: leader, CollectionID: suite.collection, @@ -672,8 +673,9 @@ func (suite *TaskSuite) TestMoveSegmentTask() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return(nil, segmentInfos, nil) + suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return([]*datapb.VchannelInfo{vchannel}, segmentInfos, nil) suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1)) + suite.target.UpdateCollectionCurrentTarget(suite.collection, int64(1)) suite.dist.SegmentDistManager.Update(sourceNode, segments...) suite.dist.LeaderViewManager.Update(leader, view) segmentsNum := len(suite.moveSegments) diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 0ffd4ebf87..4ca77a634c 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -698,7 +698,6 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.initSegmentTaskTimeout() p.initDistPullInterval() p.initLoadTimeoutSeconds() - p.initCheckHandoffInterval() p.initEnableActiveStandby() p.initNextTargetSurviveTime() p.initUpdateNextTargetInterval() @@ -812,15 +811,6 @@ func (p *queryCoordConfig) initLoadTimeoutSeconds() { p.LoadTimeoutSeconds = time.Duration(loadTimeout) * time.Second } -func (p *queryCoordConfig) initCheckHandoffInterval() { - interval := p.Base.LoadWithDefault("queryCoord.checkHandoffInterval", "5000") - checkHandoffInterval, err := strconv.ParseInt(interval, 10, 64) - if err != nil { - panic(err) - } - p.CheckHandoffInterval = time.Duration(checkHandoffInterval) * time.Millisecond -} - func (p *queryCoordConfig) initNextTargetSurviveTime() { interval := p.Base.LoadWithDefault("queryCoord.NextTargetSurviveTime", "300") nextTargetSurviveTime, err := strconv.ParseInt(interval, 10, 64) @@ -831,7 +821,7 @@ func (p *queryCoordConfig) initNextTargetSurviveTime() { } func (p *queryCoordConfig) initUpdateNextTargetInterval() { - interval := p.Base.LoadWithDefault("queryCoord.UpdateNextTargetInterval", "30") + interval := p.Base.LoadWithDefault("queryCoord.UpdateNextTargetInterval", "10") updateNextTargetInterval, err := strconv.ParseInt(interval, 10, 64) if err != nil { panic(err)