mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
- Split the simple reason and full detail - Refine existing error messages related: #28422 related: https://github.com/milvus-io/milvus/issues/28422 pr: #28424 --------- Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
parent
0bda17e97b
commit
5ca7851f4a
2
go.mod
2
go.mod
@ -24,7 +24,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/klauspost/compress v1.16.5
|
||||
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e
|
||||
github.com/milvus-io/milvus/pkg v0.0.1
|
||||
github.com/minio/minio-go/v7 v7.0.56
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
|
||||
5
go.sum
5
go.sum
@ -119,7 +119,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
|
||||
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
|
||||
github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88=
|
||||
github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
|
||||
@ -584,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 h1:j6Ru7Lq421Ukp+XH8I+ny7dsVF2rLDLLoWpuFgoDZLM=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
|
||||
@ -333,7 +333,7 @@ func TestVectorCreateCollection(t *testing.T) {
|
||||
expectedBody: PrintErr(ErrDefault),
|
||||
})
|
||||
|
||||
err := merr.WrapErrCollectionResourceLimitExceeded()
|
||||
err := merr.WrapErrCollectionNumLimitExceeded(65535)
|
||||
mp2 := mocks.NewMockProxy(t)
|
||||
mp2.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(merr.Status(err), nil).Once()
|
||||
testCases = append(testCases, testCase{
|
||||
@ -1480,7 +1480,6 @@ func TestAuthorization(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusForbidden, w.Code)
|
||||
assert.Equal(t, res, w.Body.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1501,7 +1500,6 @@ func TestAuthorization(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusForbidden, w.Code)
|
||||
assert.Equal(t, res, w.Body.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1522,7 +1520,6 @@ func TestAuthorization(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusForbidden, w.Code)
|
||||
assert.Equal(t, res, w.Body.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1533,7 +1530,7 @@ func TestAuthorization(t *testing.T) {
|
||||
versional(VectorCollectionsDescribePath) + "?collectionName=" + DefaultCollectionName,
|
||||
},
|
||||
}
|
||||
for res, pathArr := range paths {
|
||||
for _, pathArr := range paths {
|
||||
for _, path := range pathArr {
|
||||
t.Run("proxy is not ready", func(t *testing.T) {
|
||||
mp := mocks.NewMockProxy(t)
|
||||
@ -1543,7 +1540,6 @@ func TestAuthorization(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusForbidden, w.Code)
|
||||
assert.Equal(t, res, w.Body.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1564,7 +1560,6 @@ func TestAuthorization(t *testing.T) {
|
||||
w := httptest.NewRecorder()
|
||||
testEngine.ServeHTTP(w, req)
|
||||
assert.Equal(t, http.StatusForbidden, w.Code)
|
||||
assert.Equal(t, res, w.Body.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,7 +237,6 @@ func (BinaryExpr_BinaryOp) EnumDescriptor() ([]byte, []int) {
|
||||
|
||||
type GenericValue struct {
|
||||
// Types that are valid to be assigned to Val:
|
||||
//
|
||||
// *GenericValue_BoolVal
|
||||
// *GenericValue_Int64Val
|
||||
// *GenericValue_FloatVal
|
||||
@ -1298,7 +1297,6 @@ var xxx_messageInfo_AlwaysTrueExpr proto.InternalMessageInfo
|
||||
|
||||
type Expr struct {
|
||||
// Types that are valid to be assigned to Expr:
|
||||
//
|
||||
// *Expr_TermExpr
|
||||
// *Expr_UnaryExpr
|
||||
// *Expr_BinaryExpr
|
||||
@ -1670,7 +1668,6 @@ func (m *QueryPlanNode) GetLimit() int64 {
|
||||
|
||||
type PlanNode struct {
|
||||
// Types that are valid to be assigned to Node:
|
||||
//
|
||||
// *PlanNode_VectorAnns
|
||||
// *PlanNode_Predicates
|
||||
// *PlanNode_Query
|
||||
|
||||
@ -29,9 +29,8 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type InvalidateCollMetaCacheRequest struct {
|
||||
// MsgType:
|
||||
//
|
||||
// DropCollection -> {meta cache, dml channels}
|
||||
// Other -> {meta cache}
|
||||
// DropCollection -> {meta cache, dml channels}
|
||||
// Other -> {meta cache}
|
||||
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
|
||||
DbName string `protobuf:"bytes,2,opt,name=db_name,json=dbName,proto3" json:"db_name,omitempty"`
|
||||
CollectionName string `protobuf:"bytes,3,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"`
|
||||
|
||||
@ -793,28 +793,28 @@ type RootCoordClient interface {
|
||||
GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error)
|
||||
GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
|
||||
GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to create collection
|
||||
//
|
||||
// @param CreateCollectionRequest, use to provide collection information to be created.
|
||||
//
|
||||
// @return Status
|
||||
CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to delete collection.
|
||||
//
|
||||
// @param DropCollectionRequest, collection name is going to be deleted.
|
||||
//
|
||||
// @return Status
|
||||
DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to test collection existence.
|
||||
//
|
||||
// @param HasCollectionRequest, collection name is going to be tested.
|
||||
//
|
||||
// @return BoolResponse
|
||||
HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to get collection schema.
|
||||
//
|
||||
// @param DescribeCollectionRequest, target collection name.
|
||||
@ -825,28 +825,28 @@ type RootCoordClient interface {
|
||||
CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to list all collections.
|
||||
//
|
||||
// @return StringListResponse, collection name list
|
||||
ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowCollectionsResponse, error)
|
||||
AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to create partition
|
||||
//
|
||||
// @return Status
|
||||
CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to drop partition
|
||||
//
|
||||
// @return Status
|
||||
DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to test partition existence.
|
||||
//
|
||||
// @return BoolResponse
|
||||
HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to show partition information
|
||||
//
|
||||
// @param ShowPartitionRequest, target collection name.
|
||||
@ -854,7 +854,7 @@ type RootCoordClient interface {
|
||||
// @return StringListResponse
|
||||
ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error)
|
||||
ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error)
|
||||
// rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {}
|
||||
// rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {}
|
||||
ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest, opts ...grpc.CallOption) (*milvuspb.ShowSegmentsResponse, error)
|
||||
AllocTimestamp(ctx context.Context, in *AllocTimestampRequest, opts ...grpc.CallOption) (*AllocTimestampResponse, error)
|
||||
AllocID(ctx context.Context, in *AllocIDRequest, opts ...grpc.CallOption) (*AllocIDResponse, error)
|
||||
@ -1327,28 +1327,28 @@ type RootCoordServer interface {
|
||||
GetComponentStates(context.Context, *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error)
|
||||
GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error)
|
||||
GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to create collection
|
||||
//
|
||||
// @param CreateCollectionRequest, use to provide collection information to be created.
|
||||
//
|
||||
// @return Status
|
||||
CreateCollection(context.Context, *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to delete collection.
|
||||
//
|
||||
// @param DropCollectionRequest, collection name is going to be deleted.
|
||||
//
|
||||
// @return Status
|
||||
DropCollection(context.Context, *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to test collection existence.
|
||||
//
|
||||
// @param HasCollectionRequest, collection name is going to be tested.
|
||||
//
|
||||
// @return BoolResponse
|
||||
HasCollection(context.Context, *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to get collection schema.
|
||||
//
|
||||
// @param DescribeCollectionRequest, target collection name.
|
||||
@ -1359,28 +1359,28 @@ type RootCoordServer interface {
|
||||
CreateAlias(context.Context, *milvuspb.CreateAliasRequest) (*commonpb.Status, error)
|
||||
DropAlias(context.Context, *milvuspb.DropAliasRequest) (*commonpb.Status, error)
|
||||
AlterAlias(context.Context, *milvuspb.AlterAliasRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to list all collections.
|
||||
//
|
||||
// @return StringListResponse, collection name list
|
||||
ShowCollections(context.Context, *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error)
|
||||
AlterCollection(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to create partition
|
||||
//
|
||||
// @return Status
|
||||
CreatePartition(context.Context, *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to drop partition
|
||||
//
|
||||
// @return Status
|
||||
DropPartition(context.Context, *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to test partition existence.
|
||||
//
|
||||
// @return BoolResponse
|
||||
HasPartition(context.Context, *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
|
||||
// *
|
||||
//*
|
||||
// @brief This method is used to show partition information
|
||||
//
|
||||
// @param ShowPartitionRequest, target collection name.
|
||||
@ -1388,7 +1388,7 @@ type RootCoordServer interface {
|
||||
// @return StringListResponse
|
||||
ShowPartitions(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error)
|
||||
ShowPartitionsInternal(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error)
|
||||
// rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {}
|
||||
// rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {}
|
||||
ShowSegments(context.Context, *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error)
|
||||
AllocTimestamp(context.Context, *AllocTimestampRequest) (*AllocTimestampResponse, error)
|
||||
AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error)
|
||||
|
||||
@ -150,12 +150,17 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo
|
||||
zap.String("channelName", workload.channel),
|
||||
)
|
||||
|
||||
var lastErr error
|
||||
err := retry.Do(ctx, func() error {
|
||||
targetNode, err := lb.selectNode(ctx, workload, excludeNodes)
|
||||
if err != nil {
|
||||
log.Warn("failed to select node for shard",
|
||||
zap.Int64("nodeID", targetNode),
|
||||
zap.Error(err))
|
||||
zap.Error(err),
|
||||
)
|
||||
if lastErr != nil {
|
||||
return lastErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -168,7 +173,8 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo
|
||||
|
||||
// cancel work load which assign to the target node
|
||||
lb.balancer.CancelWorkload(targetNode, workload.nq)
|
||||
return errors.Wrapf(err, "failed to get delegator %d for channel %s", targetNode, workload.channel)
|
||||
lastErr = errors.Wrapf(err, "failed to get delegator %d for channel %s", targetNode, workload.channel)
|
||||
return lastErr
|
||||
}
|
||||
|
||||
err = workload.exec(ctx, targetNode, client, workload.channel)
|
||||
@ -179,7 +185,8 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo
|
||||
excludeNodes.Insert(targetNode)
|
||||
lb.balancer.CancelWorkload(targetNode, workload.nq)
|
||||
|
||||
return errors.Wrapf(err, "failed to search/query delegator %d for channel %s", targetNode, workload.channel)
|
||||
lastErr = errors.Wrapf(err, "failed to search/query delegator %d for channel %s", targetNode, workload.channel)
|
||||
return lastErr
|
||||
}
|
||||
|
||||
lb.balancer.CancelWorkload(targetNode, workload.nq)
|
||||
|
||||
@ -1267,7 +1267,6 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
|
||||
resp, err := server.LoadBalance(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
|
||||
suite.Contains(resp.Reason, "failed to balance segments")
|
||||
suite.Contains(resp.Reason, "mock error")
|
||||
|
||||
suite.meta.ReplicaManager.AddNode(replicas[0].ID, 10)
|
||||
|
||||
@ -89,7 +89,7 @@ func (t *createCollectionTask) validate() error {
|
||||
maxColNumPerDB := Params.QuotaConfig.MaxCollectionNumPerDB.GetAsInt()
|
||||
if len(collIDs) >= maxColNumPerDB {
|
||||
log.Warn("unable to create collection because the number of collection has reached the limit in DB", zap.Int("maxCollectionNumPerDB", maxColNumPerDB))
|
||||
return merr.WrapErrCollectionResourceLimitExceeded(fmt.Sprintf("Failed to create collection, maxCollectionNumPerDB={%d}", maxColNumPerDB))
|
||||
return merr.WrapErrCollectionNumLimitExceeded(maxColNumPerDB, "max number of collection has reached the limit in DB")
|
||||
}
|
||||
|
||||
totalCollections := 0
|
||||
@ -100,7 +100,7 @@ func (t *createCollectionTask) validate() error {
|
||||
maxCollectionNum := Params.QuotaConfig.MaxCollectionNum.GetAsInt()
|
||||
if totalCollections >= maxCollectionNum {
|
||||
log.Warn("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum))
|
||||
return merr.WrapErrCollectionResourceLimitExceeded(fmt.Sprintf("Failed to create collection, limit={%d}", maxCollectionNum))
|
||||
return merr.WrapErrCollectionNumLimitExceeded(maxCollectionNum, "max number of collection has reached the limit")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@ package rootcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
@ -40,7 +39,7 @@ func (t *createDatabaseTask) Prepare(ctx context.Context) error {
|
||||
|
||||
cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt()
|
||||
if len(dbs) > cfgMaxDatabaseNum {
|
||||
return merr.WrapErrDatabaseResourceLimitExceeded(fmt.Sprintf("Failed to create database, limit={%d}", cfgMaxDatabaseNum))
|
||||
return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum)
|
||||
}
|
||||
|
||||
t.dbID, err = t.core.idAllocator.AllocOne()
|
||||
|
||||
@ -13,7 +13,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/klauspost/compress v1.16.5
|
||||
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e
|
||||
github.com/nats-io/nats-server/v2 v2.9.17
|
||||
github.com/nats-io/nats.go v1.24.0
|
||||
github.com/panjf2000/ants/v2 v2.7.2
|
||||
|
||||
@ -477,8 +477,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
|
||||
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
|
||||
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 h1:j6Ru7Lq421Ukp+XH8I+ny7dsVF2rLDLLoWpuFgoDZLM=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
|
||||
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
|
||||
|
||||
@ -22,9 +22,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
retryableFlag = 1 << 16
|
||||
CanceledCode int32 = 10000
|
||||
TimeoutCode int32 = 10001
|
||||
CanceledCode int32 = 10000
|
||||
TimeoutCode int32 = 10001
|
||||
)
|
||||
|
||||
// Define leaf errors here,
|
||||
@ -146,17 +145,27 @@ var (
|
||||
)
|
||||
|
||||
type milvusError struct {
|
||||
msg string
|
||||
errCode int32
|
||||
msg string
|
||||
detail string
|
||||
retriable bool
|
||||
errCode int32
|
||||
}
|
||||
|
||||
func newMilvusError(msg string, code int32, retriable bool) milvusError {
|
||||
if retriable {
|
||||
code |= retryableFlag
|
||||
}
|
||||
return milvusError{
|
||||
msg: msg,
|
||||
errCode: code,
|
||||
msg: msg,
|
||||
detail: msg,
|
||||
retriable: retriable,
|
||||
errCode: code,
|
||||
}
|
||||
}
|
||||
|
||||
func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError {
|
||||
return milvusError{
|
||||
msg: msg,
|
||||
detail: detail,
|
||||
retriable: retriable,
|
||||
errCode: code,
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,6 +177,10 @@ func (e milvusError) Error() string {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
func (e milvusError) Detail() string {
|
||||
return e.detail
|
||||
}
|
||||
|
||||
func (e milvusError) Is(err error) bool {
|
||||
cause := errors.Cause(err)
|
||||
if cause, ok := cause.(milvusError); ok {
|
||||
|
||||
@ -18,6 +18,7 @@ package merr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -121,7 +122,7 @@ func (s *ErrSuite) TestWrap() {
|
||||
|
||||
// IO related
|
||||
s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound)
|
||||
s.ErrorIs(WrapErrIoFailed("test_key", "failed to read"), ErrIoFailed)
|
||||
s.ErrorIs(WrapErrIoFailed("test_key", os.ErrClosed), ErrIoFailed)
|
||||
|
||||
// Parameter related
|
||||
s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid)
|
||||
@ -180,7 +181,7 @@ func (s *ErrSuite) TestCombineOnlyNil() {
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestCombineCode() {
|
||||
err := Combine(WrapErrIoFailed("test"), WrapErrCollectionNotFound(1))
|
||||
err := Combine(WrapErrIoFailed("collectionKey", os.ErrClosed), WrapErrCollectionNotFound(1))
|
||||
s.Equal(Code(ErrCollectionNotFound), Code(err))
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@ package merr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
@ -51,11 +52,11 @@ func Code(err error) int32 {
|
||||
}
|
||||
|
||||
func IsRetryableErr(err error) bool {
|
||||
return IsRetryableCode(Code(err))
|
||||
}
|
||||
if err, ok := err.(milvusError); ok {
|
||||
return err.retriable
|
||||
}
|
||||
|
||||
func IsRetryableCode(code int32) bool {
|
||||
return code&retryableFlag != 0
|
||||
return false
|
||||
}
|
||||
|
||||
func IsCanceledOrTimeout(err error) bool {
|
||||
@ -72,12 +73,27 @@ func Status(err error) *commonpb.Status {
|
||||
code := Code(err)
|
||||
return &commonpb.Status{
|
||||
Code: code,
|
||||
Reason: err.Error(),
|
||||
Reason: previousLastError(err).Error(),
|
||||
// Deprecated, for compatibility
|
||||
ErrorCode: oldCode(code),
|
||||
Retriable: IsRetryableErr(err),
|
||||
Detail: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func previousLastError(err error) error {
|
||||
lastErr := err
|
||||
for {
|
||||
nextErr := errors.Unwrap(err)
|
||||
if nextErr == nil {
|
||||
break
|
||||
}
|
||||
lastErr = err
|
||||
err = nextErr
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func CheckRPCCall(resp any, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
@ -211,9 +227,9 @@ func Error(status *commonpb.Status) error {
|
||||
// use code first
|
||||
code := status.GetCode()
|
||||
if code == 0 {
|
||||
return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false)
|
||||
return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false)
|
||||
}
|
||||
return newMilvusError(status.GetReason(), code, code&retryableFlag != 0)
|
||||
return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable())
|
||||
}
|
||||
|
||||
// CheckHealthy checks whether the state is healthy,
|
||||
@ -273,344 +289,383 @@ func CheckTargetID(msg *commonpb.MsgBase) error {
|
||||
|
||||
// Service related
|
||||
func WrapErrServiceNotReady(role string, sessionID int64, state string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrServiceNotReady, "%s=%d stage=%s", role, sessionID, state)
|
||||
err := wrapFieldsWithDesc(ErrServiceNotReady,
|
||||
state,
|
||||
value(role, sessionID),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceUnavailable(reason string, msg ...string) error {
|
||||
err := errors.Wrap(ErrServiceUnavailable, reason)
|
||||
err := wrapFieldsWithDesc(ErrServiceUnavailable, reason)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) error {
|
||||
err := errors.Wrapf(ErrServiceMemoryLimitExceeded, "predict=%v, limit=%v", predict, limit)
|
||||
err := wrapFields(ErrServiceMemoryLimitExceeded,
|
||||
value("predict", predict),
|
||||
value("limit", limit),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error {
|
||||
err := errors.Wrapf(ErrServiceRequestLimitExceeded, "limit=%v", limit)
|
||||
err := wrapFields(ErrServiceRequestLimitExceeded,
|
||||
value("limit", limit),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceInternal(msg string, others ...string) error {
|
||||
msg = strings.Join(append([]string{msg}, others...), "; ")
|
||||
err := errors.Wrap(ErrServiceInternal, msg)
|
||||
|
||||
func WrapErrServiceInternal(reason string, msg ...string) error {
|
||||
err := wrapFieldsWithDesc(ErrServiceInternal, reason)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceCrossClusterRouting(expectedCluster, actualCluster string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrServiceCrossClusterRouting, "expectedCluster=%s, actualCluster=%s", expectedCluster, actualCluster)
|
||||
err := wrapFields(ErrServiceCrossClusterRouting,
|
||||
value("expectedCluster", expectedCluster),
|
||||
value("actualCluster", actualCluster),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) error {
|
||||
err := errors.Wrapf(ErrServiceDiskLimitExceeded, "predict=%v, limit=%v", predict, limit)
|
||||
err := wrapFields(ErrServiceDiskLimitExceeded,
|
||||
value("predict", predict),
|
||||
value("limit", limit),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceRateLimit(rate float64) error {
|
||||
err := errors.Wrapf(ErrServiceRateLimit, "rate=%v", rate)
|
||||
return err
|
||||
return wrapFields(ErrServiceRateLimit, value("rate", rate))
|
||||
}
|
||||
|
||||
func WrapErrServiceForceDeny(op string, reason error, method string) error {
|
||||
err := errors.Wrapf(ErrServiceForceDeny, "deny to %s, reason: %s, req: %s", op, reason.Error(), method)
|
||||
return err
|
||||
return wrapFieldsWithDesc(ErrServiceForceDeny,
|
||||
reason.Error(),
|
||||
value("op", op),
|
||||
value("req", method),
|
||||
)
|
||||
}
|
||||
|
||||
func WrapErrServiceUnimplemented(grpcErr error) error {
|
||||
err := errors.Wrapf(ErrServiceUnimplemented, "err: %s", grpcErr.Error())
|
||||
return err
|
||||
return wrapFieldsWithDesc(ErrServiceUnimplemented, grpcErr.Error())
|
||||
}
|
||||
|
||||
// database related
|
||||
func WrapErrDatabaseNotFound(database any, msg ...string) error {
|
||||
err := wrapWithField(ErrDatabaseNotFound, "database", database)
|
||||
err := wrapFields(ErrDatabaseNotFound, value("database", database))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrDatabaseResourceLimitExceeded(msg ...string) error {
|
||||
var err error = ErrDatabaseNumLimitExceeded
|
||||
func WrapErrDatabaseNumLimitExceeded(limit int, msg ...string) error {
|
||||
err := wrapFields(ErrDatabaseNumLimitExceeded, value("limit", limit))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrDatabaseNameInvalid(database any, msg ...string) error {
|
||||
err := wrapWithField(ErrDatabaseInvalidName, "database", database)
|
||||
err := wrapFields(ErrDatabaseInvalidName, value("database", database))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Collection related
|
||||
func WrapErrCollectionNotFound(collection any, msg ...string) error {
|
||||
err := wrapWithField(ErrCollectionNotFound, "collection", collection)
|
||||
err := wrapFields(ErrCollectionNotFound, value("collection", collection))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrCollectionNotFoundWithDB(db any, collection any, msg ...string) error {
|
||||
err := errors.Wrapf(ErrCollectionNotFound, "collection %v:%v", db, collection)
|
||||
err := wrapFields(ErrCollectionNotFound,
|
||||
value("database", db),
|
||||
value("collection", collection),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrCollectionNotLoaded(collection any, msg ...string) error {
|
||||
err := wrapWithField(ErrCollectionNotLoaded, "collection", collection)
|
||||
err := wrapFields(ErrCollectionNotLoaded, value("collection", collection))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrCollectionResourceLimitExceeded(msg ...string) error {
|
||||
var err error = ErrCollectionNumLimitExceeded
|
||||
func WrapErrCollectionNumLimitExceeded(limit int, msg ...string) error {
|
||||
err := wrapFields(ErrCollectionNumLimitExceeded, value("limit", limit))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error {
|
||||
err := wrapWithField(ErrCollectionNotFullyLoaded, "collection", collection)
|
||||
err := wrapFields(ErrCollectionNotFullyLoaded, value("collection", collection))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
|
||||
err := errors.Wrapf(ErrAliasNotFound, "alias %v:%v", db, alias)
|
||||
err := wrapFields(ErrAliasNotFound,
|
||||
value("database", db),
|
||||
value("alias", alias),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrAliasCollectionNameConflict(db any, alias any, msg ...string) error {
|
||||
err := errors.Wrapf(ErrAliasCollectionNameConfilct, "alias %v:%v", db, alias)
|
||||
err := wrapFields(ErrAliasCollectionNameConfilct,
|
||||
value("database", db),
|
||||
value("alias", alias),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrAliasAlreadyExist(db any, alias any, msg ...string) error {
|
||||
err := errors.Wrapf(ErrAliasAlreadyExist, "alias %v:%v already exist", db, alias)
|
||||
err := wrapFields(ErrAliasAlreadyExist,
|
||||
value("database", db),
|
||||
value("alias", alias),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Partition related
|
||||
func WrapErrPartitionNotFound(partition any, msg ...string) error {
|
||||
err := wrapWithField(ErrPartitionNotFound, "partition", partition)
|
||||
err := wrapFields(ErrPartitionNotFound, value("partition", partition))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrPartitionNotLoaded(partition any, msg ...string) error {
|
||||
err := wrapWithField(ErrPartitionNotLoaded, "partition", partition)
|
||||
err := wrapFields(ErrPartitionNotLoaded, value("partition", partition))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrPartitionNotFullyLoaded(partition any, msg ...string) error {
|
||||
err := wrapWithField(ErrPartitionNotFullyLoaded, "partition", partition)
|
||||
err := wrapFields(ErrPartitionNotFullyLoaded, value("partition", partition))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ResourceGroup related
|
||||
func WrapErrResourceGroupNotFound(rg any, msg ...string) error {
|
||||
err := wrapWithField(ErrResourceGroupNotFound, "rg", rg)
|
||||
err := wrapFields(ErrResourceGroupNotFound, value("rg", rg))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Replica related
|
||||
func WrapErrReplicaNotFound(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrReplicaNotFound, "replica", id)
|
||||
err := wrapFields(ErrReplicaNotFound, value("replica", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrReplicaNotAvailable(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrReplicaNotAvailable, "replica", id)
|
||||
err := wrapFields(ErrReplicaNotAvailable, value("replica", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Channel related
|
||||
func WrapErrChannelNotFound(name string, msg ...string) error {
|
||||
err := wrapWithField(ErrChannelNotFound, "channel", name)
|
||||
err := wrapFields(ErrChannelNotFound, value("channel", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrChannelLack(name string, msg ...string) error {
|
||||
err := wrapWithField(ErrChannelLack, "channel", name)
|
||||
err := wrapFields(ErrChannelLack, value("channel", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrChannelReduplicate(name string, msg ...string) error {
|
||||
err := wrapWithField(ErrChannelReduplicate, "channel", name)
|
||||
err := wrapFields(ErrChannelReduplicate, value("channel", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrChannelNotAvailable(name string, msg ...string) error {
|
||||
err := wrapWithField(ErrChannelNotAvailable, "channel", name)
|
||||
err := wrapFields(ErrChannelNotAvailable, value("channel", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Segment related
|
||||
func WrapErrSegmentNotFound(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrSegmentNotFound, "segment", id)
|
||||
err := wrapFields(ErrSegmentNotFound, value("segment", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrSegmentsNotFound(ids []int64, msg ...string) error {
|
||||
err := wrapFields(ErrSegmentNotFound, value("segments", ids))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrSegmentNotLoaded(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrSegmentNotLoaded, "segment", id)
|
||||
err := wrapFields(ErrSegmentNotLoaded, value("segment", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrSegmentLack(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrSegmentLack, "segment", id)
|
||||
err := wrapFields(ErrSegmentLack, value("segment", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrSegmentReduplicate(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrSegmentReduplicate, "segment", id)
|
||||
err := wrapFields(ErrSegmentReduplicate, value("segment", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Index related
|
||||
func WrapErrIndexNotFound(indexName string, msg ...string) error {
|
||||
err := wrapWithField(ErrIndexNotFound, "indexName", indexName)
|
||||
err := wrapFields(ErrIndexNotFound, value("indexName", indexName))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error {
|
||||
err := wrapWithField(ErrIndexNotFound, "segmentID", segmentID)
|
||||
err := wrapFields(ErrIndexNotFound, value("segmentID", segmentID))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrIndexNotFoundForCollection(collection string, msg ...string) error {
|
||||
err := wrapWithField(ErrIndexNotFound, "collection", collection)
|
||||
err := wrapFields(ErrIndexNotFound, value("collection", collection))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrIndexNotSupported(indexType string, msg ...string) error {
|
||||
err := wrapWithField(ErrIndexNotSupported, "indexType", indexType)
|
||||
err := wrapFields(ErrIndexNotSupported, value("indexType", indexType))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrIndexDuplicate(indexName string, msg ...string) error {
|
||||
err := wrapWithField(ErrIndexDuplicate, "indexName", indexName)
|
||||
err := wrapFields(ErrIndexDuplicate, value("indexName", indexName))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Node related
|
||||
func WrapErrNodeNotFound(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrNodeNotFound, "node", id)
|
||||
err := wrapFields(ErrNodeNotFound, value("node", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrNodeOffline(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrNodeOffline, "node", id)
|
||||
err := wrapFields(ErrNodeOffline, value("node", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
|
||||
err := errors.Wrapf(ErrNodeLack, "expectedNum=%d, actualNum=%d", expectedNum, actualNum)
|
||||
err := wrapFields(ErrNodeLack,
|
||||
value("expectedNum", expectedNum),
|
||||
value("actualNum", actualNum),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -618,104 +673,110 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
|
||||
func WrapErrNodeLackAny(msg ...string) error {
|
||||
err := error(ErrNodeLack)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrNodeNotAvailable(id int64, msg ...string) error {
|
||||
err := wrapWithField(ErrNodeNotAvailable, "node", id)
|
||||
err := wrapFields(ErrNodeNotAvailable, value("node", id))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrNodeNotMatch(expectedNodeID, actualNodeID int64, msg ...string) error {
|
||||
err := errors.Wrapf(ErrNodeNotMatch, "expectedNodeID=%d, actualNodeID=%d", expectedNodeID, actualNodeID)
|
||||
err := wrapFields(ErrNodeNotMatch,
|
||||
value("expectedNodeID", expectedNodeID),
|
||||
value("actualNodeID", actualNodeID),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// IO related
|
||||
func WrapErrIoKeyNotFound(key string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrIoKeyNotFound, "key=%s", key)
|
||||
err := wrapFields(ErrIoKeyNotFound, value("key", key))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrIoFailed(key string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrIoFailed, "key=%s", key)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
func WrapErrIoFailed(key string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
return wrapFieldsWithDesc(ErrIoFailed, err.Error(), value("key", key))
|
||||
}
|
||||
|
||||
func WrapErrIoFailedReason(reason string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrIoFailed, reason)
|
||||
err := wrapFieldsWithDesc(ErrIoFailed, reason)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Parameter related
|
||||
func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error {
|
||||
err := errors.Wrapf(ErrParameterInvalid, "expected=%v, actual=%v", expected, actual)
|
||||
err := wrapFields(ErrParameterInvalid,
|
||||
value("expected", expected),
|
||||
value("actual", actual),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string) error {
|
||||
err := errors.Wrapf(ErrParameterInvalid, "expected in [%v, %v], actual=%v", lower, upper, actual)
|
||||
err := wrapFields(ErrParameterInvalid,
|
||||
bound("value", actual, lower, upper),
|
||||
)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrParameterInvalidMsg(fmt string, args ...any) error {
|
||||
err := errors.Wrapf(ErrParameterInvalid, fmt, args...)
|
||||
return err
|
||||
return errors.Wrapf(ErrParameterInvalid, fmt, args...)
|
||||
}
|
||||
|
||||
// Metrics related
|
||||
func WrapErrMetricNotFound(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMetricNotFound, "metric=%s", name)
|
||||
err := wrapFields(ErrMetricNotFound, value("metric", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Message queue related
|
||||
func WrapErrMqTopicNotFound(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMqTopicNotFound, "topic=%s", name)
|
||||
err := wrapFields(ErrMqTopicNotFound, value("topic", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrMqTopicNotEmpty(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMqTopicNotEmpty, "topic=%s", name)
|
||||
err := wrapFields(ErrMqTopicNotEmpty, value("topic", name))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrMqInternal(err error, msg ...string) error {
|
||||
err = errors.Wrapf(ErrMqInternal, "internal=%v", err)
|
||||
err = wrapFieldsWithDesc(ErrMqInternal, err.Error())
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -732,30 +793,83 @@ func WrapErrPrivilegeNotPermitted(fmt string, args ...any) error {
|
||||
|
||||
// Segcore related
|
||||
func WrapErrSegcore(code int32, msg ...string) error {
|
||||
err := errors.Wrapf(ErrSegcore, "internal code=%v", code)
|
||||
err := wrapFields(ErrSegcore, value("segcoreCode", code))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// field related
|
||||
func WrapErrFieldNotFound[T any](field T, msg ...string) error {
|
||||
err := errors.Wrapf(ErrFieldNotFound, "field=%v", field)
|
||||
err := wrapFields(ErrFieldNotFound, value("field", field))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrFieldNameInvalid(field any, msg ...string) error {
|
||||
err := wrapWithField(ErrFieldInvalidName, "field", field)
|
||||
err := wrapFields(ErrFieldInvalidName, value("field", field))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func wrapWithField(err error, name string, value any) error {
|
||||
return errors.Wrapf(err, "%s=%v", name, value)
|
||||
func wrapFields(err milvusError, fields ...errorField) error {
|
||||
for i := range fields {
|
||||
err.msg += fmt.Sprintf("[%s]", fields[i].String())
|
||||
}
|
||||
err.detail = err.msg
|
||||
return err
|
||||
}
|
||||
|
||||
func wrapFieldsWithDesc(err milvusError, desc string, fields ...errorField) error {
|
||||
for i := range fields {
|
||||
err.msg += fmt.Sprintf("[%s]", fields[i].String())
|
||||
}
|
||||
err.msg += ": " + desc
|
||||
err.detail = err.msg
|
||||
return err
|
||||
}
|
||||
|
||||
type errorField interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
type valueField struct {
|
||||
name string
|
||||
value any
|
||||
}
|
||||
|
||||
func value(name string, value any) valueField {
|
||||
return valueField{
|
||||
name,
|
||||
value,
|
||||
}
|
||||
}
|
||||
|
||||
func (f valueField) String() string {
|
||||
return fmt.Sprintf("%s=%v", f.name, f.value)
|
||||
}
|
||||
|
||||
type boundField struct {
|
||||
name string
|
||||
value any
|
||||
lower any
|
||||
upper any
|
||||
}
|
||||
|
||||
func bound(name string, value, lower, upper any) boundField {
|
||||
return boundField{
|
||||
name,
|
||||
value,
|
||||
lower,
|
||||
upper,
|
||||
}
|
||||
}
|
||||
|
||||
func (f boundField) String() string {
|
||||
return fmt.Sprintf("%v out of range %v <= %s <= %v", f.value, f.lower, f.name, f.upper)
|
||||
}
|
||||
|
||||
@ -1311,7 +1311,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
|
||||
default_search_params, default_limit,
|
||||
expr,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items={"err_code": 65538,
|
||||
check_items={"err_code": 65535,
|
||||
"err_msg": "UnknownError: unsupported right datatype JSON of compare expr"})
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user