mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-04 11:18:44 +08:00
Check whether ubsubscribe/release task is stale (#20165)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
f2f10cb114
commit
0b939c5735
@ -26,6 +26,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
. "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -644,20 +645,19 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
|
||||
}
|
||||
|
||||
case ActionTypeReduce:
|
||||
// Do nothing here,
|
||||
// the task should succeeded if the segment not exists
|
||||
// sealed := scheduler.distMgr.SegmentDistManager.GetByNode(action.Node())
|
||||
// growing := scheduler.distMgr.LeaderViewManager.GetSegmentByNode(action.Node())
|
||||
// segments := make([]int64, 0, len(sealed)+len(growing))
|
||||
// for _, segment := range sealed {
|
||||
// segments = append(segments, segment.GetID())
|
||||
// }
|
||||
// segments = append(segments, growing...)
|
||||
// if !funcutil.SliceContain(segments, task.SegmentID()) {
|
||||
// log.Warn("the task is stale, the segment to release not exists in dist",
|
||||
// zap.Int64("segment", task.segmentID))
|
||||
// return true
|
||||
// }
|
||||
sealed := scheduler.distMgr.SegmentDistManager.GetByNode(action.Node())
|
||||
growing := scheduler.distMgr.LeaderViewManager.GetSegmentByNode(action.Node())
|
||||
segments := make([]int64, 0, len(sealed)+len(growing))
|
||||
for _, segment := range sealed {
|
||||
segments = append(segments, segment.GetID())
|
||||
}
|
||||
segments = append(segments, growing...)
|
||||
if !funcutil.SliceContain(segments, task.SegmentID()) {
|
||||
log.Warn("the task is stale, the segment to release not exists in dist",
|
||||
zap.Int64("segment", task.segmentID))
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
@ -679,21 +679,20 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool {
|
||||
}
|
||||
|
||||
case ActionTypeReduce:
|
||||
// Do nothing here,
|
||||
// the task should succeeded if the channel not exists
|
||||
// hasChannel := false
|
||||
// views := scheduler.distMgr.LeaderViewManager.GetLeaderView(action.Node())
|
||||
// for _, view := range views {
|
||||
// if view.Channel == task.Channel() {
|
||||
// hasChannel = true
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if !hasChannel {
|
||||
// log.Warn("the task is stale, the channel to unsubscribe not exists in dist",
|
||||
// zap.String("channel", task.Channel()))
|
||||
// return true
|
||||
// }
|
||||
hasChannel := false
|
||||
views := scheduler.distMgr.LeaderViewManager.GetLeaderView(action.Node())
|
||||
for _, view := range views {
|
||||
if view.Channel == task.Channel() {
|
||||
hasChannel = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasChannel {
|
||||
log.Warn("the task is stale, the channel to unsubscribe not exists in dist",
|
||||
zap.String("channel", task.Channel()))
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
@ -297,9 +297,14 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() {
|
||||
suite.dispatchAndWait(targetNode)
|
||||
suite.AssertTaskNum(0, 0, 0, 0)
|
||||
|
||||
for _, task := range tasks {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
for i, task := range tasks {
|
||||
if i == 0 {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
} else {
|
||||
suite.Equal(TaskStatusStale, task.Status())
|
||||
suite.Error(task.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -558,25 +563,37 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
|
||||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{
|
||||
ID: targetNode,
|
||||
GrowingSegments: typeutil.NewUniqueSet(suite.releaseSegments[1:]...),
|
||||
})
|
||||
|
||||
segmentsNum := len(suite.releaseSegments)
|
||||
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
|
||||
|
||||
// Process tasks
|
||||
suite.dispatchAndWait(targetNode)
|
||||
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
|
||||
suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1)
|
||||
|
||||
// Other nodes' HB can't trigger the procedure of tasks
|
||||
suite.dispatchAndWait(targetNode + 1)
|
||||
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
|
||||
suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1)
|
||||
|
||||
// Release done
|
||||
suite.dist.LeaderViewManager.Update(targetNode)
|
||||
|
||||
// Process tasks done
|
||||
suite.dispatchAndWait(targetNode)
|
||||
suite.AssertTaskNum(0, 0, 0, 0)
|
||||
|
||||
for _, task := range tasks {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
for i, task := range tasks {
|
||||
if i == 0 {
|
||||
suite.Equal(TaskStatusStale, task.Status())
|
||||
suite.Error(task.Err())
|
||||
} else {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -736,8 +753,8 @@ func (suite *TaskSuite) TestTaskCanceled() {
|
||||
suite.Equal(TaskStatusCanceled, task.Status())
|
||||
suite.ErrorIs(task.Err(), ErrTaskCanceled)
|
||||
} else {
|
||||
suite.Equal(TaskStatusSucceeded, task.Status())
|
||||
suite.NoError(task.Err())
|
||||
suite.Equal(TaskStatusStale, task.Status())
|
||||
suite.Error(task.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user