diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 0f7d127f45..4c3551c289 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -60,6 +60,7 @@ import ( "github.com/milvus-io/milvus/internal/util/errorutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/logutil" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" @@ -713,10 +714,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*milvuspb.ComponentState StateCode: code, ExtraInfo: nil, }, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, + Status: merr.Status(nil), SubcomponentStates: []*milvuspb.ComponentInfo{ { NodeID: nodeID, @@ -731,29 +729,23 @@ func (c *Core) GetComponentStates(ctx context.Context) (*milvuspb.ComponentState // GetTimeTickChannel get timetick channel name func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Value: Params.CommonCfg.RootCoordTimeTick.GetValue(), + Status: merr.Status(nil), + Value: Params.CommonCfg.RootCoordTimeTick.GetValue(), }, nil } // GetStatisticsChannel get statistics channel name func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Value: Params.CommonCfg.RootCoordStatistics.GetValue(), + Status: merr.Status(nil), + Value: Params.CommonCfg.RootCoordStatistics.GetValue(), }, nil } // CreateCollection create collection func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc() @@ -779,7 +771,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -790,7 +782,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc() @@ -801,13 +793,13 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // DropCollection drop collection func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc() @@ -831,7 +823,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -841,7 +833,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc() @@ -851,15 +843,14 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // HasCollection check collection existence func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), - Value: false, + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -882,8 +873,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ log.Warn("failed to enqueue request to has collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()), - Value: false, + Status: merr.Status(err), }, nil } @@ -891,8 +881,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ log.Warn("failed to has collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()), - Value: false, + Status: merr.Status(err), }, nil } @@ -913,7 +902,7 @@ func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeColl } func convertModelToDesc(collInfo *model.Collection, aliases []string) *milvuspb.DescribeCollectionResponse { - resp := &milvuspb.DescribeCollectionResponse{Status: succStatus()} + resp := &milvuspb.DescribeCollectionResponse{Status: merr.Status(nil)} resp.Schema = &schemapb.CollectionSchema{ Name: collInfo.Name, @@ -943,7 +932,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string) *milvuspb. func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.DescribeCollectionResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode"+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -963,7 +952,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe t := &describeCollectionTask{ baseTask: newBaseTask(ctx, c), Req: in, - Rsp: &milvuspb.DescribeCollectionResponse{}, + Rsp: &milvuspb.DescribeCollectionResponse{Status: merr.Status(nil)}, allowUnavailable: allowUnavailable, } @@ -972,7 +961,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ // TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now. - Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), + Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } @@ -982,7 +971,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ // TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now. - Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), + Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } @@ -1014,7 +1003,7 @@ func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.Desc func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.ShowCollectionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1037,7 +1026,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections log.Warn("failed to enqueue request to show collections", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc() return &milvuspb.ShowCollectionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()), + Status: merr.Status(err), }, nil } @@ -1045,7 +1034,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections log.Warn("failed to show collections", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc() return &milvuspb.ShowCollectionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()), + Status: merr.Status(err), }, nil } @@ -1059,7 +1048,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc() @@ -1085,7 +1074,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection zap.String("name", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -1096,7 +1085,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc() @@ -1107,13 +1096,13 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection zap.String("role", typeutil.RootCoordRole), zap.String("name", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // CreatePartition create partition func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc() @@ -1141,7 +1130,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition zap.String("partition", in.GetPartitionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -1153,7 +1142,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc() @@ -1164,13 +1153,13 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // DropPartition drop partition func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc() @@ -1198,7 +1187,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ zap.String("partition", in.GetPartitionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Ctx(ctx).Error("failed to drop partition", @@ -1209,7 +1198,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc() @@ -1220,15 +1209,14 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // HasPartition check partition existence func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), - Value: false, + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1253,8 +1241,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques log.Warn("failed to enqueue request to has partition", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()), - Value: false, + Status: merr.Status(err), }, nil } @@ -1262,8 +1249,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques log.Warn("failed to has partition", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()), - Value: false, + Status: merr.Status(err), }, nil } @@ -1278,7 +1264,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.ShowPartitionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1300,7 +1286,7 @@ func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitio log.Warn("failed to enqueue request to show partitions", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc() return &milvuspb.ShowPartitionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()), + Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } @@ -1309,7 +1295,7 @@ func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitio log.Warn("failed to show partitions", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc() return &milvuspb.ShowPartitionsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()), + Status: merr.Status(err), // Status: common.StatusFromError(err), }, nil } @@ -1336,14 +1322,14 @@ func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPart func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { // ShowSegments Only used in GetPersistentSegmentInfo, it's already deprecated for a long time. // Though we continue to keep current logic, it's not right enough since RootCoord only contains indexed segments. - return &milvuspb.ShowSegmentsResponse{Status: succStatus()}, nil + return &milvuspb.ShowSegmentsResponse{Status: merr.Status(nil)}, nil } // AllocTimestamp alloc timestamp func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { if code, ok := c.checkHealthy(); !ok { return &rootcoordpb.AllocTimestampResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1353,7 +1339,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam zap.Error(err)) return &rootcoordpb.AllocTimestampResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocTimestamp failed: "+err.Error()), + Status: merr.Status(err), }, nil } @@ -1361,7 +1347,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam ts = ts - uint64(in.GetCount()) + 1 metrics.RootCoordTimestamp.Set(float64(ts)) return &rootcoordpb.AllocTimestampResponse{ - Status: succStatus(), + Status: merr.Status(nil), Timestamp: ts, Count: in.GetCount(), }, nil @@ -1371,7 +1357,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { if code, ok := c.checkHealthy(); !ok { return &rootcoordpb.AllocIDResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } start, _, err := c.idAllocator.Alloc(in.Count) @@ -1381,14 +1367,14 @@ func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*ro zap.Error(err)) return &rootcoordpb.AllocIDResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocID failed: "+err.Error()), + Status: merr.Status(err), Count: in.Count, }, nil } metrics.RootCoordIDAllocCounter.Add(float64(in.Count)) return &rootcoordpb.AllocIDResponse{ - Status: succStatus(), + Status: merr.Status(nil), ID: start, Count: in.Count, }, nil @@ -1399,40 +1385,39 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel log := log.Ctx(ctx) if code, ok := c.checkHealthy(); !ok { log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Any("state", code)) - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } if in.Base.MsgType != commonpb.MsgType_TimeTick { log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", in.Base.MsgType)) - msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())] - return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil + return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), in.Base.MsgType.String(), "invalid message type")), nil } err := c.chanTimeTick.updateTimeTick(in, "gRPC") if err != nil { log.Warn("failed to updateTimeTick", zap.String("role", typeutil.RootCoordRole), zap.Error(err)) - return failStatus(commonpb.ErrorCode_UnexpectedError, "UpdateTimeTick failed: "+err.Error()), nil + return merr.Status(err), nil } - return succStatus(), nil + return merr.Status(nil), nil } // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies. func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in) if err != nil { - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } - return succStatus(), nil + return merr.Status(nil), nil } // ShowConfigurations returns the configurations of RootCoord matching req.Pattern func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { if code, ok := c.checkHealthy(); !ok { return &internalpb.ShowConfigurationsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), Configuations: nil, }, nil } @@ -1459,7 +1444,7 @@ func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfi func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.GetMetricsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), Response: "", }, nil } @@ -1469,7 +1454,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) ( log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole), zap.Int64("nodeID", c.session.ServerID), zap.String("req", in.Request), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ParseMetricType failed: "+err.Error()), + Status: merr.Status(err), Response: "", }, nil } @@ -1486,7 +1471,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) ( zap.String("metricType", metricType), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, fmt.Sprintf("getSystemInfoMetrics failed: %s", err.Error())), + Status: merr.Status(err), Response: "", }, nil } @@ -1499,7 +1484,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) ( zap.String("metricType", metricType)) return &milvuspb.GetMetricsResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric), + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), Response: "", }, nil } @@ -1507,7 +1492,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) ( // CreateAlias create collection alias func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc() @@ -1535,7 +1520,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) zap.String("collection", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -1547,7 +1532,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() @@ -1558,13 +1543,13 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // DropAlias drop collection alias func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() @@ -1590,7 +1575,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c zap.String("alias", in.GetAlias())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -1601,7 +1586,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() @@ -1611,13 +1596,13 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c zap.String("role", typeutil.RootCoordRole), zap.String("alias", in.GetAlias()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // AlterAlias alter collection alias func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() @@ -1645,7 +1630,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( zap.String("collection", in.GetCollectionName())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { @@ -1657,7 +1642,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( zap.Uint64("ts", t.GetTs())) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() @@ -1668,14 +1653,14 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()), zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } // Import imports large files (json, numpy, etc.) on MinIO/S3 storage into Milvus storage. func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.ImportResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1716,7 +1701,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.GetImportStateResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } return c.importManager.getTaskState(req.GetTask()), nil @@ -1726,7 +1711,7 @@ func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateR func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) { if code, ok := c.checkHealthy(); !ok { return &milvuspb.ListImportTasksResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), + Status: merr.Status(merr.WrapErrServiceNotReady(code.String())), }, nil } @@ -1739,8 +1724,10 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask if err != nil { err = fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", req.GetCollectionName(), err) log.Error("ListImportTasks failed", zap.Error(err)) + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_IllegalCollectionName return &milvuspb.ListImportTasksResponse{ - Status: failStatus(commonpb.ErrorCode_IllegalCollectionName, err.Error()), + Status: status, }, nil } colID = colInfo.CollectionID @@ -1752,15 +1739,13 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask err = fmt.Errorf("failed to list import tasks, collection name: '%s', error: %w", req.GetCollectionName(), err) log.Error("ListImportTasks failed", zap.Error(err)) return &milvuspb.ListImportTasksResponse{ - Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), + Status: merr.Status(err), }, nil } resp := &milvuspb.ListImportTasksResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - Tasks: tasks, + Status: merr.Status(nil), + Tasks: tasks, } return resp, nil } @@ -1771,7 +1756,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( zap.Int64("task ID", ir.GetTaskId()), zap.Any("import state", ir.GetState())) if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } // This method update a busy node to idle node, and send import task to idle node @@ -1801,6 +1786,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, Reason: err.Error(), + Code: merr.Code(err), }, nil } @@ -1823,10 +1809,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } } @@ -1879,7 +1862,10 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_CreateCredentialFailure + return status, nil } // update proxy's local cache err = c.UpdateCredCache(ctx, credInfo) @@ -1894,7 +1880,7 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCredentials.Inc() - return succStatus(), nil + return merr.Status(nil), nil } // GetCredential get credential by username @@ -1910,8 +1896,11 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR log.Error("GetCredential query credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_GetCredentialFailure return &rootcoordpb.GetCredentialResponse{ - Status: failStatus(commonpb.ErrorCode_GetCredentialFailure, "GetCredential failed: "+err.Error()), + Status: status, }, err } log.Debug("GetCredential success", zap.String("role", typeutil.RootCoordRole), @@ -1920,7 +1909,7 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &rootcoordpb.GetCredentialResponse{ - Status: succStatus(), + Status: merr.Status(nil), Username: credInfo.Username, Password: credInfo.EncryptedPassword, }, nil @@ -1939,7 +1928,10 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure + return status, nil } // update proxy's local cache err = c.UpdateCredCache(ctx, credInfo) @@ -1947,14 +1939,17 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden log.Error("UpdateCredential update cache failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure + return status, nil } log.Debug("UpdateCredential success", zap.String("role", typeutil.RootCoordRole), zap.String("username", credInfo.Username)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return succStatus(), nil + return merr.Status(nil), nil } // DeleteCredential delete a user @@ -1969,7 +1964,10 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti log.Error("DeleteCredential remove credential failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), err + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure + return status, nil } // invalidate proxy's local cache err = c.ExpireCredCache(ctx, in.Username) @@ -1977,7 +1975,10 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti log.Error("DeleteCredential expire credential cache failed", zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), nil + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure + return status, nil } log.Debug("DeleteCredential success", zap.String("role", typeutil.RootCoordRole), zap.String("username", in.Username)) @@ -1985,7 +1986,7 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfCredentials.Dec() - return succStatus(), nil + return merr.Status(nil), nil } // ListCredUsers list all usernames @@ -2000,16 +2001,17 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ zap.String("role", typeutil.RootCoordRole), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return &milvuspb.ListCredUsersResponse{ - Status: failStatus(commonpb.ErrorCode_ListCredUsersFailure, "ListCredUsers failed: "+err.Error()), - }, err + + status := merr.Status(err) + status.ErrorCode = commonpb.ErrorCode_ListCredUsersFailure + return &milvuspb.ListCredUsersResponse{Status: status}, nil } log.Debug("ListCredUsers success", zap.String("role", typeutil.RootCoordRole)) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.ListCredUsersResponse{ - Status: succStatus(), + Status: merr.Status(nil), Usernames: credInfo.Usernames, }, nil } @@ -2042,7 +2044,7 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) ( metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfRoles.Inc() - return succStatus(), nil + return merr.Status(nil), nil } // DropRole drop role @@ -2111,7 +2113,7 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordNumOfRoles.Dec() - return succStatus(), nil + return merr.Status(nil), nil } // OperateUserRole operate the relationship between a user and a role @@ -2175,7 +2177,7 @@ func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRole logger.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return succStatus(), nil + return merr.Status(nil), nil } // SelectRole select role @@ -2196,7 +2198,7 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) ( if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil { if common.IsKeyNotExistError(err) { return &milvuspb.SelectRoleResponse{ - Status: succStatus(), + Status: merr.Status(nil), }, nil } errMsg := "fail to select the role to check the role name" @@ -2219,7 +2221,7 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) ( metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectRoleResponse{ - Status: succStatus(), + Status: merr.Status(nil), Results: roleResults, }, nil } @@ -2242,7 +2244,7 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) ( if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil { if common.IsKeyNotExistError(err) { return &milvuspb.SelectUserResponse{ - Status: succStatus(), + Status: merr.Status(nil), }, nil } errMsg := "fail to select the user to check the username" @@ -2265,7 +2267,7 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) ( metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectUserResponse{ - Status: succStatus(), + Status: merr.Status(nil), Results: userResults, }, nil } @@ -2411,7 +2413,7 @@ func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivile logger.Debug(method + " success") metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - return succStatus(), nil + return merr.Status(nil), nil } // SelectGrant select grant @@ -2455,7 +2457,7 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, in.Entity) if common.IsKeyNotExistError(err) { return &milvuspb.SelectGrantResponse{ - Status: succStatus(), + Status: merr.Status(nil), }, nil } if err != nil { @@ -2470,7 +2472,7 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.SelectGrantResponse{ - Status: succStatus(), + Status: merr.Status(nil), Entities: grantEntities, }, nil } @@ -2508,7 +2510,7 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &internalpb.ListPolicyResponse{ - Status: succStatus(), + Status: merr.Status(nil), PolicyInfos: policies, UserRoles: userRoles, }, nil @@ -2516,7 +2518,7 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest) func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { if code, ok := c.checkHealthy(); !ok { - return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil + return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil } log := log.Ctx(ctx).With(zap.String("oldCollectionName", req.GetOldName()), zap.String("newCollectionName", req.GetNewName())) @@ -2536,20 +2538,20 @@ func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollect if err := c.scheduler.AddTask(t); err != nil { log.Warn("failed to enqueue request to rename collection", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } if err := t.WaitToFinish(); err != nil { log.Warn("failed to rename collection", zap.Uint64("ts", t.GetTs()), zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc() - return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("RenameCollection").Observe(float64(tr.ElapseSpan().Milliseconds())) log.Info("done to rename collection", zap.Uint64("ts", t.GetTs())) - return succStatus(), nil + return merr.Status(nil), nil } func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { diff --git a/internal/util/merr/errors.go b/internal/util/merr/errors.go index c90c384234..54850e3993 100644 --- a/internal/util/merr/errors.go +++ b/internal/util/merr/errors.go @@ -91,6 +91,9 @@ var ( // Parameter related ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false) + // Metrics related + ErrMetricNotFound = newMilvusError("MetricNotFound", 1200, false) + // Do NOT export this, // never allow programmer using this, keep only for converting unknown error to milvusError errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false) diff --git a/internal/util/merr/errors_test.go b/internal/util/merr/errors_test.go index 483a95e5d6..4b6d04974d 100644 --- a/internal/util/merr/errors_test.go +++ b/internal/util/merr/errors_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/suite" ) @@ -53,7 +54,7 @@ func (s *ErrSuite) TestStatus() { s.ErrorIs(err, restoredErr) s.Equal(int32(0), Status(nil).Code) - s.Nil(Error(successStatus)) + s.Nil(Error(&commonpb.Status{})) } func (s *ErrSuite) TestWrap() { @@ -100,6 +101,9 @@ func (s *ErrSuite) TestWrap() { // Parameter related s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid) s.ErrorIs(WrapErrParameterInvalidRange(1, 1<<16, 0, "topk should be in range"), ErrParameterInvalid) + + // Metrics related + s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound) } func (s *ErrSuite) TestCombine() { diff --git a/internal/util/merr/utils.go b/internal/util/merr/utils.go index 56b7bf2ecd..6a4b0d70b6 100644 --- a/internal/util/merr/utils.go +++ b/internal/util/merr/utils.go @@ -26,8 +26,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" ) -// Declare a success status, avoid create it every time -var successStatus = &commonpb.Status{} +var ( + // For compatibility + oldErrCodes = map[int32]commonpb.ErrorCode{ + ErrServiceNotReady.code(): commonpb.ErrorCode_NotReadyServe, + ErrCollectionNotFound.code(): commonpb.ErrorCode_CollectionNotExists, + } +) // Code returns the error code of the given error, // WARN: DO NOT use this for now @@ -60,7 +65,7 @@ func IsRetriable(err error) bool { // returns Success status if err is nil func Status(err error) *commonpb.Status { if err == nil { - return successStatus + return &commonpb.Status{} } return &commonpb.Status{ @@ -274,6 +279,15 @@ func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string) return err } +// Metrics related +func WrapErrMetricNotFound(name string, msg ...string) error { + err := errors.Wrapf(ErrMetricNotFound, "metric=%s", name) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func wrapWithField(err error, name string, value any) error { return errors.Wrapf(err, "%s=%v", name, value) } diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 82054d128a..aa1fc9a57f 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -191,6 +191,10 @@ func (gp *BaseTable) Get(key string) string { // GetWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. func (gp *BaseTable) GetWithDefault(key, defaultValue string) string { + if gp.mgr == nil { + return defaultValue + } + str, err := gp.mgr.GetConfig(key) if err != nil { return defaultValue