diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index ad06e27dd7..e7fe0d9c24 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -45,23 +45,24 @@ func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout t p.ReplicaID, actions..., ) - log.Info("Create Segment task", - zap.Int64("collection", p.Segment.GetCollectionID()), - zap.Int64("replica", p.ReplicaID), - zap.String("channel", p.Segment.GetInsertChannel()), - zap.Int64("From", p.From), - zap.Int64("To", p.To)) if err != nil { - log.Warn("Create segment task from plan failed", + log.Warn("create segment task from plan failed", zap.Int64("collection", p.Segment.GetCollectionID()), zap.Int64("replica", p.ReplicaID), zap.String("channel", p.Segment.GetInsertChannel()), - zap.Int64("From", p.From), - zap.Int64("To", p.To), + zap.Int64("from", p.From), + zap.Int64("to", p.To), zap.Error(err), ) continue } + + log.Info("create segment task", + zap.Int64("collection", p.Segment.GetCollectionID()), + zap.Int64("replica", p.ReplicaID), + zap.String("channel", p.Segment.GetInsertChannel()), + zap.Int64("from", p.From), + zap.Int64("to", p.To)) task.SetPriority(GetTaskPriorityFromWeight(p.Weight)) ret = append(ret, task) } @@ -81,23 +82,24 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t actions = append(actions, action) } task, err := task.NewChannelTask(ctx, timeout, checkerID, p.Channel.GetCollectionID(), p.ReplicaID, actions...) - log.Info("Create Channel task", - zap.Int64("collection", p.Channel.GetCollectionID()), - zap.Int64("replica", p.ReplicaID), - zap.String("channel", p.Channel.GetChannelName()), - zap.Int64("From", p.From), - zap.Int64("To", p.To)) if err != nil { - log.Warn("Create channel task from plan failed", + log.Warn("create channel task failed", zap.Int64("collection", p.Channel.GetCollectionID()), zap.Int64("replica", p.ReplicaID), zap.String("channel", p.Channel.GetChannelName()), - zap.Int64("From", p.From), - zap.Int64("To", p.To), + zap.Int64("from", p.From), + zap.Int64("to", p.To), zap.Error(err), ) continue } + + log.Info("create channel task", + zap.Int64("collection", p.Channel.GetCollectionID()), + zap.Int64("replica", p.ReplicaID), + zap.String("channel", p.Channel.GetChannelName()), + zap.Int64("from", p.From), + zap.Int64("to", p.To)) task.SetPriority(GetTaskPriorityFromWeight(p.Weight)) ret = append(ret, task) } diff --git a/internal/querycoordv2/checkers/channel_checker.go b/internal/querycoordv2/checkers/channel_checker.go index 8caf79877f..31a4d4485c 100644 --- a/internal/querycoordv2/checkers/channel_checker.go +++ b/internal/querycoordv2/checkers/channel_checker.go @@ -193,11 +193,11 @@ func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName()) task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), ch.GetCollectionID(), replicaID, action) if err != nil { - log.Warn("Create channel reduce task failed", + log.Warn("create channel reduce task failed", zap.Int64("collection", ch.GetCollectionID()), zap.Int64("replica", replicaID), zap.String("channel", ch.GetChannelName()), - zap.Int64("From", ch.Node), + zap.Int64("from", ch.Node), zap.Error(err), ) continue diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index e83e535911..4ca7156026 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -296,11 +296,11 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments ) if err != nil { - log.Warn("Create segment reduce task failed", + log.Warn("create segment reduce task failed", zap.Int64("collection", s.GetCollectionID()), zap.Int64("replica", replicaID), zap.String("channel", s.GetInsertChannel()), - zap.Int64("From", s.Node), + zap.Int64("from", s.Node), zap.Error(err), ) continue diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index d62585ea28..c42deab5ff 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -149,12 +149,12 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe ) if err != nil { - log.Warn("Create segment task for balance failed", + log.Warn("create segment task for balance failed", zap.Int64("collection", req.GetCollectionID()), zap.Int64("replica", replica.GetID()), zap.String("channel", plan.Segment.InsertChannel), - zap.Int64("From", srcNode), - zap.Int64("To", plan.To), + zap.Int64("from", srcNode), + zap.Int64("to", plan.To), zap.Error(err), ) continue diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index c07cff98b8..29eb609b5c 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,8 +17,6 @@ package task import ( - "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.uber.org/atomic" @@ -28,12 +26,6 @@ import ( . "github.com/milvus-io/milvus/internal/util/typeutil" ) -var ( - ErrActionCanceled = errors.New("ActionCanceled") - ErrActionRPCFailed = errors.New("ActionRPCFailed") - ErrActionStale = errors.New("ActionStale") -) - type ActionType = int32 const ( diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index 32a589408e..780e06545a 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -22,9 +22,9 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/util/merr" . "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/atomic" ) @@ -46,12 +46,6 @@ const ( TaskPriorityHigh // for channel checker ) -var ( - ErrEmptyActions = errors.New("actions could not be empty") - ErrActionsTypeInconsistent = errors.New("actions have inconsistent type") - ErrActionsTargetInconsistent = errors.New("actions have inconsistent target channel/segment") -) - var ( // All task priorities from low to high TaskPriorities = []Priority{TaskPriorityLow, TaskPriorityNormal, TaskPriorityHigh} @@ -249,7 +243,7 @@ func NewSegmentTask(ctx context.Context, replicaID UniqueID, actions ...Action) (*SegmentTask, error) { if len(actions) == 0 { - return nil, ErrEmptyActions + return nil, errors.WithStack(merr.WrapErrParameterInvalid("non-empty actions", "no action")) } segmentID := int64(-1) @@ -257,13 +251,13 @@ func NewSegmentTask(ctx context.Context, for _, action := range actions { action, ok := action.(*SegmentAction) if !ok { - return nil, ErrActionsTypeInconsistent + return nil, errors.WithStack(merr.WrapErrParameterInvalid("SegmentAction", "other action", "all actions must be with the same type")) } if segmentID == -1 { segmentID = action.SegmentID() shard = action.Shard() } else if segmentID != action.SegmentID() { - return nil, ErrActionsTargetInconsistent + return nil, errors.WithStack(merr.WrapErrParameterInvalid(segmentID, action.SegmentID(), "all actions must operate the same segment")) } } @@ -301,19 +295,19 @@ func NewChannelTask(ctx context.Context, replicaID UniqueID, actions ...Action) (*ChannelTask, error) { if len(actions) == 0 { - return nil, ErrEmptyActions + return nil, errors.WithStack(merr.WrapErrParameterInvalid("non-empty actions", "no action")) } channel := "" for _, action := range actions { - channelAction, ok := action.(interface{ ChannelName() string }) + channelAction, ok := action.(*ChannelAction) if !ok { - return nil, ErrActionsTypeInconsistent + return nil, errors.WithStack(merr.WrapErrParameterInvalid("ChannelAction", "other action", "all actions must be with the same type")) } if channel == "" { channel = channelAction.ChannelName() } else if channel != channelAction.ChannelName() { - return nil, ErrActionsTargetInconsistent + return nil, errors.WithStack(merr.WrapErrParameterInvalid(channel, channelAction.ChannelName(), "all actions must operate the same segment")) } } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 9179640fe7..5d5a9ec1a1 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/etcd" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/typeutil" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -1051,35 +1052,34 @@ func (suite *TaskSuite) TestChannelTaskReplace() { func (suite *TaskSuite) TestCreateTaskBehavior() { chanelTask, err := NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0) - suite.Error(err) - suite.ErrorIs(err, ErrEmptyActions) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) action := NewSegmentAction(0, 0, "", 0) chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action) - suite.ErrorIs(err, ErrActionsTypeInconsistent) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) action1 := NewChannelAction(0, 0, "fake-channel1") action2 := NewChannelAction(0, 0, "fake-channel2") chanelTask, err = NewChannelTask(context.TODO(), 5*time.Second, 0, 0, 0, action1, action2) - suite.ErrorIs(err, ErrActionsTargetInconsistent) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(chanelTask) segmentTask, err := NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0) - suite.ErrorIs(err, ErrEmptyActions) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) channelAction := NewChannelAction(0, 0, "fake-channel1") segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, channelAction) - suite.ErrorIs(err, ErrActionsTypeInconsistent) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) segmentAction1 := NewSegmentAction(0, 0, "", 0) segmentAction2 := NewSegmentAction(0, 0, "", 1) segmentTask, err = NewSegmentTask(context.TODO(), 5*time.Second, 0, 0, 0, segmentAction1, segmentAction2) - suite.ErrorIs(err, ErrActionsTargetInconsistent) + suite.ErrorIs(err, merr.ErrParameterInvalid) suite.Nil(segmentTask) }