From 4412cfcaaf1cd4facf85aa88ff50ddaede470be1 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 28 Oct 2022 17:15:32 +0800 Subject: [PATCH] reduce querycoord unnecessary panic (#19925) Signed-off-by: Wei Liu Signed-off-by: Wei Liu --- internal/querycoordv2/balance/utils.go | 28 ++++- .../querycoordv2/checkers/channel_checker.go | 14 ++- .../querycoordv2/checkers/segment_checker.go | 19 ++- internal/querycoordv2/handlers.go | 16 ++- internal/querycoordv2/task/merger_test.go | 4 +- internal/querycoordv2/task/task.go | 27 +++-- internal/querycoordv2/task/task_test.go | 110 +++++++++++++----- 7 files changed, 169 insertions(+), 49 deletions(-) diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 30a005b6c7..1491f28ea5 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -20,7 +20,9 @@ import ( "context" "time" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "go.uber.org/zap" ) func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout time.Duration, plans []SegmentAssignPlan) []task.Task { @@ -35,7 +37,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t action := task.NewSegmentAction(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID()) actions = append(actions, action) } - task := task.NewSegmentTask( + task, err := task.NewSegmentTask( ctx, timeout, checkerID, @@ -43,6 +45,17 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t p.ReplicaID, actions..., ) + if err != nil { + log.Warn("Create segment task from plan failed", + zap.Int64("collection", p.Segment.GetCollectionID()), + zap.Int64("replica", p.ReplicaID), + zap.String("channel", p.Segment.GetInsertChannel()), + zap.Int64("From", p.From), + zap.Int64("To", p.To), + zap.Error(err), + ) + continue + } ret = append(ret, task) } return ret @@ -60,7 +73,18 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName()) actions = append(actions, action) } - task := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...) + task, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...) + if err != nil { + log.Warn("Create channel task from plan failed", + zap.Int64("collection", p.Channel.GetCollectionID()), + zap.Int64("replica", p.ReplicaID), + zap.String("channel", p.Channel.GetChannelName()), + zap.Int64("From", p.From), + zap.Int64("To", p.To), + zap.Error(err), + ) + continue + } ret = append(ret, task) } return ret diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index b650602301..b6366863e1 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -19,11 +19,13 @@ package checkers import ( "context" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/querycoordv2/balance" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "go.uber.org/zap" ) // TODO(sunby): have too much similar codes with SegmentChecker @@ -148,7 +150,17 @@ func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels ret := make([]task.Task, 0, len(channels)) for _, ch := range channels { action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName()) - task := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout, c.ID(), ch.GetCollectionID(), replicaID, action) + task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout, c.ID(), ch.GetCollectionID(), replicaID, action) + if err != nil { + log.Warn("Create channel reduce task failed", + zap.Int64("collection", ch.GetCollectionID()), + zap.Int64("replica", replicaID), + zap.String("channel", ch.GetChannelName()), + zap.Int64("From", ch.Node), + zap.Error(err), + ) + continue + } ret = append(ret, task) } return ret diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 3af61451f0..1f44f391a5 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -19,6 +19,7 @@ package checkers import ( "context" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/balance" @@ -27,6 +28,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" ) type SegmentChecker struct { @@ -246,14 +248,27 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments ret := make([]task.Task, 0, len(segments)) for _, s := range segments { action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope) - ret = append(ret, task.NewSegmentTask( + task, err := task.NewSegmentTask( ctx, Params.QueryCoordCfg.SegmentTaskTimeout, c.ID(), s.GetCollectionID(), replicaID, action, - )) + ) + + if err != nil { + log.Warn("Create segment reduce task failed", + zap.Int64("collection", s.GetCollectionID()), + zap.Int64("replica", replicaID), + zap.String("channel", s.GetInsertChannel()), + zap.Int64("From", s.Node), + zap.Error(err), + ) + continue + } + + ret = append(ret, task) } return ret } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 47a1c262f6..f5a74e1169 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -115,7 +115,7 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe zap.Int64("destNodeID", plan.To), zap.Int64("segmentID", plan.Segment.GetID()), ) - task := task.NewSegmentTask(ctx, + task, err := task.NewSegmentTask(ctx, Params.QueryCoordCfg.SegmentTaskTimeout, req.GetBase().GetMsgID(), req.GetCollectionID(), @@ -123,7 +123,19 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe task.NewSegmentAction(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID()), task.NewSegmentAction(srcNode, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID()), ) - err := s.taskScheduler.Add(task) + + if err != nil { + log.Warn("Create segment task for balance failed", + zap.Int64("collection", req.GetCollectionID()), + zap.Int64("replica", replica.GetID()), + zap.String("channel", plan.Segment.InsertChannel), + zap.Int64("From", srcNode), + zap.Int64("To", plan.To), + zap.Error(err), + ) + continue + } + err = s.taskScheduler.Add(task) if err != nil { task.Cancel() return err diff --git a/internal/querycoordv2/task/merger_test.go b/internal/querycoordv2/task/merger_test.go index 9eed4dc29b..d302ac5308 100644 --- a/internal/querycoordv2/task/merger_test.go +++ b/internal/querycoordv2/task/merger_test.go @@ -120,9 +120,9 @@ func (suite *MergerSuite) TestMerge() { ctx := context.Background() for segmentID := int64(1); segmentID <= 3; segmentID++ { - task := NewSegmentTask(ctx, timeout, 0, suite.collectionID, suite.replicaID, + task, err := NewSegmentTask(ctx, timeout, 0, suite.collectionID, suite.replicaID, NewSegmentAction(suite.nodeID, ActionTypeGrow, "", segmentID)) - + suite.NoError(err) suite.merger.Add(NewLoadSegmentsTask(task, 0, suite.requests[segmentID])) } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index b91d323d9e..8a8a5874be 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -18,6 +18,7 @@ package task import ( "context" + "errors" "fmt" "time" @@ -44,6 +45,12 @@ const ( TaskPriorityHigh ) +var ( + ErrEmptyActions = errors.New("actions could not be empty") + ErrActionsTypeInconsistent = errors.New("actions have inconsistent type") + ErrActionsTargetInconsistent = errors.New("actions have inconsistent target channel/segment") +) + var ( // All task priorities from low to high TaskPriorities = []Priority{TaskPriorityLow, TaskPriorityNormal, TaskPriorityHigh} @@ -233,9 +240,9 @@ func NewSegmentTask(ctx context.Context, sourceID, collectionID, replicaID UniqueID, - actions ...Action) *SegmentTask { + actions ...Action) (*SegmentTask, error) { if len(actions) == 0 { - panic("empty actions is not allowed") + return nil, ErrEmptyActions } segmentID := int64(-1) @@ -243,13 +250,13 @@ func NewSegmentTask(ctx context.Context, for _, action := range actions { action, ok := action.(*SegmentAction) if !ok { - panic("SegmentTask can only contain SegmentActions") + return nil, ErrActionsTypeInconsistent } if segmentID == -1 { segmentID = action.SegmentID() shard = action.Shard() } else if segmentID != action.SegmentID() { - panic("all actions must process the same segment") + return nil, ErrActionsTargetInconsistent } } @@ -258,7 +265,7 @@ func NewSegmentTask(ctx context.Context, return &SegmentTask{ baseTask: base, segmentID: segmentID, - } + }, nil } func (task *SegmentTask) Shard() string { @@ -285,21 +292,21 @@ func NewChannelTask(ctx context.Context, sourceID, collectionID, replicaID UniqueID, - actions ...Action) *ChannelTask { + actions ...Action) (*ChannelTask, error) { if len(actions) == 0 { - panic("empty actions is not allowed") + return nil, ErrEmptyActions } channel := "" for _, action := range actions { channelAction, ok := action.(interface{ ChannelName() string }) if !ok { - panic("ChannelTask must contain only ChannelAction") + return nil, ErrActionsTypeInconsistent } if channel == "" { channel = channelAction.ChannelName() } else if channel != channelAction.ChannelName() { - panic("all actions must process the same channel") + return nil, ErrActionsTargetInconsistent } } @@ -307,7 +314,7 @@ func NewChannelTask(ctx context.Context, base.actions = actions return &ChannelTask{ baseTask: base, - } + }, nil } func (task *ChannelTask) Channel() string { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 8b50941b4d..682fec695e 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -203,7 +203,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { ChannelName: channel, UnflushedSegmentIds: []int64{suite.growingSegments[channel]}, })) - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -211,8 +211,9 @@ func (suite *TaskSuite) TestSubscribeChannelTask() { suite.replica, NewChannelAction(targetNode, ActionTypeGrow, channel), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } suite.AssertTaskNum(0, len(suite.subChannels), len(suite.subChannels), 0) @@ -261,7 +262,7 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() { CollectionID: suite.collection, ChannelName: channel, })) - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -269,8 +270,10 @@ func (suite *TaskSuite) TestUnsubscribeChannelTask() { -1, NewChannelAction(targetNode, ActionTypeReduce, channel), ) + + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } // Only first channel exists @@ -343,7 +346,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() { PartitionID: partition, InsertChannel: channel.ChannelName, }) - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -351,8 +354,9 @@ func (suite *TaskSuite) TestLoadSegmentTask() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } segmentsNum := len(suite.loadSegments) @@ -426,7 +430,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { PartitionID: partition, InsertChannel: channel.ChannelName, }) - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -434,8 +438,9 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } segmentsNum := len(suite.loadSegments) @@ -492,7 +497,7 @@ func (suite *TaskSuite) TestReleaseSegmentTask() { }, }) view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -500,8 +505,9 @@ func (suite *TaskSuite) TestReleaseSegmentTask() { suite.replica, NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), segment), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } suite.dist.SegmentDistManager.Update(targetNode, segments...) @@ -539,7 +545,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { tasks := []Task{} for _, segment := range suite.releaseSegments { - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -547,8 +553,9 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { suite.replica, NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } @@ -630,7 +637,7 @@ func (suite *TaskSuite) TestMoveSegmentTask() { }) view.Segments[segment] = &querypb.SegmentDist{NodeID: sourceNode, Version: 0} - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -639,8 +646,9 @@ func (suite *TaskSuite) TestMoveSegmentTask() { NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } suite.dist.SegmentDistManager.Update(sourceNode, segments...) @@ -689,7 +697,7 @@ func (suite *TaskSuite) TestTaskCanceled() { CollectionID: suite.collection, ChannelName: channel, })) - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -697,8 +705,9 @@ func (suite *TaskSuite) TestTaskCanceled() { -1, NewChannelAction(targetNode, ActionTypeReduce, channel), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } // Only first channel exists @@ -778,7 +787,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { PartitionID: partition, InsertChannel: channel.ChannelName, }) - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -786,8 +795,9 @@ func (suite *TaskSuite) TestSegmentTaskStale() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), ) + suite.NoError(err) tasks = append(tasks, task) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } segmentsNum := len(suite.loadSegments) @@ -833,7 +843,7 @@ func (suite *TaskSuite) TestChannelTaskReplace() { targetNode := int64(3) for _, channel := range suite.subChannels { - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -841,15 +851,16 @@ func (suite *TaskSuite) TestChannelTaskReplace() { suite.replica, NewChannelAction(targetNode, ActionTypeGrow, channel), ) + suite.NoError(err) task.SetPriority(TaskPriorityNormal) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } // Task with the same replica and segment, // but without higher priority can't be added for _, channel := range suite.subChannels { - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -857,8 +868,9 @@ func (suite *TaskSuite) TestChannelTaskReplace() { suite.replica, NewChannelAction(targetNode, ActionTypeGrow, channel), ) + suite.NoError(err) task.SetPriority(TaskPriorityNormal) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.ErrorIs(err, ErrConflictTaskExisted) task.SetPriority(TaskPriorityLow) err = suite.scheduler.Add(task) @@ -867,7 +879,7 @@ func (suite *TaskSuite) TestChannelTaskReplace() { // Replace the task with one with higher priority for _, channel := range suite.subChannels { - task := NewChannelTask( + task, err := NewChannelTask( ctx, timeout, 0, @@ -875,21 +887,56 @@ func (suite *TaskSuite) TestChannelTaskReplace() { suite.replica, NewChannelAction(targetNode, ActionTypeGrow, channel), ) + suite.NoError(err) task.SetPriority(TaskPriorityHigh) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } channelNum := len(suite.subChannels) suite.AssertTaskNum(0, channelNum, channelNum, 0) } +func (suite *TaskSuite) TestCreateTaskBehavior() { + chanelTask, err := NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0) + suite.Error(err) + suite.ErrorIs(err, ErrEmptyActions) + suite.Nil(chanelTask) + + action := NewSegmentAction(0, 0, "", 0) + chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action) + suite.ErrorIs(err, ErrActionsTypeInconsistent) + suite.Nil(chanelTask) + + action1 := NewChannelAction(0, 0, "fake-channel1") + action2 := NewChannelAction(0, 0, "fake-channel2") + chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action1, action2) + suite.ErrorIs(err, ErrActionsTargetInconsistent) + suite.Nil(chanelTask) + + segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0) + suite.ErrorIs(err, ErrEmptyActions) + suite.Nil(segmentTask) + + channelAction := NewChannelAction(0, 0, "fake-channel1") + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, channelAction) + suite.ErrorIs(err, ErrActionsTypeInconsistent) + suite.Nil(segmentTask) + + segmentAction1 := NewSegmentAction(0, 0, "", 0) + segmentAction2 := NewSegmentAction(0, 0, "", 1) + + segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, segmentAction1, segmentAction2) + suite.ErrorIs(err, ErrActionsTargetInconsistent) + suite.Nil(segmentTask) +} + func (suite *TaskSuite) TestSegmentTaskReplace() { ctx := context.Background() timeout := 10 * time.Second targetNode := int64(3) for _, segment := range suite.loadSegments { - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -897,15 +944,16 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) + suite.NoError(err) task.SetPriority(TaskPriorityNormal) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } // Task with the same replica and segment, // but without higher priority can't be added for _, segment := range suite.loadSegments { - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -913,8 +961,9 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) + suite.NoError(err) task.SetPriority(TaskPriorityNormal) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.ErrorIs(err, ErrConflictTaskExisted) task.SetPriority(TaskPriorityLow) err = suite.scheduler.Add(task) @@ -923,7 +972,7 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { // Replace the task with one with higher priority for _, segment := range suite.loadSegments { - task := NewSegmentTask( + task, err := NewSegmentTask( ctx, timeout, 0, @@ -931,8 +980,9 @@ func (suite *TaskSuite) TestSegmentTaskReplace() { suite.replica, NewSegmentAction(targetNode, ActionTypeGrow, "", segment), ) + suite.NoError(err) task.SetPriority(TaskPriorityHigh) - err := suite.scheduler.Add(task) + err = suite.scheduler.Add(task) suite.NoError(err) } segmentNum := len(suite.loadSegments)