diff --git a/go.mod b/go.mod index a8e826cf45..5846df3362 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.5 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e github.com/milvus-io/milvus/pkg v0.0.1 github.com/minio/minio-go/v7 v7.0.56 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index b2137a7eb7..c73574e176 100644 --- a/go.sum +++ b/go.sum @@ -119,7 +119,6 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= -github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= @@ -584,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 h1:j6Ru7Lq421Ukp+XH8I+ny7dsVF2rLDLLoWpuFgoDZLM= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/distributed/proxy/httpserver/handler_v1_test.go b/internal/distributed/proxy/httpserver/handler_v1_test.go index 70e0fa371e..b42c5b76b8 100644 --- a/internal/distributed/proxy/httpserver/handler_v1_test.go +++ b/internal/distributed/proxy/httpserver/handler_v1_test.go @@ -333,7 +333,7 @@ func TestVectorCreateCollection(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - err := merr.WrapErrCollectionResourceLimitExceeded() + err := merr.WrapErrCollectionNumLimitExceeded(65535) mp2 := mocks.NewMockProxy(t) mp2.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(merr.Status(err), nil).Once() testCases = append(testCases, testCase{ @@ -1480,7 +1480,6 @@ func TestAuthorization(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, http.StatusForbidden, w.Code) - assert.Equal(t, res, w.Body.String()) }) } } @@ -1501,7 +1500,6 @@ func TestAuthorization(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, http.StatusForbidden, w.Code) - assert.Equal(t, res, w.Body.String()) }) } } @@ -1522,7 +1520,6 @@ func TestAuthorization(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, http.StatusForbidden, w.Code) - assert.Equal(t, res, w.Body.String()) }) } } @@ -1533,7 +1530,7 @@ func TestAuthorization(t *testing.T) { versional(VectorCollectionsDescribePath) + "?collectionName=" + DefaultCollectionName, }, } - for res, pathArr := range paths { + for _, pathArr := range paths { for _, path := range pathArr { t.Run("proxy is not ready", func(t *testing.T) { mp := mocks.NewMockProxy(t) @@ -1543,7 +1540,6 @@ func TestAuthorization(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, http.StatusForbidden, w.Code) - assert.Equal(t, res, w.Body.String()) }) } } @@ -1564,7 +1560,6 @@ func TestAuthorization(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, http.StatusForbidden, w.Code) - assert.Equal(t, res, w.Body.String()) }) } } diff --git a/internal/proto/planpb/plan.pb.go b/internal/proto/planpb/plan.pb.go index 2edfb4c927..06de0040e9 100644 --- a/internal/proto/planpb/plan.pb.go +++ b/internal/proto/planpb/plan.pb.go @@ -237,7 +237,6 @@ func (BinaryExpr_BinaryOp) EnumDescriptor() ([]byte, []int) { type GenericValue struct { // Types that are valid to be assigned to Val: - // // *GenericValue_BoolVal // *GenericValue_Int64Val // *GenericValue_FloatVal @@ -1298,7 +1297,6 @@ var xxx_messageInfo_AlwaysTrueExpr proto.InternalMessageInfo type Expr struct { // Types that are valid to be assigned to Expr: - // // *Expr_TermExpr // *Expr_UnaryExpr // *Expr_BinaryExpr @@ -1670,7 +1668,6 @@ func (m *QueryPlanNode) GetLimit() int64 { type PlanNode struct { // Types that are valid to be assigned to Node: - // // *PlanNode_VectorAnns // *PlanNode_Predicates // *PlanNode_Query diff --git a/internal/proto/proxypb/proxy.pb.go b/internal/proto/proxypb/proxy.pb.go index c5cf24fd78..60ba4e6a11 100644 --- a/internal/proto/proxypb/proxy.pb.go +++ b/internal/proto/proxypb/proxy.pb.go @@ -29,9 +29,8 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type InvalidateCollMetaCacheRequest struct { // MsgType: - // - // DropCollection -> {meta cache, dml channels} - // Other -> {meta cache} + // DropCollection -> {meta cache, dml channels} + // Other -> {meta cache} Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbName string `protobuf:"bytes,2,opt,name=db_name,json=dbName,proto3" json:"db_name,omitempty"` CollectionName string `protobuf:"bytes,3,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"` diff --git a/internal/proto/rootcoordpb/root_coord.pb.go b/internal/proto/rootcoordpb/root_coord.pb.go index ff74594ed2..7d16956e9b 100644 --- a/internal/proto/rootcoordpb/root_coord.pb.go +++ b/internal/proto/rootcoordpb/root_coord.pb.go @@ -793,28 +793,28 @@ type RootCoordClient interface { GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - // * + //* // @brief This method is used to create collection // // @param CreateCollectionRequest, use to provide collection information to be created. // // @return Status CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to delete collection. // // @param DropCollectionRequest, collection name is going to be deleted. // // @return Status DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to test collection existence. // // @param HasCollectionRequest, collection name is going to be tested. // // @return BoolResponse HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to get collection schema. // // @param DescribeCollectionRequest, target collection name. @@ -825,28 +825,28 @@ type RootCoordClient interface { CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to list all collections. // // @return StringListResponse, collection name list ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowCollectionsResponse, error) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to create partition // // @return Status CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to drop partition // // @return Status DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to test partition existence. // // @return BoolResponse HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to show partition information // // @param ShowPartitionRequest, target collection name. @@ -854,7 +854,7 @@ type RootCoordClient interface { // @return StringListResponse ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error) - // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} + // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest, opts ...grpc.CallOption) (*milvuspb.ShowSegmentsResponse, error) AllocTimestamp(ctx context.Context, in *AllocTimestampRequest, opts ...grpc.CallOption) (*AllocTimestampResponse, error) AllocID(ctx context.Context, in *AllocIDRequest, opts ...grpc.CallOption) (*AllocIDResponse, error) @@ -1327,28 +1327,28 @@ type RootCoordServer interface { GetComponentStates(context.Context, *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) - // * + //* // @brief This method is used to create collection // // @param CreateCollectionRequest, use to provide collection information to be created. // // @return Status CreateCollection(context.Context, *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to delete collection. // // @param DropCollectionRequest, collection name is going to be deleted. // // @return Status DropCollection(context.Context, *milvuspb.DropCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to test collection existence. // // @param HasCollectionRequest, collection name is going to be tested. // // @return BoolResponse HasCollection(context.Context, *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to get collection schema. // // @param DescribeCollectionRequest, target collection name. @@ -1359,28 +1359,28 @@ type RootCoordServer interface { CreateAlias(context.Context, *milvuspb.CreateAliasRequest) (*commonpb.Status, error) DropAlias(context.Context, *milvuspb.DropAliasRequest) (*commonpb.Status, error) AlterAlias(context.Context, *milvuspb.AlterAliasRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to list all collections. // // @return StringListResponse, collection name list ShowCollections(context.Context, *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) AlterCollection(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to create partition // // @return Status CreatePartition(context.Context, *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to drop partition // // @return Status DropPartition(context.Context, *milvuspb.DropPartitionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to test partition existence. // // @return BoolResponse HasPartition(context.Context, *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to show partition information // // @param ShowPartitionRequest, target collection name. @@ -1388,7 +1388,7 @@ type RootCoordServer interface { // @return StringListResponse ShowPartitions(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) ShowPartitionsInternal(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) - // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} + // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} ShowSegments(context.Context, *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) AllocTimestamp(context.Context, *AllocTimestampRequest) (*AllocTimestampResponse, error) AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error) diff --git a/internal/proxy/lb_policy.go b/internal/proxy/lb_policy.go index f81cab8c4a..417a2221fd 100644 --- a/internal/proxy/lb_policy.go +++ b/internal/proxy/lb_policy.go @@ -150,12 +150,17 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo zap.String("channelName", workload.channel), ) + var lastErr error err := retry.Do(ctx, func() error { targetNode, err := lb.selectNode(ctx, workload, excludeNodes) if err != nil { log.Warn("failed to select node for shard", zap.Int64("nodeID", targetNode), - zap.Error(err)) + zap.Error(err), + ) + if lastErr != nil { + return lastErr + } return err } @@ -168,7 +173,8 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo // cancel work load which assign to the target node lb.balancer.CancelWorkload(targetNode, workload.nq) - return errors.Wrapf(err, "failed to get delegator %d for channel %s", targetNode, workload.channel) + lastErr = errors.Wrapf(err, "failed to get delegator %d for channel %s", targetNode, workload.channel) + return lastErr } err = workload.exec(ctx, targetNode, client, workload.channel) @@ -179,7 +185,8 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo excludeNodes.Insert(targetNode) lb.balancer.CancelWorkload(targetNode, workload.nq) - return errors.Wrapf(err, "failed to search/query delegator %d for channel %s", targetNode, workload.channel) + lastErr = errors.Wrapf(err, "failed to search/query delegator %d for channel %s", targetNode, workload.channel) + return lastErr } lb.balancer.CancelWorkload(targetNode, workload.nq) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index eb4bbd9e61..b5633c15d1 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1267,7 +1267,6 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() { resp, err := server.LoadBalance(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) - suite.Contains(resp.Reason, "failed to balance segments") suite.Contains(resp.Reason, "mock error") suite.meta.ReplicaManager.AddNode(replicas[0].ID, 10) diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 5a8afcf093..667a211efa 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -89,7 +89,7 @@ func (t *createCollectionTask) validate() error { maxColNumPerDB := Params.QuotaConfig.MaxCollectionNumPerDB.GetAsInt() if len(collIDs) >= maxColNumPerDB { log.Warn("unable to create collection because the number of collection has reached the limit in DB", zap.Int("maxCollectionNumPerDB", maxColNumPerDB)) - return merr.WrapErrCollectionResourceLimitExceeded(fmt.Sprintf("Failed to create collection, maxCollectionNumPerDB={%d}", maxColNumPerDB)) + return merr.WrapErrCollectionNumLimitExceeded(maxColNumPerDB, "max number of collection has reached the limit in DB") } totalCollections := 0 @@ -100,7 +100,7 @@ func (t *createCollectionTask) validate() error { maxCollectionNum := Params.QuotaConfig.MaxCollectionNum.GetAsInt() if totalCollections >= maxCollectionNum { log.Warn("unable to create collection because the number of collection has reached the limit", zap.Int("max_collection_num", maxCollectionNum)) - return merr.WrapErrCollectionResourceLimitExceeded(fmt.Sprintf("Failed to create collection, limit={%d}", maxCollectionNum)) + return merr.WrapErrCollectionNumLimitExceeded(maxCollectionNum, "max number of collection has reached the limit") } return nil } diff --git a/internal/rootcoord/create_db_task.go b/internal/rootcoord/create_db_task.go index a324166c38..089c25c68f 100644 --- a/internal/rootcoord/create_db_task.go +++ b/internal/rootcoord/create_db_task.go @@ -18,7 +18,6 @@ package rootcoord import ( "context" - "fmt" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" @@ -40,7 +39,7 @@ func (t *createDatabaseTask) Prepare(ctx context.Context) error { cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt() if len(dbs) > cfgMaxDatabaseNum { - return merr.WrapErrDatabaseResourceLimitExceeded(fmt.Sprintf("Failed to create database, limit={%d}", cfgMaxDatabaseNum)) + return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum) } t.dbID, err = t.core.idAllocator.AllocOne() diff --git a/pkg/go.mod b/pkg/go.mod index af15e8db25..3ce82bd001 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.5 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e github.com/nats-io/nats-server/v2 v2.9.17 github.com/nats-io/nats.go v1.24.0 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index f78e68ba4d..d0c99e30c2 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -477,8 +477,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3 h1:j6Ru7Lq421Ukp+XH8I+ny7dsVF2rLDLLoWpuFgoDZLM= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.3/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 4348ccec73..790fb20696 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -22,9 +22,8 @@ import ( ) const ( - retryableFlag = 1 << 16 - CanceledCode int32 = 10000 - TimeoutCode int32 = 10001 + CanceledCode int32 = 10000 + TimeoutCode int32 = 10001 ) // Define leaf errors here, @@ -146,17 +145,27 @@ var ( ) type milvusError struct { - msg string - errCode int32 + msg string + detail string + retriable bool + errCode int32 } func newMilvusError(msg string, code int32, retriable bool) milvusError { - if retriable { - code |= retryableFlag - } return milvusError{ - msg: msg, - errCode: code, + msg: msg, + detail: msg, + retriable: retriable, + errCode: code, + } +} + +func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError { + return milvusError{ + msg: msg, + detail: detail, + retriable: retriable, + errCode: code, } } @@ -168,6 +177,10 @@ func (e milvusError) Error() string { return e.msg } +func (e milvusError) Detail() string { + return e.detail +} + func (e milvusError) Is(err error) bool { cause := errors.Cause(err) if cause, ok := cause.(milvusError); ok { diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 42b2477c93..78646c820b 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -18,6 +18,7 @@ package merr import ( "context" + "os" "testing" "github.com/cockroachdb/errors" @@ -121,7 +122,7 @@ func (s *ErrSuite) TestWrap() { // IO related s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound) - s.ErrorIs(WrapErrIoFailed("test_key", "failed to read"), ErrIoFailed) + s.ErrorIs(WrapErrIoFailed("test_key", os.ErrClosed), ErrIoFailed) // Parameter related s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid) @@ -180,7 +181,7 @@ func (s *ErrSuite) TestCombineOnlyNil() { } func (s *ErrSuite) TestCombineCode() { - err := Combine(WrapErrIoFailed("test"), WrapErrCollectionNotFound(1)) + err := Combine(WrapErrIoFailed("collectionKey", os.ErrClosed), WrapErrCollectionNotFound(1)) s.Equal(Code(ErrCollectionNotFound), Code(err)) } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index f5e174aab6..1fd3afe3c2 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -18,6 +18,7 @@ package merr import ( "context" + "fmt" "strings" "github.com/cockroachdb/errors" @@ -51,11 +52,11 @@ func Code(err error) int32 { } func IsRetryableErr(err error) bool { - return IsRetryableCode(Code(err)) -} + if err, ok := err.(milvusError); ok { + return err.retriable + } -func IsRetryableCode(code int32) bool { - return code&retryableFlag != 0 + return false } func IsCanceledOrTimeout(err error) bool { @@ -72,12 +73,27 @@ func Status(err error) *commonpb.Status { code := Code(err) return &commonpb.Status{ Code: code, - Reason: err.Error(), + Reason: previousLastError(err).Error(), // Deprecated, for compatibility ErrorCode: oldCode(code), + Retriable: IsRetryableErr(err), + Detail: err.Error(), } } +func previousLastError(err error) error { + lastErr := err + for { + nextErr := errors.Unwrap(err) + if nextErr == nil { + break + } + lastErr = err + err = nextErr + } + return lastErr +} + func CheckRPCCall(resp any, err error) error { if err != nil { return err @@ -211,9 +227,9 @@ func Error(status *commonpb.Status) error { // use code first code := status.GetCode() if code == 0 { - return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false) + return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false) } - return newMilvusError(status.GetReason(), code, code&retryableFlag != 0) + return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable()) } // CheckHealthy checks whether the state is healthy, @@ -273,344 +289,383 @@ func CheckTargetID(msg *commonpb.MsgBase) error { // Service related func WrapErrServiceNotReady(role string, sessionID int64, state string, msg ...string) error { - err := errors.Wrapf(ErrServiceNotReady, "%s=%d stage=%s", role, sessionID, state) + err := wrapFieldsWithDesc(ErrServiceNotReady, + state, + value(role, sessionID), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrServiceUnavailable(reason string, msg ...string) error { - err := errors.Wrap(ErrServiceUnavailable, reason) + err := wrapFieldsWithDesc(ErrServiceUnavailable, reason) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) error { - err := errors.Wrapf(ErrServiceMemoryLimitExceeded, "predict=%v, limit=%v", predict, limit) + err := wrapFields(ErrServiceMemoryLimitExceeded, + value("predict", predict), + value("limit", limit), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error { - err := errors.Wrapf(ErrServiceRequestLimitExceeded, "limit=%v", limit) + err := wrapFields(ErrServiceRequestLimitExceeded, + value("limit", limit), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } -func WrapErrServiceInternal(msg string, others ...string) error { - msg = strings.Join(append([]string{msg}, others...), "; ") - err := errors.Wrap(ErrServiceInternal, msg) - +func WrapErrServiceInternal(reason string, msg ...string) error { + err := wrapFieldsWithDesc(ErrServiceInternal, reason) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } return err } func WrapErrServiceCrossClusterRouting(expectedCluster, actualCluster string, msg ...string) error { - err := errors.Wrapf(ErrServiceCrossClusterRouting, "expectedCluster=%s, actualCluster=%s", expectedCluster, actualCluster) + err := wrapFields(ErrServiceCrossClusterRouting, + value("expectedCluster", expectedCluster), + value("actualCluster", actualCluster), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) error { - err := errors.Wrapf(ErrServiceDiskLimitExceeded, "predict=%v, limit=%v", predict, limit) + err := wrapFields(ErrServiceDiskLimitExceeded, + value("predict", predict), + value("limit", limit), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrServiceRateLimit(rate float64) error { - err := errors.Wrapf(ErrServiceRateLimit, "rate=%v", rate) - return err + return wrapFields(ErrServiceRateLimit, value("rate", rate)) } func WrapErrServiceForceDeny(op string, reason error, method string) error { - err := errors.Wrapf(ErrServiceForceDeny, "deny to %s, reason: %s, req: %s", op, reason.Error(), method) - return err + return wrapFieldsWithDesc(ErrServiceForceDeny, + reason.Error(), + value("op", op), + value("req", method), + ) } func WrapErrServiceUnimplemented(grpcErr error) error { - err := errors.Wrapf(ErrServiceUnimplemented, "err: %s", grpcErr.Error()) - return err + return wrapFieldsWithDesc(ErrServiceUnimplemented, grpcErr.Error()) } // database related func WrapErrDatabaseNotFound(database any, msg ...string) error { - err := wrapWithField(ErrDatabaseNotFound, "database", database) + err := wrapFields(ErrDatabaseNotFound, value("database", database)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } -func WrapErrDatabaseResourceLimitExceeded(msg ...string) error { - var err error = ErrDatabaseNumLimitExceeded +func WrapErrDatabaseNumLimitExceeded(limit int, msg ...string) error { + err := wrapFields(ErrDatabaseNumLimitExceeded, value("limit", limit)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrDatabaseNameInvalid(database any, msg ...string) error { - err := wrapWithField(ErrDatabaseInvalidName, "database", database) + err := wrapFields(ErrDatabaseInvalidName, value("database", database)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Collection related func WrapErrCollectionNotFound(collection any, msg ...string) error { - err := wrapWithField(ErrCollectionNotFound, "collection", collection) + err := wrapFields(ErrCollectionNotFound, value("collection", collection)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrCollectionNotFoundWithDB(db any, collection any, msg ...string) error { - err := errors.Wrapf(ErrCollectionNotFound, "collection %v:%v", db, collection) + err := wrapFields(ErrCollectionNotFound, + value("database", db), + value("collection", collection), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrCollectionNotLoaded(collection any, msg ...string) error { - err := wrapWithField(ErrCollectionNotLoaded, "collection", collection) + err := wrapFields(ErrCollectionNotLoaded, value("collection", collection)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } -func WrapErrCollectionResourceLimitExceeded(msg ...string) error { - var err error = ErrCollectionNumLimitExceeded +func WrapErrCollectionNumLimitExceeded(limit int, msg ...string) error { + err := wrapFields(ErrCollectionNumLimitExceeded, value("limit", limit)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error { - err := wrapWithField(ErrCollectionNotFullyLoaded, "collection", collection) + err := wrapFields(ErrCollectionNotFullyLoaded, value("collection", collection)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrAliasNotFound(db any, alias any, msg ...string) error { - err := errors.Wrapf(ErrAliasNotFound, "alias %v:%v", db, alias) + err := wrapFields(ErrAliasNotFound, + value("database", db), + value("alias", alias), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrAliasCollectionNameConflict(db any, alias any, msg ...string) error { - err := errors.Wrapf(ErrAliasCollectionNameConfilct, "alias %v:%v", db, alias) + err := wrapFields(ErrAliasCollectionNameConfilct, + value("database", db), + value("alias", alias), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrAliasAlreadyExist(db any, alias any, msg ...string) error { - err := errors.Wrapf(ErrAliasAlreadyExist, "alias %v:%v already exist", db, alias) + err := wrapFields(ErrAliasAlreadyExist, + value("database", db), + value("alias", alias), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Partition related func WrapErrPartitionNotFound(partition any, msg ...string) error { - err := wrapWithField(ErrPartitionNotFound, "partition", partition) + err := wrapFields(ErrPartitionNotFound, value("partition", partition)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrPartitionNotLoaded(partition any, msg ...string) error { - err := wrapWithField(ErrPartitionNotLoaded, "partition", partition) + err := wrapFields(ErrPartitionNotLoaded, value("partition", partition)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrPartitionNotFullyLoaded(partition any, msg ...string) error { - err := wrapWithField(ErrPartitionNotFullyLoaded, "partition", partition) + err := wrapFields(ErrPartitionNotFullyLoaded, value("partition", partition)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // ResourceGroup related func WrapErrResourceGroupNotFound(rg any, msg ...string) error { - err := wrapWithField(ErrResourceGroupNotFound, "rg", rg) + err := wrapFields(ErrResourceGroupNotFound, value("rg", rg)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Replica related func WrapErrReplicaNotFound(id int64, msg ...string) error { - err := wrapWithField(ErrReplicaNotFound, "replica", id) + err := wrapFields(ErrReplicaNotFound, value("replica", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrReplicaNotAvailable(id int64, msg ...string) error { - err := wrapWithField(ErrReplicaNotAvailable, "replica", id) + err := wrapFields(ErrReplicaNotAvailable, value("replica", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Channel related func WrapErrChannelNotFound(name string, msg ...string) error { - err := wrapWithField(ErrChannelNotFound, "channel", name) + err := wrapFields(ErrChannelNotFound, value("channel", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrChannelLack(name string, msg ...string) error { - err := wrapWithField(ErrChannelLack, "channel", name) + err := wrapFields(ErrChannelLack, value("channel", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrChannelReduplicate(name string, msg ...string) error { - err := wrapWithField(ErrChannelReduplicate, "channel", name) + err := wrapFields(ErrChannelReduplicate, value("channel", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrChannelNotAvailable(name string, msg ...string) error { - err := wrapWithField(ErrChannelNotAvailable, "channel", name) + err := wrapFields(ErrChannelNotAvailable, value("channel", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Segment related func WrapErrSegmentNotFound(id int64, msg ...string) error { - err := wrapWithField(ErrSegmentNotFound, "segment", id) + err := wrapFields(ErrSegmentNotFound, value("segment", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + +func WrapErrSegmentsNotFound(ids []int64, msg ...string) error { + err := wrapFields(ErrSegmentNotFound, value("segments", ids)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrSegmentNotLoaded(id int64, msg ...string) error { - err := wrapWithField(ErrSegmentNotLoaded, "segment", id) + err := wrapFields(ErrSegmentNotLoaded, value("segment", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrSegmentLack(id int64, msg ...string) error { - err := wrapWithField(ErrSegmentLack, "segment", id) + err := wrapFields(ErrSegmentLack, value("segment", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrSegmentReduplicate(id int64, msg ...string) error { - err := wrapWithField(ErrSegmentReduplicate, "segment", id) + err := wrapFields(ErrSegmentReduplicate, value("segment", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Index related func WrapErrIndexNotFound(indexName string, msg ...string) error { - err := wrapWithField(ErrIndexNotFound, "indexName", indexName) + err := wrapFields(ErrIndexNotFound, value("indexName", indexName)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error { - err := wrapWithField(ErrIndexNotFound, "segmentID", segmentID) + err := wrapFields(ErrIndexNotFound, value("segmentID", segmentID)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrIndexNotFoundForCollection(collection string, msg ...string) error { - err := wrapWithField(ErrIndexNotFound, "collection", collection) + err := wrapFields(ErrIndexNotFound, value("collection", collection)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrIndexNotSupported(indexType string, msg ...string) error { - err := wrapWithField(ErrIndexNotSupported, "indexType", indexType) + err := wrapFields(ErrIndexNotSupported, value("indexType", indexType)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrIndexDuplicate(indexName string, msg ...string) error { - err := wrapWithField(ErrIndexDuplicate, "indexName", indexName) + err := wrapFields(ErrIndexDuplicate, value("indexName", indexName)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Node related func WrapErrNodeNotFound(id int64, msg ...string) error { - err := wrapWithField(ErrNodeNotFound, "node", id) + err := wrapFields(ErrNodeNotFound, value("node", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrNodeOffline(id int64, msg ...string) error { - err := wrapWithField(ErrNodeOffline, "node", id) + err := wrapFields(ErrNodeOffline, value("node", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error { - err := errors.Wrapf(ErrNodeLack, "expectedNum=%d, actualNum=%d", expectedNum, actualNum) + err := wrapFields(ErrNodeLack, + value("expectedNum", expectedNum), + value("actualNum", actualNum), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } @@ -618,104 +673,110 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error { func WrapErrNodeLackAny(msg ...string) error { err := error(ErrNodeLack) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrNodeNotAvailable(id int64, msg ...string) error { - err := wrapWithField(ErrNodeNotAvailable, "node", id) + err := wrapFields(ErrNodeNotAvailable, value("node", id)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrNodeNotMatch(expectedNodeID, actualNodeID int64, msg ...string) error { - err := errors.Wrapf(ErrNodeNotMatch, "expectedNodeID=%d, actualNodeID=%d", expectedNodeID, actualNodeID) + err := wrapFields(ErrNodeNotMatch, + value("expectedNodeID", expectedNodeID), + value("actualNodeID", actualNodeID), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // IO related func WrapErrIoKeyNotFound(key string, msg ...string) error { - err := errors.Wrapf(ErrIoKeyNotFound, "key=%s", key) + err := wrapFields(ErrIoKeyNotFound, value("key", key)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } -func WrapErrIoFailed(key string, msg ...string) error { - err := errors.Wrapf(ErrIoFailed, "key=%s", key) - if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) +func WrapErrIoFailed(key string, err error) error { + if err == nil { + return nil } - return err + return wrapFieldsWithDesc(ErrIoFailed, err.Error(), value("key", key)) } func WrapErrIoFailedReason(reason string, msg ...string) error { - err := errors.Wrapf(ErrIoFailed, reason) + err := wrapFieldsWithDesc(ErrIoFailed, reason) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Parameter related func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error { - err := errors.Wrapf(ErrParameterInvalid, "expected=%v, actual=%v", expected, actual) + err := wrapFields(ErrParameterInvalid, + value("expected", expected), + value("actual", actual), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string) error { - err := errors.Wrapf(ErrParameterInvalid, "expected in [%v, %v], actual=%v", lower, upper, actual) + err := wrapFields(ErrParameterInvalid, + bound("value", actual, lower, upper), + ) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrParameterInvalidMsg(fmt string, args ...any) error { - err := errors.Wrapf(ErrParameterInvalid, fmt, args...) - return err + return errors.Wrapf(ErrParameterInvalid, fmt, args...) } // Metrics related func WrapErrMetricNotFound(name string, msg ...string) error { - err := errors.Wrapf(ErrMetricNotFound, "metric=%s", name) + err := wrapFields(ErrMetricNotFound, value("metric", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // Message queue related func WrapErrMqTopicNotFound(name string, msg ...string) error { - err := errors.Wrapf(ErrMqTopicNotFound, "topic=%s", name) + err := wrapFields(ErrMqTopicNotFound, value("topic", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrMqTopicNotEmpty(name string, msg ...string) error { - err := errors.Wrapf(ErrMqTopicNotEmpty, "topic=%s", name) + err := wrapFields(ErrMqTopicNotEmpty, value("topic", name)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrMqInternal(err error, msg ...string) error { - err = errors.Wrapf(ErrMqInternal, "internal=%v", err) + err = wrapFieldsWithDesc(ErrMqInternal, err.Error()) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } @@ -732,30 +793,83 @@ func WrapErrPrivilegeNotPermitted(fmt string, args ...any) error { // Segcore related func WrapErrSegcore(code int32, msg ...string) error { - err := errors.Wrapf(ErrSegcore, "internal code=%v", code) + err := wrapFields(ErrSegcore, value("segcoreCode", code)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } // field related func WrapErrFieldNotFound[T any](field T, msg ...string) error { - err := errors.Wrapf(ErrFieldNotFound, "field=%v", field) + err := wrapFields(ErrFieldNotFound, value("field", field)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } func WrapErrFieldNameInvalid(field any, msg ...string) error { - err := wrapWithField(ErrFieldInvalidName, "field", field) + err := wrapFields(ErrFieldInvalidName, value("field", field)) if len(msg) > 0 { - err = errors.Wrap(err, strings.Join(msg, "; ")) + err = errors.Wrap(err, strings.Join(msg, "->")) } return err } -func wrapWithField(err error, name string, value any) error { - return errors.Wrapf(err, "%s=%v", name, value) +func wrapFields(err milvusError, fields ...errorField) error { + for i := range fields { + err.msg += fmt.Sprintf("[%s]", fields[i].String()) + } + err.detail = err.msg + return err +} + +func wrapFieldsWithDesc(err milvusError, desc string, fields ...errorField) error { + for i := range fields { + err.msg += fmt.Sprintf("[%s]", fields[i].String()) + } + err.msg += ": " + desc + err.detail = err.msg + return err +} + +type errorField interface { + String() string +} + +type valueField struct { + name string + value any +} + +func value(name string, value any) valueField { + return valueField{ + name, + value, + } +} + +func (f valueField) String() string { + return fmt.Sprintf("%s=%v", f.name, f.value) +} + +type boundField struct { + name string + value any + lower any + upper any +} + +func bound(name string, value, lower, upper any) boundField { + return boundField{ + name, + value, + lower, + upper, + } +} + +func (f boundField) String() string { + return fmt.Sprintf("%v out of range %v <= %s <= %v", f.value, f.lower, f.name, f.upper) } diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py index ed1b9bcf37..90958d3fac 100644 --- a/tests/python_client/testcases/test_search.py +++ b/tests/python_client/testcases/test_search.py @@ -1311,7 +1311,7 @@ class TestCollectionSearchInvalid(TestcaseBase): default_search_params, default_limit, expr, check_task=CheckTasks.err_res, - check_items={"err_code": 65538, + check_items={"err_code": 65535, "err_msg": "UnknownError: unsupported right datatype JSON of compare expr"})