diff --git a/internal/querynodev2/collector/average.go b/internal/querynodev2/collector/average.go index 526aa6bf9b..b791813da3 100644 --- a/internal/querynodev2/collector/average.go +++ b/internal/querynodev2/collector/average.go @@ -18,6 +18,8 @@ package collector import ( "sync" + + "github.com/milvus-io/milvus/pkg/util/merr" ) type averageData struct { @@ -65,7 +67,7 @@ func (c *averageCollector) Average(label string) (float64, error) { average, ok := c.averages[label] if !ok { - return 0, WrapErrAvarageLabelNotRegister(label) + return 0, merr.WrapErrAverageLabelNotRegister(label) } return average.Value(), nil diff --git a/internal/querynodev2/collector/errors.go b/internal/querynodev2/collector/errors.go deleted file mode 100644 index 283452bec9..0000000000 --- a/internal/querynodev2/collector/errors.go +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "fmt" - - "github.com/cockroachdb/errors" -) - -var ( - ErrAvarageLabelNotRegister = errors.New("AvarageLabelNotRegister") -) - -func WrapErrAvarageLabelNotRegister(label string) error { - return fmt.Errorf("%w :%s", ErrAvarageLabelNotRegister, label) -} diff --git a/internal/querynodev2/errors.go b/internal/querynodev2/errors.go deleted file mode 100644 index 25e4bea08c..0000000000 --- a/internal/querynodev2/errors.go +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package querynodev2 - -import ( - "fmt" - - "github.com/cockroachdb/errors" -) - -var ( - ErrNodeUnhealthy = errors.New("NodeIsUnhealthy") - ErrGetDelegatorFailed = errors.New("GetShardDelefatorFailed") - ErrInitPipelineFailed = errors.New("InitPipelineFailed") -) - -// WrapErrNodeUnhealthy wraps ErrNodeUnhealthy with nodeID. -func WrapErrNodeUnhealthy(nodeID int64) error { - return fmt.Errorf("%w id: %d", ErrNodeUnhealthy, nodeID) -} - -func WrapErrInitPipelineFailed(err error) error { - return fmt.Errorf("%w err: %s", ErrInitPipelineFailed, err.Error()) -} - -func msgQueryNodeIsUnhealthy(nodeID int64) string { - return fmt.Sprintf("query node %d is not ready", nodeID) -} diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index d811b2a01d..a8a50a8ad9 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -125,7 +125,8 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque }() if !node.lifetime.Add(commonpbutil.IsHealthy) { - failRet.Status.Reason = msgQueryNodeIsUnhealthy(paramtable.GetNodeID()) + err := merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) + failRet.Status = merr.Status(err) return failRet, nil } defer node.lifetime.Done() @@ -167,8 +168,9 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque // get delegator sd, ok := node.delegators.Get(channel) if !ok { - log.Warn("Query failed, failed to get query shard delegator", zap.Error(ErrGetDelegatorFailed)) - failRet.Status.Reason = ErrGetDelegatorFailed.Error() + err := merr.WrapErrServiceUnavailable("failed to get query shard delegator") + log.Warn("Query failed, failed to get query shard delegator", zap.Error(err)) + failRet.Status = merr.Status(err) return failRet, nil } @@ -330,7 +332,7 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq traceID := trace.SpanFromContext(ctx).SpanContext().TraceID() if !node.lifetime.Add(commonpbutil.IsHealthy) { - return nil, WrapErrNodeUnhealthy(paramtable.GetNodeID()) + return nil, merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) } defer node.lifetime.Done() @@ -353,8 +355,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq task := tasks.NewSearchTask(searchCtx, collection, node.manager, req) if !node.scheduler.Add(task) { - log.Warn("failed to search channel", zap.Error(tasks.ErrTaskQueueFull)) - return nil, tasks.ErrTaskQueueFull + err := merr.WrapErrTaskQueueFull() + log.Warn("failed to search channel", zap.Error(err)) + return nil, err } err := task.Wait() @@ -380,8 +383,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq // get delegator sd, ok := node.delegators.Get(channel) if !ok { - log.Warn("Query failed, failed to get query shard delegator", zap.Error(ErrGetDelegatorFailed)) - return nil, ErrGetDelegatorFailed + err := merr.WrapErrServiceUnavailable("failed to get query shard delegator") + log.Warn("Query failed, failed to get query shard delegator", zap.Error(err)) + return nil, err } req, err := node.optimizeSearchParams(ctx, req, sd) if err != nil { diff --git a/internal/querynodev2/pipeline/errors.go b/internal/querynodev2/pipeline/errors.go deleted file mode 100644 index 09d3f18954..0000000000 --- a/internal/querynodev2/pipeline/errors.go +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipeline - -import ( - "fmt" - - "github.com/cockroachdb/errors" -) - -var ( - ErrMsgInvalidType = errors.New("InvalidMessageType") - ErrMsgNotAligned = errors.New("CheckAlignedFailed") - ErrMsgEmpty = errors.New("EmptyMessage") - ErrMsgNotTarget = errors.New("NotTarget") - ErrMsgExcluded = errors.New("SegmentExcluded") - - ErrCollectionNotFound = errors.New("CollectionNotFound") - ErrShardDelegatorNotFound = errors.New("ShardDelegatorNotFound") - - ErrNewPipelineFailed = errors.New("FailedCreateNewPipeline") - ErrStartPipeline = errors.New("PipineStartFailed") -) - -func WrapErrMsgNotAligned(err error) error { - return fmt.Errorf("%w :%s", ErrMsgNotAligned, err) -} - -func WrapErrMsgNotTarget(reason string) error { - return fmt.Errorf("%w%s", ErrMsgNotTarget, reason) -} - -func WrapErrMsgExcluded(segmentID int64) error { - return fmt.Errorf("%w ID:%d", ErrMsgExcluded, segmentID) -} - -func WrapErrNewPipelineFailed(err error) error { - return fmt.Errorf("%w :%s", ErrNewPipelineFailed, err) -} - -func WrapErrStartPipeline(reason string) error { - return fmt.Errorf("%w :%s", ErrStartPipeline, reason) -} - -func WrapErrShardDelegatorNotFound(channel string) error { - return fmt.Errorf("%w channel:%s", ErrShardDelegatorNotFound, channel) -} diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index 866d191af6..9c08cffcf2 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -30,11 +30,12 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -//filterNode filter the invalid message of pipeline +// filterNode filter the invalid message of pipeline type filterNode struct { *BaseNode collectionID UniqueID @@ -102,7 +103,7 @@ func (fNode *filterNode) Operate(in Msg) Msg { return out } -//filtrate message with filter policy +// filtrate message with filter policy func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { switch msg.Type() { @@ -126,7 +127,7 @@ func (fNode *filterNode) filtrate(c *Collection, msg msgstream.TsMsg) error { } } default: - return ErrMsgInvalidType + return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not") } return nil } diff --git a/internal/querynodev2/pipeline/filter_policy.go b/internal/querynodev2/pipeline/filter_policy.go index ce6a1ff792..fb8f62a39f 100644 --- a/internal/querynodev2/pipeline/filter_policy.go +++ b/internal/querynodev2/pipeline/filter_policy.go @@ -16,31 +16,37 @@ package pipeline -//MsgFilter will return error if Msg was invalid +import ( + "fmt" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// MsgFilter will return error if Msg was invalid type InsertMsgFilter = func(n *filterNode, c *Collection, msg *InsertMsg) error type DeleteMsgFilter = func(n *filterNode, c *Collection, msg *DeleteMsg) error -//Chack msg is aligned -- -//len of each kind of infos in InsertMsg should match each other +// Chack msg is aligned -- +// len of each kind of infos in InsertMsg should match each other func InsertNotAligned(n *filterNode, c *Collection, msg *InsertMsg) error { err := msg.CheckAligned() if err != nil { - return WrapErrMsgNotAligned(err) + return err } return nil } func InsertEmpty(n *filterNode, c *Collection, msg *InsertMsg) error { if len(msg.GetTimestamps()) <= 0 { - return ErrMsgEmpty + return merr.WrapErrParameterInvalid("has msg", "the length of timestamp field is 0") } return nil } func InsertOutOfTarget(n *filterNode, c *Collection, msg *InsertMsg) error { if msg.GetCollectionID() != c.ID() { - return WrapErrMsgNotTarget("Collection") + return merr.WrapErrParameterInvalid(msg.GetCollectionID(), c.ID(), "msg not target because of collection") } // all growing will be be in-memory to support dynamic partition load/release @@ -53,7 +59,8 @@ func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error { return nil } if msg.EndTimestamp <= segInfo.GetDmlPosition().GetTimestamp() { - return WrapErrMsgExcluded(msg.SegmentID) + m := fmt.Sprintf("Segment excluded, id: %d", msg.GetSegmentID()) + return merr.WrapErrSegmentLack(msg.GetSegmentID(), m) } return nil } @@ -61,21 +68,21 @@ func InsertExcluded(n *filterNode, c *Collection, msg *InsertMsg) error { func DeleteNotAligned(n *filterNode, c *Collection, msg *DeleteMsg) error { err := msg.CheckAligned() if err != nil { - return WrapErrMsgNotAligned(err) + return err } return nil } func DeleteEmpty(n *filterNode, c *Collection, msg *DeleteMsg) error { if len(msg.GetTimestamps()) <= 0 { - return ErrMsgEmpty + return merr.WrapErrParameterInvalid("has msg", "the length of timestamp field is 0") } return nil } func DeleteOutOfTarget(n *filterNode, c *Collection, msg *DeleteMsg) error { if msg.GetCollectionID() != c.ID() { - return WrapErrMsgNotTarget("Collection") + return merr.WrapErrParameterInvalid(msg.GetCollectionID(), c.ID(), "msg not target because of collection") } // all growing will be be in-memory to support dynamic partition load/release diff --git a/internal/querynodev2/pipeline/manager.go b/internal/querynodev2/pipeline/manager.go index 30589ec544..ba67753dd5 100644 --- a/internal/querynodev2/pipeline/manager.go +++ b/internal/querynodev2/pipeline/manager.go @@ -27,11 +27,12 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgdispatcher" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) -//Manager manage pipeline in querynode +// Manager manage pipeline in querynode type Manager interface { Num() int Add(collectionID UniqueID, channel string) (Pipeline, error) @@ -55,7 +56,7 @@ func (m *manager) Num() int { return len(m.channel2Pipeline) } -//Add pipeline for each channel of collection +// Add pipeline for each channel of collection func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { m.mu.Lock() defer m.mu.Unlock() @@ -76,12 +77,12 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) { //get shard delegator for add growing in pipeline delegator, ok := m.delegators.Get(channel) if !ok { - return nil, WrapErrShardDelegatorNotFound(channel) + return nil, merr.WrapErrShardDelegatorNotFound(channel) } newPipeLine, err := NewPipeLine(collectionID, channel, m.dataManager, m.tSafeManager, m.dispatcher, delegator) if err != nil { - return nil, WrapErrNewPipelineFailed(err) + return nil, merr.WrapErrServiceUnavailable(err.Error(), "failed to create new pipeline") } m.channel2Pipeline[channel] = newPipeLine @@ -105,7 +106,7 @@ func (m *manager) Get(channel string) Pipeline { return pipeline } -//Remove pipeline from Manager by channel +// Remove pipeline from Manager by channel func (m *manager) Remove(channels ...string) { m.mu.Lock() defer m.mu.Unlock() @@ -122,7 +123,7 @@ func (m *manager) Remove(channels ...string) { metrics.QueryNodeNumDmlChannels.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() } -//Start pipeline by channel +// Start pipeline by channel func (m *manager) Start(channels ...string) error { m.mu.Lock() defer m.mu.Unlock() @@ -130,7 +131,8 @@ func (m *manager) Start(channels ...string) error { //check pipelie all exist before start for _, channel := range channels { if _, ok := m.channel2Pipeline[channel]; !ok { - return WrapErrStartPipeline(fmt.Sprintf("pipeline with channel %s not exist", channel)) + reason := fmt.Sprintf("pipeline with channel %s not exist", channel) + return merr.WrapErrServiceUnavailable(reason, "pipine start failed") } } @@ -140,7 +142,7 @@ func (m *manager) Start(channels ...string) error { return nil } -//Close all pipeline of Manager +// Close all pipeline of Manager func (m *manager) Close() { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/querynodev2/pipeline/message.go b/internal/querynodev2/pipeline/message.go index 1e277cad8f..49932dcb34 100644 --- a/internal/querynodev2/pipeline/message.go +++ b/internal/querynodev2/pipeline/message.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) @@ -47,7 +48,7 @@ func (msg *insertNodeMsg) append(taskMsg msgstream.TsMsg) error { msg.deleteMsgs = append(msg.deleteMsgs, deleteMsg) collector.Rate.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&deleteMsg.DeleteRequest))) default: - return ErrMsgInvalidType + return merr.WrapErrParameterInvalid("msgType is Insert or Delete", "not") } return nil } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 8367847bb2..7e0b0fcb15 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -305,12 +305,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } err = pipeline.ConsumeMsgStream(position) if err != nil { - err = WrapErrInitPipelineFailed(err) + err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed") log.Warn(err.Error(), zap.Int64("collectionID", channel.CollectionID), zap.String("channel", channel.ChannelName), ) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "", err), nil + return merr.Status(err), nil } // start pipeline @@ -778,16 +778,14 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn // ShowConfigurations returns the configurations of queryNode matching req.Pattern func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { if !node.lifetime.Add(commonpbutil.IsHealthy) { + err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) log.Warn("QueryNode.ShowConfigurations failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.String("req", req.Pattern), - zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID()))) + zap.Error(err)) return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()), - }, + Status: merr.Status(err), Configuations: nil, }, nil } @@ -814,16 +812,14 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S // GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ... func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if !node.lifetime.Add(commonpbutil.IsHealthy) { + err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) log.Warn("QueryNode.GetMetrics failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.String("req", req.Request), - zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID()))) + zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()), - }, + Status: merr.Status(err), Response: "", }, nil } @@ -888,14 +884,12 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get zap.Int64("nodeID", paramtable.GetNodeID()), ) if !node.lifetime.Add(commonpbutil.IsHealthy) { + err := merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) log.Warn("QueryNode.GetMetrics failed", - zap.Error(WrapErrNodeUnhealthy(paramtable.GetNodeID()))) + zap.Error(err)) return &querypb.GetDataDistributionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgQueryNodeIsUnhealthy(paramtable.GetNodeID()), - }, + Status: merr.Status(err), }, nil } defer node.lifetime.Done() diff --git a/internal/querynodev2/tasks/errors.go b/internal/querynodev2/tasks/errors.go deleted file mode 100644 index bda49aab98..0000000000 --- a/internal/querynodev2/tasks/errors.go +++ /dev/null @@ -1,7 +0,0 @@ -package tasks - -import "github.com/cockroachdb/errors" - -var ( - ErrTaskQueueFull = errors.New("TaskQueueFull") -) diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 59e6255b3a..cb8b621faa 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -103,6 +103,15 @@ var ( ErrTopicNotFound = newMilvusError("topic not found", 1300, false) ErrTopicNotEmpty = newMilvusError("topic not empty", 1301, false) + // Average related + ErrAverageLabelNotRegister = newMilvusError("average label not register", 1400, false) + + // shard delegator related + ErrShardDelegatorNotFound = newMilvusError("shard delegator not found", 1500, false) + + // task related + ErrTaskQueueFull = newMilvusError("task queue full", 1600, false) + // Do NOT export this, // never allow programmer using this, keep only for converting unknown error to milvusError errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 340712320e..d94fe488be 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -114,6 +114,14 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrTopicNotFound("unknown", "failed to get topic"), ErrTopicNotFound) s.ErrorIs(WrapErrTopicNotEmpty("unknown", "topic is not empty"), ErrTopicNotEmpty) + // average related + s.ErrorIs(WrapErrAverageLabelNotRegister("unknown", "average label not register"), ErrAverageLabelNotRegister) + + // shard delegator related + s.ErrorIs(WrapErrShardDelegatorNotFound("unknown", "fail to get shard delegator"), ErrShardDelegatorNotFound) + + // task related + s.ErrorIs(WrapErrTaskQueueFull("test_task_queue", "task queue is full"), ErrTaskQueueFull) } func (s *ErrSuite) TestCombine() { diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 5e1fefcc2a..3a48a03c8c 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -380,6 +380,33 @@ func WrapErrTopicNotEmpty(name string, msg ...string) error { return err } +// Average related +func WrapErrAverageLabelNotRegister(label string, msg ...string) error { + err := errors.Wrapf(ErrAverageLabelNotRegister, "averageLabel=%s", label) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +// shard delegator related +func WrapErrShardDelegatorNotFound(channel string, msg ...string) error { + err := errors.Wrapf(ErrShardDelegatorNotFound, "channel=%s", channel) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +// task related +func WrapErrTaskQueueFull(msg ...string) error { + err := error(ErrTaskQueueFull) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func wrapWithField(err error, name string, value any) error { return errors.Wrapf(err, "%s=%v", name, value) }