diff --git a/.golangci.yml b/.golangci.yml index 94071dde65..fe64c20716 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -83,6 +83,7 @@ linters-settings: - 'merr\.Wrap\w+\(\)\.Error\(\)' - '\.(ErrorCode|Reason) = ' - 'Reason:\s+\w+\.Error\(\)' + - 'errors.New\((.+)\.GetReason\(\)\)' #- 'fmt\.Print.*' WIP issues: diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index 59cc15db81..67752479db 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -31,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" ) type indexTaskState int32 @@ -430,15 +430,14 @@ func (ib *indexBuilder) assignTask(builderClient types.IndexNodeClient, req *ind ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() resp, err := builderClient.CreateJob(ctx, req) + if err == nil { + err = merr.Error(resp) + } if err != nil { log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.Error(err)) return err } - if resp.GetErrorCode() != commonpb.ErrorCode_Success { - log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.GetReason())) - return errors.New(resp.GetReason()) - } return nil } diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 87a5f3c443..ba97ef5fdd 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -197,7 +197,7 @@ func TestServer_GetIndexState(t *testing.T) { }) s.stateCode.Store(commonpb.StateCode_Healthy) - t.Run("index not exist", func(t *testing.T) { + t.Run("index not found", func(t *testing.T) { resp, err := s.GetIndexState(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_IndexNotExist, resp.GetStatus().GetErrorCode()) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 082831e08e..67d2b3e330 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -34,7 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" ) @@ -149,7 +149,7 @@ func getSegmentInfos(ctx context.Context, datacoord types.DataCoordClient, segme SegmentIDs: segmentIDs, IncludeUnHealthy: true, }) - if err := funcutil.VerifyResponse(infoResp, err); err != nil { + if err := merr.CheckRpcCall(infoResp, err); err != nil { log.Error("Fail to get SegmentInfo by ids from datacoord", zap.Error(err)) return nil, err } diff --git a/internal/datanode/flow_graph_time_tick_node.go b/internal/datanode/flow_graph_time_tick_node.go index 7e67f6dbcc..240bde90d4 100644 --- a/internal/datanode/flow_graph_time_tick_node.go +++ b/internal/datanode/flow_graph_time_tick_node.go @@ -33,7 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -131,7 +131,7 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim VChannel: ttn.vChannelName, Position: channelPos, }) - if err = funcutil.VerifyResponse(resp, err); err != nil { + if err = merr.CheckRpcCall(resp, err); err != nil { log.Warn("UpdateChannelCheckpoint failed", zap.String("channel", ttn.vChannelName), zap.Time("channelCPTs", channelCPTs), zap.Error(err)) return err diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c7d4f4448f..b9a836b9da 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -922,24 +922,26 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet return err } + err = merr.Error(rsp) + // Segment not found during stale segment flush. Segment might get compacted already. // Stop retry and still proceed to the end, ignoring this error. - if !pack.flushed && rsp.GetErrorCode() == commonpb.ErrorCode_SegmentNotFound { + if !pack.flushed && errors.Is(err, merr.ErrSegmentNotFound) { log.Warn("stale segment not found, could be compacted", zap.Int64("segmentID", pack.segmentID)) log.Warn("failed to SaveBinlogPaths", zap.Int64("segmentID", pack.segmentID), - zap.Error(errors.New(rsp.GetReason()))) + zap.Error(err)) return nil } // meta error, datanode handles a virtual channel does not belong here - if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed { + if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrChannelNotFound) { log.Warn("meta error found, skip sync and start to drop virtual channel", zap.String("channel", dsService.vchannelName)) return nil } - if rsp.ErrorCode != commonpb.ErrorCode_Success { - return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason) + if err != nil { + return err } dsService.channel.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID { diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index e7667c7102..96fac32b8c 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" ) @@ -692,14 +693,14 @@ func TestFlushNotifyFunc(t *testing.T) { }) t.Run("datacoord save fails", func(t *testing.T) { - dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_UnexpectedError + dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrCollectionNotFound("collection")) assert.Panics(t, func() { notifyFunc(&segmentFlushPack{}) }) }) t.Run("stale segment not found", func(t *testing.T) { - dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound + dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrSegmentNotFound(100)) assert.NotPanics(t, func() { notifyFunc(&segmentFlushPack{flushed: false}) }) @@ -708,7 +709,7 @@ func TestFlushNotifyFunc(t *testing.T) { // issue https://github.com/milvus-io/milvus/issues/17097 // meta error, datanode shall not panic, just drop the virtual channel t.Run("datacoord found meta error", func(t *testing.T) { - dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_MetaFailed + dataCoord.SaveBinlogPathStatus = merr.Status(merr.WrapErrChannelNotFound("channel")) assert.NotPanics(t, func() { notifyFunc(&segmentFlushPack{}) }) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index fcad1b7363..a494385eba 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -180,12 +180,12 @@ type DataCoordFactory struct { types.DataCoordClient SaveBinlogPathError bool - SaveBinlogPathStatus commonpb.ErrorCode + SaveBinlogPathStatus *commonpb.Status CompleteCompactionError bool CompleteCompactionNotSuccess bool + DropVirtualChannelError bool - DropVirtualChannelError bool DropVirtualChannelStatus commonpb.ErrorCode GetSegmentInfosError bool @@ -237,7 +237,7 @@ func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.Sav if ds.SaveBinlogPathError { return nil, errors.New("Error") } - return &commonpb.Status{ErrorCode: ds.SaveBinlogPathStatus}, nil + return ds.SaveBinlogPathStatus, nil } func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest, opts ...grpc.CallOption) (*datapb.DropVirtualChannelResponse, error) { @@ -967,7 +967,7 @@ func (m *RootCoordFactory) AllocID(ctx context.Context, in *rootcoordpb.AllocIDR } if m.ID == -1 { - return nil, errors.New(resp.Status.GetReason()) + return nil, merr.Error(resp.Status) } resp.ID = m.ID @@ -1014,8 +1014,7 @@ func (m *RootCoordFactory) DescribeCollectionInternal(ctx context.Context, in *m } if m.collectionID == -1 { - resp.Status.ErrorCode = commonpb.ErrorCode_Success - return resp, errors.New(resp.Status.GetReason()) + return nil, merr.Error(resp.Status) } resp.CollectionID = m.collectionID diff --git a/internal/datanode/services.go b/internal/datanode/services.go index d656b5d3ab..4daf985787 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -582,15 +582,14 @@ func (node *DataNode) getPartitions(ctx context.Context, dbName string, collecti zap.String("collectionName", collectionName), } resp, err := node.rootCoord.ShowPartitions(ctx, req) + if err == nil { + err = merr.Error(resp.GetStatus()) + } if err != nil { logFields = append(logFields, zap.Error(err)) log.Warn("failed to get partitions of collection", logFields...) return nil, err } - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("failed to get partitions of collection", logFields...) - return nil, errors.New(resp.GetStatus().GetReason()) - } partitionNames := resp.GetPartitionNames() partitionIDs := resp.GetPartitionIDs() @@ -1042,7 +1041,7 @@ func reportImportFunc(node *DataNode) importutil.ReportFunc { return err } if status.GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(status.GetReason()) + return merr.Error(status) } return nil }, retry.Attempts(node.reportImportRetryTimes)) diff --git a/internal/datanode/timetick_sender.go b/internal/datanode/timetick_sender.go index 5d67263a98..45e58c5a61 100644 --- a/internal/datanode/timetick_sender.go +++ b/internal/datanode/timetick_sender.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -30,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -160,7 +160,7 @@ func (m *timeTickSender) sendReport(ctx context.Context) error { log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss)) err := retry.Do(ctx, func() error { submitTs := tsoutil.ComposeTSByTime(time.Now(), 0) - statusResp, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{ + status, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), commonpbutil.WithTimeStamp(submitTs), @@ -168,17 +168,13 @@ func (m *timeTickSender) sendReport(ctx context.Context) error { ), Msgs: toSendMsgs, }) + if err == nil { + err = merr.Error(status) + } if err != nil { log.Warn("error happen when ReportDataNodeTtMsgs", zap.Error(err)) return err } - if statusResp.GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("ReportDataNodeTtMsgs resp status not succeed", - zap.String("error_code", statusResp.GetErrorCode().String()), - zap.Int32("code", statusResp.GetCode()), - zap.String("reason", statusResp.GetReason())) - return errors.New(statusResp.GetReason()) - } return nil }, retry.Attempts(20), retry.Sleep(time.Millisecond*100)) if err != nil { diff --git a/internal/distributed/proxy/httpserver/handler_v1.go b/internal/distributed/proxy/httpserver/handler_v1.go index f3075bafbd..256e444d2f 100644 --- a/internal/distributed/proxy/httpserver/handler_v1.go +++ b/internal/distributed/proxy/httpserver/handler_v1.go @@ -5,7 +5,6 @@ import ( "net/http" "strconv" - "github.com/cockroachdb/errors" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/golang/protobuf/proto" @@ -25,12 +24,12 @@ func checkAuthorization(c *gin.Context, req interface{}) error { if proxy.Params.CommonCfg.AuthorizationEnabled.GetAsBool() { username, ok := c.Get(ContextUsername) if !ok { - c.JSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) + c.JSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) return merr.ErrNeedAuthenticate } _, authErr := proxy.PrivilegeInterceptorWithUsername(c, username.(string), req) if authErr != nil { - c.JSON(http.StatusForbidden, gin.H{HTTPReturnCode: Code(authErr), HTTPReturnMessage: authErr.Error()}) + c.JSON(http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()}) return authErr } } @@ -42,11 +41,11 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool { return true } response, err := h.proxy.ListDatabases(c, &milvuspb.ListDatabasesRequest{}) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - return false - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) return false } for _, db := range response.DbNames { @@ -54,7 +53,7 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool { return true } } - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrDatabaseNotFound), HTTPReturnMessage: merr.ErrDatabaseNotFound.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrDatabaseNotFound), HTTPReturnMessage: merr.ErrDatabaseNotFound.Error()}) return false } @@ -69,12 +68,12 @@ func (h *Handlers) describeCollection(c *gin.Context, dbName string, collectionN } } response, err := h.proxy.DescribeCollection(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) return nil, err - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) - return nil, errors.New(response.GetStatus().GetReason()) } primaryField, ok := getPrimaryField(response.Schema) if ok && primaryField.AutoID && !response.Schema.AutoID { @@ -90,15 +89,14 @@ func (h *Handlers) hasCollection(c *gin.Context, dbName string, collectionName s CollectionName: collectionName, } response, err := h.proxy.HasCollection(c, &req) - if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - return false, err - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) - return false, errors.New(response.GetStatus().GetReason()) - } else { - return response.Value, nil + if err == nil { + err = merr.Error(response.GetStatus()) } + if err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) + return false, err + } + return response.Value, nil } func (h *Handlers) RegisterRoutesToV1(router gin.IRouter) { @@ -125,19 +123,20 @@ func (h *Handlers) listCollections(c *gin.Context) { return } response, err := h.proxy.ShowCollections(c, &req) - if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) - } else { - var collections []string - if response.CollectionNames != nil { - collections = response.CollectionNames - } else { - collections = []string{} - } - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections}) + if err == nil { + err = merr.Error(response.GetStatus()) } + if err != nil { + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) + return + } + var collections []string + if response.CollectionNames != nil { + collections = response.CollectionNames + } else { + collections = []string{} + } + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections}) } func (h *Handlers) createCollection(c *gin.Context) { @@ -149,12 +148,12 @@ func (h *Handlers) createCollection(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of create collection is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" || httpReq.Dimension == 0 { log.Warn("high level restful api, create collection require parameters: [collectionName, dimension], but miss", zap.Any("request", httpReq)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } schema, err := proto.Marshal(&schemapb.CollectionSchema{ @@ -186,7 +185,7 @@ func (h *Handlers) createCollection(c *gin.Context) { }) if err != nil { log.Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMarshalCollectionSchema), HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMarshalCollectionSchema), HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error()}) return } req := milvuspb.CreateCollectionRequest{ @@ -203,11 +202,11 @@ func (h *Handlers) createCollection(c *gin.Context) { return } response, err := h.proxy.CreateCollection(c, &req) + if err == nil { + err = merr.Error(response) + } if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - return - } else if response.ErrorCode != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.ErrorCode), HTTPReturnMessage: response.Reason}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) return } @@ -218,22 +217,22 @@ func (h *Handlers) createCollection(c *gin.Context) { IndexName: DefaultIndexName, ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: httpReq.MetricType}}, }) + if err == nil { + err = merr.Error(response) + } if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - return - } else if response.ErrorCode != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.ErrorCode), HTTPReturnMessage: response.Reason}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) return } response, err = h.proxy.LoadCollection(c, &milvuspb.LoadCollectionRequest{ DbName: httpReq.DbName, CollectionName: httpReq.CollectionName, }) + if err == nil { + err = merr.Error(response) + } if err != nil { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - return - } else if response.ErrorCode != commonpb.ErrorCode_Success { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.ErrorCode), HTTPReturnMessage: response.Reason}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) return } c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}}) @@ -243,7 +242,7 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) { collectionName := c.Query(HTTPCollectionName) if collectionName == "" { log.Warn("high level restful api, desc collection require parameter: [collectionName], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } dbName := c.DefaultQuery(HTTPDbName, DefaultDbName) @@ -254,15 +253,19 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) { if err != nil { return } - stateResp, stateErr := h.proxy.GetLoadState(c, &milvuspb.GetLoadStateRequest{ + stateResp, err := h.proxy.GetLoadState(c, &milvuspb.GetLoadStateRequest{ DbName: dbName, CollectionName: collectionName, }) collLoadState := "" - if stateErr != nil { - log.Warn("get collection load state fail", zap.Error(stateErr)) - } else if stateResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("get collection load state fail", zap.String("collection", collectionName), zap.String("err", stateResp.GetStatus().GetReason())) + if err == nil { + err = merr.Error(stateResp.GetStatus()) + } + if err != nil { + log.Warn("get collection load state fail", + zap.String("collection", collectionName), + zap.Error(err), + ) } else { collLoadState = stateResp.State.String() } @@ -273,18 +276,22 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) { break } } - indexResp, indexErr := h.proxy.DescribeIndex(c, &milvuspb.DescribeIndexRequest{ + indexResp, err := h.proxy.DescribeIndex(c, &milvuspb.DescribeIndexRequest{ DbName: dbName, CollectionName: collectionName, FieldName: vectorField, }) + if err == nil { + err = merr.Error(indexResp.GetStatus()) + } var indexDesc []gin.H - if indexErr != nil { + if err != nil { indexDesc = []gin.H{} - log.Warn("get indexes description fail", zap.Error(indexErr)) - } else if indexResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - indexDesc = []gin.H{} - log.Warn("get indexes description fail", zap.String("collection", collectionName), zap.String("vectorField", vectorField), zap.String("err", indexResp.GetStatus().GetReason())) + log.Warn("get indexes description fail", + zap.String("collection", collectionName), + zap.String("vectorField", vectorField), + zap.Error(err), + ) } else { indexDesc = printIndexes(indexResp.IndexDescriptions) } @@ -305,12 +312,12 @@ func (h *Handlers) dropCollection(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of drop collection is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" { log.Warn("high level restful api, drop collection require parameter: [collectionName], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } req := milvuspb.DropCollectionRequest{ @@ -328,14 +335,15 @@ func (h *Handlers) dropCollection(c *gin.Context) { return } if !has { - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrCollectionNotFound), HTTPReturnMessage: merr.ErrCollectionNotFound.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrCollectionNotFound), HTTPReturnMessage: merr.ErrCollectionNotFound.Error()}) return } response, err := h.proxy.DropCollection(c, &req) + if err == nil { + err = merr.Error(response) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.ErrorCode != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.ErrorCode), HTTPReturnMessage: response.Reason}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}}) } @@ -349,12 +357,12 @@ func (h *Handlers) query(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of query is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" || httpReq.Filter == "" { log.Warn("high level restful api, query require parameter: [collectionName, filter], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } req := milvuspb.QueryRequest{ @@ -378,15 +386,16 @@ func (h *Handlers) query(c *gin.Context) { return } response, err := h.proxy.Query(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil) if err != nil { log.Warn("high level restful api, fail to deal with query result", zap.Any("response", response), zap.Error(err)) - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData}) } @@ -400,12 +409,12 @@ func (h *Handlers) get(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of get is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" || httpReq.ID == nil { log.Warn("high level restful api, get require parameter: [collectionName, id], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } req := milvuspb.QueryRequest{ @@ -427,20 +436,21 @@ func (h *Handlers) get(c *gin.Context) { body, _ := c.Get(gin.BodyBytesKey) filter, err := checkGetPrimaryKey(coll.Schema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName)) if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) return } req.Expr = filter response, err := h.proxy.Query(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil) if err != nil { log.Warn("high level restful api, fail to deal with get result", zap.Any("response", response), zap.Error(err)) - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData}) log.Error("get resultIS: ", zap.Any("res", outputData)) @@ -454,12 +464,12 @@ func (h *Handlers) delete(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of delete is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" || httpReq.ID == nil { log.Warn("high level restful api, delete require parameter: [collectionName, id], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } req := milvuspb.DeleteRequest{ @@ -479,15 +489,16 @@ func (h *Handlers) delete(c *gin.Context) { body, _ := c.Get(gin.BodyBytesKey) filter, err := checkGetPrimaryKey(coll.Schema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName)) if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) return } req.Expr = filter response, err := h.proxy.Delete(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}}) } @@ -503,7 +514,7 @@ func (h *Handlers) insert(c *gin.Context) { } if err = c.ShouldBindBodyWith(&singleInsertReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of insert is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } httpReq.DbName = singleInsertReq.DbName @@ -512,7 +523,7 @@ func (h *Handlers) insert(c *gin.Context) { } if httpReq.CollectionName == "" || httpReq.Data == nil { log.Warn("high level restful api, insert require parameter: [collectionName, data], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } req := milvuspb.InsertRequest{ @@ -535,20 +546,21 @@ func (h *Handlers) insert(c *gin.Context) { err = checkAndSetData(string(body.([]byte)), coll, &httpReq) if err != nil { log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error()}) return } req.FieldsData, err = anyToColumns(httpReq.Data, coll.Schema) if err != nil { log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData), HTTPReturnMessage: merr.ErrInvalidInsertData.Error()}) return } response, err := h.proxy.Insert(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { switch response.IDs.GetIdField().(type) { case *schemapb.IDs_IntId: @@ -556,7 +568,7 @@ func (h *Handlers) insert(c *gin.Context) { case *schemapb.IDs_StrId: c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": response.InsertCnt, "insertIds": response.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}}) default: - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey), HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error()}) } } } @@ -568,12 +580,12 @@ func (h *Handlers) search(c *gin.Context) { } if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil { log.Warn("high level restful api, the parameter of search is incorrect", zap.Any("request", httpReq), zap.Error(err)) - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat), HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error()}) return } if httpReq.CollectionName == "" || httpReq.Vector == nil { log.Warn("high level restful api, search require parameter: [collectionName, vector], but miss") - c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) + c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters), HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error()}) return } params := map[string]interface{}{ // auto generated mapping @@ -604,10 +616,11 @@ func (h *Handlers) search(c *gin.Context) { return } response, err := h.proxy.Search(c, &req) + if err == nil { + err = merr.Error(response.GetStatus()) + } if err != nil { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.GetStatus().GetErrorCode()), HTTPReturnMessage: response.GetStatus().GetReason()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()}) } else { if response.Results.TopK == int64(0) { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []interface{}{}}) @@ -615,7 +628,7 @@ func (h *Handlers) search(c *gin.Context) { outputData, err := buildQueryResp(response.Results.TopK, response.Results.OutputFields, response.Results.FieldsData, response.Results.Ids, response.Results.Scores) if err != nil { log.Warn("high level restful api, fail to deal with search result", zap.Any("result", response.Results), zap.Error(err)) - c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) + c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult), HTTPReturnMessage: merr.ErrInvalidSearchResult.Error()}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData}) } diff --git a/internal/distributed/proxy/httpserver/handler_v1_test.go b/internal/distributed/proxy/httpserver/handler_v1_test.go index 1f09b36007..8f29d6b3bb 100644 --- a/internal/distributed/proxy/httpserver/handler_v1_test.go +++ b/internal/distributed/proxy/httpserver/handler_v1_test.go @@ -105,9 +105,9 @@ func genAuthMiddleWare(needAuth bool) gin.HandlerFunc { return func(c *gin.Context) { username, password, ok := ParseUsernamePassword(c) if !ok { - c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) } else if username == util.UserRoot && password != util.DefaultRootPassword { - c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) } else { c.Set(ContextUsername, username) } @@ -123,7 +123,7 @@ func Print(code int32, message string) string { } func PrintErr(err error) string { - return Print(Code(err), err.Error()) + return Print(merr.Code(err), err.Error()) } func TestVectorAuthenticate(t *testing.T) { @@ -185,19 +185,16 @@ func TestVectorListCollection(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := "cannot create folder" mp1 := mocks.NewMockProxy(t) + err := merr.WrapErrIoFailedReason("cannot create folder") mp1.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CannotCreateFolder, - Reason: reason, - }, + Status: merr.Status(err), }, nil).Once() testCases = append(testCases, testCase{ name: "show collections fail", mp: mp1, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CannotCreateFolder), reason), + expectedBody: PrintErr(err), }) mp := mocks.NewMockProxy(t) @@ -303,17 +300,14 @@ func TestVectorCreateCollection(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := "collection " + DefaultCollectionName + " already exists" + err := merr.WrapErrCollectionResourceLimitExceeded() mp2 := mocks.NewMockProxy(t) - mp2.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(&commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CannotCreateFile, // 18 - Reason: reason, - }, nil).Once() + mp2.EXPECT().CreateCollection(mock.Anything, mock.Anything).Return(merr.Status(err), nil).Once() testCases = append(testCases, testCase{ name: "create collection fail", mp: mp2, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CannotCreateFile), reason), + expectedBody: PrintErr(err), }) mp3 := mocks.NewMockProxy(t) @@ -380,18 +374,15 @@ func TestVectorDropCollection(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := "cannot find collection " + DefaultCollectionName + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) mp2 := mocks.NewMockProxy(t) mp2, _ = wrapWithHasCollection(t, mp2, ReturnTrue, 1, nil) - mp2.EXPECT().DropCollection(mock.Anything, mock.Anything).Return(&commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNotExists, // 4 - Reason: reason, - }, nil).Once() + mp2.EXPECT().DropCollection(mock.Anything, mock.Anything).Return(merr.Status(err), nil).Once() testCases = append(testCases, testCase{ name: "drop collection fail", mp: mp2, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CollectionNotExists), reason), + expectedBody: PrintErr(err), }) mp3 := mocks.NewMockProxy(t) @@ -433,20 +424,17 @@ func TestQuery(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := DefaultCollectionName + " name not found" + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) mp3 := mocks.NewMockProxy(t) mp3, _ = wrapWithDescribeColl(t, mp3, ReturnSuccess, 1, nil) mp3.EXPECT().Query(mock.Anything, mock.Anything).Return(&milvuspb.QueryResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28 - Reason: reason, - }, + Status: merr.Status(err), }, nil).Twice() testCases = append(testCases, testCase{ name: "query fail", mp: mp3, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CollectionNameNotFound), reason), + expectedBody: PrintErr(err), }) mp4 := mocks.NewMockProxy(t) @@ -519,20 +507,17 @@ func TestDelete(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := DefaultCollectionName + " name not found" + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) mp3 := mocks.NewMockProxy(t) mp3, _ = wrapWithDescribeColl(t, mp3, ReturnSuccess, 1, nil) mp3.EXPECT().Delete(mock.Anything, mock.Anything).Return(&milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28 - Reason: reason, - }, + Status: merr.Status(err), }, nil).Once() testCases = append(testCases, testCase{ name: "delete fail", mp: mp3, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CollectionNameNotFound), reason), + expectedBody: PrintErr(err), }) mp4 := mocks.NewMockProxy(t) @@ -581,20 +566,17 @@ func TestInsert(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := DefaultCollectionName + " name not found" + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) mp3 := mocks.NewMockProxy(t) mp3, _ = wrapWithDescribeColl(t, mp3, ReturnSuccess, 1, nil) mp3.EXPECT().Insert(mock.Anything, mock.Anything).Return(&milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28 - Reason: reason, - }, + Status: merr.Status(err), }, nil).Once() testCases = append(testCases, testCase{ name: "insert fail", mp: mp3, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CollectionNameNotFound), reason), + expectedBody: PrintErr(err), }) mp4 := mocks.NewMockProxy(t) @@ -775,19 +757,16 @@ func TestSearch(t *testing.T) { expectedBody: PrintErr(ErrDefault), }) - reason := DefaultCollectionName + " name not found" + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) mp3 := mocks.NewMockProxy(t) mp3.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNameNotFound, // 28 - Reason: reason, - }, + Status: merr.Status(err), }, nil).Once() testCases = append(testCases, testCase{ name: "search fail", mp: mp3, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_CollectionNameNotFound), reason), + expectedBody: PrintErr(err), }) mp4 := mocks.NewMockProxy(t) @@ -861,17 +840,15 @@ func wrapWithDescribeColl(t *testing.T, mp *mocks.MockProxy, returnType ReturnTy expectedBody: PrintErr(ErrDefault), } case ReturnWrongStatus: + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) call = mp.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CollectionNotExists, - Reason: "can't find collection: " + DefaultCollectionName, - }, + Status: merr.Status(err), }, nil) testcase = testCase{ name: "[share] collection not found", mp: mp, exceptCode: 200, - expectedBody: "{\"code\":4,\"message\":\"can't find collection: " + DefaultCollectionName + "\"}", + expectedBody: PrintErr(err), } } if times == 2 { @@ -917,18 +894,15 @@ func wrapWithHasCollection(t *testing.T, mp *mocks.MockProxy, returnType ReturnT expectedBody: PrintErr(ErrDefault), } case ReturnWrongStatus: - reason := "can't find collection: " + DefaultCollectionName + err := merr.WrapErrCollectionNotFound(DefaultCollectionName) call = mp.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(&milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, // 1 - Reason: reason, - }, + Status: merr.Status(err), }, nil) testcase = testCase{ name: "[share] unexpected error", mp: mp, exceptCode: 200, - expectedBody: Print(int32(commonpb.ErrorCode_UnexpectedError), reason), + expectedBody: PrintErr(err), } } if times == 2 { @@ -1001,7 +975,7 @@ func TestHttpRequestFormat(t *testing.T) { func TestAuthorization(t *testing.T) { paramtable.Init() paramtable.Get().Save(proxy.Params.CommonCfg.AuthorizationEnabled.Key, "true") - errorStr := Print(Code(merr.ErrServiceUnavailable), "internal: Milvus Proxy is not ready yet. please wait: service unavailable") + errorStr := Print(merr.Code(merr.ErrServiceUnavailable), "internal: Milvus Proxy is not ready yet. please wait: service unavailable") jsons := map[string][]byte{ errorStr: []byte(`{"collectionName": "` + DefaultCollectionName + `", "vector": [0.1, 0.2], "filter": "id in [2]", "id": [2], "dimension": 2, "data":[{"book_id":1,"book_intro":[0.1,0.11],"distance":0.01,"word_count":1000},{"book_id":2,"book_intro":[0.2,0.22],"distance":0.04,"word_count":2000},{"book_id":3,"book_intro":[0.3,0.33],"distance":0.09,"word_count":3000}]}`), } @@ -1130,11 +1104,9 @@ func TestDatabaseNotFound(t *testing.T) { t.Run("list database without success code", func(t *testing.T) { mp := mocks.NewMockProxy(t) + err := errors.New("unexpected error") mp.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(&milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - }, + Status: merr.Status(err), }, nil).Once() testEngine := initHTTPServer(mp, true) req := httptest.NewRequest(http.MethodGet, URIPrefix+VectorCollectionsPath+"?dbName=test", nil) @@ -1142,7 +1114,7 @@ func TestDatabaseNotFound(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, w.Code, http.StatusOK) - assert.Equal(t, w.Body.String(), Print(int32(commonpb.ErrorCode_UnexpectedError), "")) + assert.Equal(t, w.Body.String(), PrintErr(err)) }) t.Run("list database success", func(t *testing.T) { @@ -1366,7 +1338,7 @@ func Test_Handles_VectorCollectionsDescribe(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, w.Code, http.StatusForbidden) - assert.Equal(t, w.Body.String(), Print(Code(merr.ErrServiceUnavailable), "internal: Milvus Proxy is not ready yet. please wait: service unavailable")) + assert.Equal(t, w.Body.String(), Print(merr.Code(merr.ErrServiceUnavailable), "internal: Milvus Proxy is not ready yet. please wait: service unavailable")) }) t.Run("describe collection fail with error", func(t *testing.T) { @@ -1384,11 +1356,12 @@ func Test_Handles_VectorCollectionsDescribe(t *testing.T) { }) t.Run("describe collection fail with status code", func(t *testing.T) { + err := merr.WrapErrDatabaseNotFound(DefaultDbName) paramtable.Get().Save(proxy.Params.CommonCfg.AuthorizationEnabled.Key, "false") mp.EXPECT(). DescribeCollection(mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, + Status: merr.Status(err), }, nil). Once() req := httptest.NewRequest(http.MethodGet, "/vector/collections/describe?collectionName=book", nil) @@ -1396,7 +1369,7 @@ func Test_Handles_VectorCollectionsDescribe(t *testing.T) { w := httptest.NewRecorder() testEngine.ServeHTTP(w, req) assert.Equal(t, w.Code, http.StatusOK) - assert.Equal(t, w.Body.String(), "{\"code\":1,\"message\":\"\"}") + assert.Equal(t, w.Body.String(), PrintErr(err)) }) t.Run("get load state and describe index fail with error", func(t *testing.T) { diff --git a/internal/distributed/proxy/httpserver/utils.go b/internal/distributed/proxy/httpserver/utils.go index e7ca9f89d0..21ed8f053e 100644 --- a/internal/distributed/proxy/httpserver/utils.go +++ b/internal/distributed/proxy/httpserver/utils.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/parameterutil.go" ) @@ -856,9 +855,3 @@ func buildQueryResp(rowsNum int64, needFields []string, fieldDataList []*schemap return queryResp, nil } - -// --------------------- error code --------------------- // - -func Code(err error) int32 { - return merr.Code(err) -} diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 671600c46e..50b543e1eb 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -127,7 +127,7 @@ func authenticate(c *gin.Context) { return } } - c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{httpserver.HTTPReturnCode: httpserver.Code(merr.ErrNeedAuthenticate), httpserver.HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{httpserver.HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), httpserver.HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()}) } // registerHTTPServer register the http server, panic when failed diff --git a/internal/proto/planpb/plan.pb.go b/internal/proto/planpb/plan.pb.go index 2edfb4c927..06de0040e9 100644 --- a/internal/proto/planpb/plan.pb.go +++ b/internal/proto/planpb/plan.pb.go @@ -237,7 +237,6 @@ func (BinaryExpr_BinaryOp) EnumDescriptor() ([]byte, []int) { type GenericValue struct { // Types that are valid to be assigned to Val: - // // *GenericValue_BoolVal // *GenericValue_Int64Val // *GenericValue_FloatVal @@ -1298,7 +1297,6 @@ var xxx_messageInfo_AlwaysTrueExpr proto.InternalMessageInfo type Expr struct { // Types that are valid to be assigned to Expr: - // // *Expr_TermExpr // *Expr_UnaryExpr // *Expr_BinaryExpr @@ -1670,7 +1668,6 @@ func (m *QueryPlanNode) GetLimit() int64 { type PlanNode struct { // Types that are valid to be assigned to Node: - // // *PlanNode_VectorAnns // *PlanNode_Predicates // *PlanNode_Query diff --git a/internal/proto/proxypb/proxy.pb.go b/internal/proto/proxypb/proxy.pb.go index c5cf24fd78..60ba4e6a11 100644 --- a/internal/proto/proxypb/proxy.pb.go +++ b/internal/proto/proxypb/proxy.pb.go @@ -29,9 +29,8 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type InvalidateCollMetaCacheRequest struct { // MsgType: - // - // DropCollection -> {meta cache, dml channels} - // Other -> {meta cache} + // DropCollection -> {meta cache, dml channels} + // Other -> {meta cache} Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbName string `protobuf:"bytes,2,opt,name=db_name,json=dbName,proto3" json:"db_name,omitempty"` CollectionName string `protobuf:"bytes,3,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"` diff --git a/internal/proto/rootcoordpb/root_coord.pb.go b/internal/proto/rootcoordpb/root_coord.pb.go index ff74594ed2..7d16956e9b 100644 --- a/internal/proto/rootcoordpb/root_coord.pb.go +++ b/internal/proto/rootcoordpb/root_coord.pb.go @@ -793,28 +793,28 @@ type RootCoordClient interface { GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - // * + //* // @brief This method is used to create collection // // @param CreateCollectionRequest, use to provide collection information to be created. // // @return Status CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to delete collection. // // @param DropCollectionRequest, collection name is going to be deleted. // // @return Status DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to test collection existence. // // @param HasCollectionRequest, collection name is going to be tested. // // @return BoolResponse HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to get collection schema. // // @param DescribeCollectionRequest, target collection name. @@ -825,28 +825,28 @@ type RootCoordClient interface { CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to list all collections. // // @return StringListResponse, collection name list ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowCollectionsResponse, error) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to create partition // // @return Status CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to drop partition // // @return Status DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - // * + //* // @brief This method is used to test partition existence. // // @return BoolResponse HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to show partition information // // @param ShowPartitionRequest, target collection name. @@ -854,7 +854,7 @@ type RootCoordClient interface { // @return StringListResponse ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error) - // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} + // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest, opts ...grpc.CallOption) (*milvuspb.ShowSegmentsResponse, error) AllocTimestamp(ctx context.Context, in *AllocTimestampRequest, opts ...grpc.CallOption) (*AllocTimestampResponse, error) AllocID(ctx context.Context, in *AllocIDRequest, opts ...grpc.CallOption) (*AllocIDResponse, error) @@ -1327,28 +1327,28 @@ type RootCoordServer interface { GetComponentStates(context.Context, *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) - // * + //* // @brief This method is used to create collection // // @param CreateCollectionRequest, use to provide collection information to be created. // // @return Status CreateCollection(context.Context, *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to delete collection. // // @param DropCollectionRequest, collection name is going to be deleted. // // @return Status DropCollection(context.Context, *milvuspb.DropCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to test collection existence. // // @param HasCollectionRequest, collection name is going to be tested. // // @return BoolResponse HasCollection(context.Context, *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to get collection schema. // // @param DescribeCollectionRequest, target collection name. @@ -1359,28 +1359,28 @@ type RootCoordServer interface { CreateAlias(context.Context, *milvuspb.CreateAliasRequest) (*commonpb.Status, error) DropAlias(context.Context, *milvuspb.DropAliasRequest) (*commonpb.Status, error) AlterAlias(context.Context, *milvuspb.AlterAliasRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to list all collections. // // @return StringListResponse, collection name list ShowCollections(context.Context, *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) AlterCollection(context.Context, *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to create partition // // @return Status CreatePartition(context.Context, *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to drop partition // // @return Status DropPartition(context.Context, *milvuspb.DropPartitionRequest) (*commonpb.Status, error) - // * + //* // @brief This method is used to test partition existence. // // @return BoolResponse HasPartition(context.Context, *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) - // * + //* // @brief This method is used to show partition information // // @param ShowPartitionRequest, target collection name. @@ -1388,7 +1388,7 @@ type RootCoordServer interface { // @return StringListResponse ShowPartitions(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) ShowPartitionsInternal(context.Context, *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) - // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} + // rpc DescribeSegment(milvus.DescribeSegmentRequest) returns (milvus.DescribeSegmentResponse) {} ShowSegments(context.Context, *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) AllocTimestamp(context.Context, *AllocTimestampRequest) (*AllocTimestampResponse, error) AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error) diff --git a/internal/proxy/channels_mgr.go b/internal/proxy/channels_mgr.go index 5ab4e58612..3ecdce564c 100644 --- a/internal/proxy/channels_mgr.go +++ b/internal/proxy/channels_mgr.go @@ -23,7 +23,6 @@ import ( "strconv" "sync" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -33,6 +32,7 @@ import ( "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -104,7 +104,7 @@ func getDmlChannelsFunc(ctx context.Context, rc types.RootCoordClient) getChanne log.Error("failed to describe collection", zap.String("error_code", resp.GetStatus().GetErrorCode().String()), zap.String("reason", resp.GetStatus().GetReason())) - return channelInfos{}, errors.New(resp.GetStatus().GetReason()) + return channelInfos{}, merr.Error(resp.GetStatus()) } return newChannels(resp.GetVirtualChannelNames(), resp.GetPhysicalChannelNames()) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index f1a33a643e..bdb6b45bd7 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -443,7 +443,7 @@ func (hct *hasCollectionTask) Execute(ctx context.Context) error { return errors.New("has collection resp is nil") } if hct.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(hct.result.GetStatus().GetReason()) + return merr.Error(hct.result.GetStatus()) } return nil } @@ -657,7 +657,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { } if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(respFromRootCoord.GetStatus().GetReason()) + return merr.Error(respFromRootCoord.GetStatus()) } if sct.GetType() == milvuspb.ShowType_InMemory { @@ -1071,7 +1071,7 @@ func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) { return err } if hpt.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(hpt.result.GetStatus().GetReason()) + return merr.Error(hpt.result.GetStatus()) } return err } @@ -1156,7 +1156,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { } if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(respFromRootCoord.GetStatus().GetReason()) + return merr.Error(respFromRootCoord.GetStatus()) } if spt.GetType() == milvuspb.ShowType_InMemory { @@ -1200,7 +1200,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) + return merr.Error(resp.GetStatus()) } spt.result = &milvuspb.ShowPartitionsResponse{ @@ -1316,7 +1316,7 @@ func (ft *flushTask) Execute(ctx context.Context) error { return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error()) } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) + return merr.Error(resp.GetStatus()) } coll2Segments[collName] = &schemapb.LongArray{Data: resp.GetSegmentIDs()} flushColl2Segments[collName] = &schemapb.LongArray{Data: resp.GetFlushSegmentIDs()} @@ -1432,7 +1432,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { return err } if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(indexResponse.GetStatus().GetReason()) + return merr.Error(indexResponse.GetStatus()) } hasVecIndex := false @@ -1660,7 +1660,7 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { return err } if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(indexResponse.GetStatus().GetReason()) + return merr.Error(indexResponse.GetStatus()) } hasVecIndex := false diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index a1d13b9f6e..cc0229bb07 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -497,7 +497,7 @@ func (dit *describeIndexTask) Execute(ctx context.Context) error { dit.result = &milvuspb.DescribeIndexResponse{} dit.result.Status = resp.GetStatus() if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(dit.result.GetStatus().GetReason()) + return merr.Error(dit.result.GetStatus()) } for _, indexInfo := range resp.IndexInfos { field, err := schemaHelper.GetFieldFromID(indexInfo.FieldID) @@ -617,7 +617,7 @@ func (dit *getIndexStatisticsTask) Execute(ctx context.Context) error { dit.result = &milvuspb.GetIndexStatisticsResponse{} dit.result.Status = resp.GetStatus() if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(dit.result.GetStatus().GetReason()) + return merr.Error(dit.result.GetStatus()) } for _, indexInfo := range resp.IndexInfos { field, err := schemaHelper.GetFieldFromID(indexInfo.FieldID) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index dd5f35ec1f..18d518f1ca 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -177,24 +177,24 @@ func ValidateResourceGroupName(entity string) error { func ValidateDatabaseName(dbName string) error { if dbName == "" { - return merr.WrapErrInvalidedDatabaseName(dbName, "database name couldn't be empty") + return merr.WrapErrDatabaseNameInvalid(dbName, "database name couldn't be empty") } if len(dbName) > Params.ProxyCfg.MaxNameLength.GetAsInt() { - return merr.WrapErrInvalidedDatabaseName(dbName, + return merr.WrapErrDatabaseNameInvalid(dbName, fmt.Sprintf("the length of a database name must be less than %d characters", Params.ProxyCfg.MaxNameLength.GetAsInt())) } firstChar := dbName[0] if firstChar != '_' && !isAlpha(firstChar) { - return merr.WrapErrInvalidedDatabaseName(dbName, + return merr.WrapErrDatabaseNameInvalid(dbName, "the first character of a database name must be an underscore or letter") } for i := 1; i < len(dbName); i++ { c := dbName[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - return merr.WrapErrInvalidedDatabaseName(dbName, + return merr.WrapErrDatabaseNameInvalid(dbName, "database name can only contain numbers, letters and underscores") } } @@ -1053,7 +1053,7 @@ func isCollectionLoaded(ctx context.Context, qc types.QueryCoordClient, collID i return false, err } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return false, errors.New(resp.GetStatus().GetReason()) + return false, merr.Error(resp.GetStatus()) } for _, loadedCollID := range resp.GetCollectionIDs() { @@ -1074,7 +1074,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID in return false, err } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return false, errors.New(resp.GetStatus().GetReason()) + return false, merr.Error(resp.GetStatus()) } for _, loadedPartID := range resp.GetPartitionIDs() { diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index da10650317..5153f46bd9 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -130,7 +129,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection } if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - err = errors.New(recoveryInfo.GetStatus().GetReason()) + err = merr.Error(recoveryInfo.GetStatus()) log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) return nil, nil, err } @@ -156,7 +155,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti } if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - err = errors.New(recoveryInfo.GetStatus().GetReason()) + err = merr.Error(recoveryInfo.GetStatus()) log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) return nil, nil, err } diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index b7d4b6ce40..fd97429e10 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -202,7 +203,7 @@ func (b *ServerBroker) Flush(ctx context.Context, cID int64, segIDs []int64) err return errors.New("failed to call flush to data coordinator: " + err.Error()) } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) + return merr.Error(resp.GetStatus()) } log.Info("flush on collection succeed", zap.Int64("collectionID", cID)) return nil @@ -251,7 +252,7 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID return nil, err } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return nil, errors.New(resp.GetStatus().GetReason()) + return nil, merr.Error(resp.GetStatus()) } return resp.GetStates(), nil diff --git a/internal/rootcoord/proxy_client_manager.go b/internal/rootcoord/proxy_client_manager.go index 4e3bee85d4..3b5af86fc5 100644 --- a/internal/rootcoord/proxy_client_manager.go +++ b/internal/rootcoord/proxy_client_manager.go @@ -21,7 +21,6 @@ import ( "fmt" "sync" - "github.com/cockroachdb/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -33,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" ) @@ -239,7 +239,7 @@ func (p *proxyClientManager) RefreshPolicyInfoCache(ctx context.Context, req *pr return fmt.Errorf("RefreshPolicyInfoCache failed, proxyID = %d, err = %s", k, err) } if status.GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(status.GetReason()) + return merr.Error(status) } return nil }) diff --git a/internal/util/componentutil/componentutil.go b/internal/util/componentutil/componentutil.go index 20295b1801..96b74732d1 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -21,11 +21,11 @@ import ( "fmt" "time" - "github.com/cockroachdb/errors" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" ) @@ -40,7 +40,7 @@ func WaitForComponentStates[T interface { } if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) + return merr.Error(resp.GetStatus()) } meet := false diff --git a/pkg/util/funcutil/verify_response.go b/pkg/util/funcutil/verify_response.go deleted file mode 100644 index d23cfd644c..0000000000 --- a/pkg/util/funcutil/verify_response.go +++ /dev/null @@ -1,49 +0,0 @@ -package funcutil - -import ( - "github.com/cockroachdb/errors" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -) - -// errors for VerifyResponse -var errNilResponse = errors.New("response is nil") - -var errNilStatusResponse = errors.New("response has nil status") - -var errUnknownResponseType = errors.New("unknown response type") - -// Response response interface for verification -type Response interface { - GetStatus() *commonpb.Status -} - -// VerifyResponse verify grpc Response 1. check error is nil 2. check response.GetStatus() with status success -func VerifyResponse(response interface{}, err error) error { - if err != nil { - return err - } - if response == nil { - return errNilResponse - } - switch resp := response.(type) { - case Response: - // note that resp will not be nil here, since it's still an interface - if resp.GetStatus() == nil { - return errNilStatusResponse - } - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) - } - case *commonpb.Status: - if resp == nil { - return errNilResponse - } - if resp.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(resp.GetReason()) - } - default: - return errUnknownResponseType - } - return nil -} diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 6da59299ca..a22155e5c2 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -78,6 +78,23 @@ func Status(err error) *commonpb.Status { } } +func CheckRpcCall(resp any, err error) error { + if err != nil { + return err + } + if resp == nil { + return errUnexpected + } + switch resp := resp.(type) { + case interface{ GetStatus() *commonpb.Status }: + return Error(resp.GetStatus()) + case *commonpb.Status: + return Error(resp) + } + return nil +} + +// Deprecated func StatusWithErrorCode(err error, code commonpb.ErrorCode) *commonpb.Status { if err == nil { return &commonpb.Status{} @@ -289,7 +306,7 @@ func WrapErrDatabaseResourceLimitExceeded(msg ...string) error { return err } -func WrapErrInvalidedDatabaseName(database any, msg ...string) error { +func WrapErrDatabaseNameInvalid(database any, msg ...string) error { err := wrapWithField(ErrDatabaseInvalidName, "database", database) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) diff --git a/tests/python_client/README.md b/tests/python_client/README.md index f1a583f9d7..98424e61b0 100644 --- a/tests/python_client/README.md +++ b/tests/python_client/README.md @@ -227,7 +227,7 @@ assert self.partition_wrap.is_empty # drop collection collection_w.drop() # create partition failed - self.partition_wrap.init_partition(collection_w.collection, partition_name, check_task=CheckTasks.err_res, check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + self.partition_wrap.init_partition(collection_w.collection, partition_name, check_task=CheckTasks.err_res, check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) ``` - Tips diff --git a/tests/python_client/testcases/test_collection.py b/tests/python_client/testcases/test_collection.py index 5ed572a60c..b4976c360c 100644 --- a/tests/python_client/testcases/test_collection.py +++ b/tests/python_client/testcases/test_collection.py @@ -1068,7 +1068,7 @@ class TestCollectionOperation(TestcaseBase): check_items={exp_name: c_name, exp_schema: default_schema}) self.collection_wrap.drop() assert not self.utility_wrap.has_collection(c_name)[0] - error = {ct.err_code: 1, ct.err_msg: f'HasPartition failed: collection not found: {c_name}'} + error = {ct.err_code: 4, ct.err_msg: 'collection not found'} collection_w.has_partition("p", check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) @@ -3030,7 +3030,7 @@ class TestLoadCollection(TestcaseBase): collection_w = self.init_collection_general(prefix, True, is_index=False)[0] collection_w.load(check_task=CheckTasks.err_res, check_items={"err_code": 1, - "err_msg": "index not exist"}) + "err_msg": "index not found"}) class TestDescribeCollection(TestcaseBase): diff --git a/tests/python_client/testcases/test_partition.py b/tests/python_client/testcases/test_partition.py index 7e408568a3..3f4acc3e25 100644 --- a/tests/python_client/testcases/test_partition.py +++ b/tests/python_client/testcases/test_partition.py @@ -595,7 +595,7 @@ class TestPartitionOperations(TestcaseBase): # create partition failed self.partition_wrap.init_partition(collection_w.collection, cf.gen_unique_str(prefix), check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) @pytest.mark.tags(CaseLabel.L2) def test_partition_same_name_in_diff_collections(self): @@ -849,7 +849,7 @@ class TestPartitionOperations(TestcaseBase): # release the partition and check err response partition_w.release(check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) @pytest.mark.tags(CaseLabel.L1) def test_partition_release_after_collection_released(self): @@ -963,7 +963,7 @@ class TestPartitionOperations(TestcaseBase): # insert data to partition partition_w.insert(cf.gen_default_dataframe_data(), check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "None Type"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) @pytest.mark.tags(CaseLabel.L2) def test_partition_insert_maximum_size_data(self): diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index 1a183716d2..6fb6746df7 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -809,7 +809,7 @@ class TestUtilityBase(TestcaseBase): self.utility_wrap.index_building_progress( c_name, check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) @pytest.mark.tags(CaseLabel.L1) def test_index_process_collection_empty(self): @@ -898,7 +898,7 @@ class TestUtilityBase(TestcaseBase): self.utility_wrap.wait_for_index_building_complete( c_name, check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) @pytest.mark.tags(CaseLabel.L1) def test_wait_index_collection_empty(self): @@ -4721,7 +4721,7 @@ class TestUtilityNegativeRbac(TestcaseBase): # collection flush with db_b permission self.database_wrap.using_database(db_b) collection_w.flush(check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "can't find collection"}) + check_items={ct.err_code: 4, ct.err_msg: "collection not found"}) self.database_wrap.using_database(db_a) collection_w.flush(check_task=CheckTasks.check_permission_deny)