diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index c5a7552542..514c5ca189 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -353,7 +353,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { zap.Error(err)) return indexTaskInProgress } - if response.Status.ErrorCode != commonpb.ErrorCode_Success { + if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID), zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason)) return indexTaskInProgress diff --git a/internal/datacoord/indexnode_manager.go b/internal/datacoord/indexnode_manager.go index ee676c975d..0b81f22673 100644 --- a/internal/datacoord/indexnode_manager.go +++ b/internal/datacoord/indexnode_manager.go @@ -123,7 +123,7 @@ func (nm *IndexNodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, type log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err)) return } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.String("reason", resp.Status.Reason)) return @@ -179,7 +179,7 @@ func (nm *IndexNodeManager) ClientSupportDisk() bool { log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err)) return } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.String("reason", resp.Status.Reason)) return diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c5716a96ad..c4f381edaa 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -512,7 +512,7 @@ func (node *DataNode) Start() error { ), Count: 1, }) - if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success { + if err != nil || rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err)) startErr = errors.New("DataNode fail to alloc timestamp") return diff --git a/internal/datanode/services.go b/internal/datanode/services.go index c70eb207ef..5b115bdb19 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -466,7 +466,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) Count: 1, }) - if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil { + if rep.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success || err != nil { return returnFailFunc("DataNode alloc ts failed", err) } @@ -539,7 +539,7 @@ func (node *DataNode) getPartitions(ctx context.Context, dbName string, collecti log.Warn("failed to get partitions of collection", logFields...) return nil, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("failed to get partitions of collection", logFields...) return nil, errors.New(resp.Status.Reason) } @@ -683,7 +683,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil if err != nil { return 0, "", fmt.Errorf("syncSegmentID Failed:%w", err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return 0, "", fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) } if len(resp.SegIDAssignments) == 0 || resp.SegIDAssignments[0] == nil { diff --git a/internal/distributed/proxy/httpserver/handler_v1.go b/internal/distributed/proxy/httpserver/handler_v1.go index 9ae787ac2b..f7c632178f 100644 --- a/internal/distributed/proxy/httpserver/handler_v1.go +++ b/internal/distributed/proxy/httpserver/handler_v1.go @@ -46,7 +46,7 @@ func (h *Handlers) checkDatabase(c *gin.Context, dbName string) bool { if err != nil { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) return false - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) return false } @@ -73,7 +73,7 @@ func (h *Handlers) describeCollection(c *gin.Context, dbName string, collectionN if err != nil { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) return nil, err - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) return nil, errors.New(response.Status.Reason) } @@ -94,7 +94,7 @@ func (h *Handlers) hasCollection(c *gin.Context, dbName string, collectionName s if err != nil { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) return false, err - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) return false, errors.New(response.Status.Reason) } else { @@ -128,7 +128,7 @@ func (h *Handlers) listCollections(c *gin.Context) { response, err := h.proxy.ShowCollections(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { var collections []string @@ -262,7 +262,7 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) { collLoadState := "" if stateErr != nil { log.Warn("get collection load state fail", zap.String("collection", collectionName), zap.String("err", stateErr.Error())) - } else if stateResp.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if stateResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("get collection load state fail", zap.String("collection", collectionName), zap.String("err", stateResp.Status.Reason)) } else { collLoadState = stateResp.State.String() @@ -283,7 +283,7 @@ func (h *Handlers) getCollectionDetails(c *gin.Context) { if indexErr != nil { indexDesc = []gin.H{} log.Warn("get indexes description fail", zap.String("collection", collectionName), zap.String("vectorField", vectorField), zap.String("err", indexErr.Error())) - } else if indexResp.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if indexResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { indexDesc = []gin.H{} log.Warn("get indexes description fail", zap.String("collection", collectionName), zap.String("vectorField", vectorField), zap.String("err", indexResp.Status.Reason)) } else { @@ -381,7 +381,7 @@ func (h *Handlers) query(c *gin.Context) { response, err := h.proxy.Query(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil) @@ -435,7 +435,7 @@ func (h *Handlers) get(c *gin.Context) { response, err := h.proxy.Query(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { outputData, err := buildQueryResp(int64(0), response.OutputFields, response.FieldsData, nil, nil) @@ -487,7 +487,7 @@ func (h *Handlers) delete(c *gin.Context) { response, err := h.proxy.Delete(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}}) @@ -548,7 +548,7 @@ func (h *Handlers) insert(c *gin.Context) { response, err := h.proxy.Insert(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { switch response.IDs.GetIdField().(type) { @@ -607,7 +607,7 @@ func (h *Handlers) search(c *gin.Context) { response, err := h.proxy.Search(c, &req) if err != nil { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: Code(err), HTTPReturnMessage: err.Error()}) - } else if response.Status.ErrorCode != commonpb.ErrorCode_Success { + } else if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { c.JSON(http.StatusOK, gin.H{HTTPReturnCode: int32(response.Status.ErrorCode), HTTPReturnMessage: response.Status.Reason}) } else { if response.Results.TopK == int64(0) { diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 88b7255bcc..351d46a681 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -959,7 +959,7 @@ func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckReque if err != nil { return ret, err } - if state.Status.ErrorCode != commonpb.ErrorCode_Success { + if state.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return ret, nil } if state.State.StateCode != commonpb.StateCode_Healthy { @@ -978,7 +978,7 @@ func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt if err != nil { return server.Send(ret) } - if state.Status.ErrorCode != commonpb.ErrorCode_Success { + if state.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return server.Send(ret) } if state.State.StateCode != commonpb.StateCode_Healthy { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 835c075a84..e08727645b 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2184,7 +2184,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) return constructFailedResponse(err), nil } - if it.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { setErrorIndex := func() { numRows := request.NumRows errIndex := make([]uint32, numRows) @@ -2379,13 +2379,13 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) metrics.FailLabel).Inc() // Not every error case changes the status internally // change status there to handle it - if it.result.Status.ErrorCode == commonpb.ErrorCode_Success { + if it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError } return constructFailedResponse(err, it.result.Status.ErrorCode), nil } - if it.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { setErrorIndex := func() { numRows := request.NumRows errIndex := make([]uint32, numRows) @@ -3110,7 +3110,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G log.Debug("GetPersistentSegmentInfo", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status)) - if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { + if infoResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() resp.Status.Reason = infoResp.Status.Reason @@ -3185,7 +3185,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue log.Debug("GetQuerySegmentInfo", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status)) - if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { + if infoResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason)) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 6e387b7e87..12f6f3b37e 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -621,7 +621,7 @@ func (m *MetaCache) showPartitions(ctx context.Context, dbName string, collectio if err != nil { return nil, err } - if partitions.Status.ErrorCode != commonpb.ErrorCode_Success { + if partitions.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, fmt.Errorf("%s", partitions.Status.Reason) } @@ -808,11 +808,11 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col if err != nil { return retry.Unrecoverable(err) } - if resp.Status.ErrorCode == commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { return nil } // do not retry unless got NoReplicaAvailable from querycoord - if resp.Status.ErrorCode != commonpb.ErrorCode_NoReplicaAvailable { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_NoReplicaAvailable { return retry.Unrecoverable(fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)) } return fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason) @@ -820,7 +820,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col if err != nil { return nil, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason) } diff --git a/internal/proxy/segment.go b/internal/proxy/segment.go index cc2458f6c9..a4f4fff9f3 100644 --- a/internal/proxy/segment.go +++ b/internal/proxy/segment.go @@ -324,7 +324,7 @@ func (sa *segIDAssigner) syncSegments() (bool, error) { return false, fmt.Errorf("syncSegmentID Failed:%w", err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return false, fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 2e291e7174..705fe67628 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -434,7 +434,7 @@ func (hct *hasCollectionTask) Execute(ctx context.Context) error { if hct.result == nil { return errors.New("has collection resp is nil") } - if hct.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if hct.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(hct.result.Status.Reason) } return nil @@ -522,7 +522,7 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error { return err } - if result.Status.ErrorCode != commonpb.ErrorCode_Success { + if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { dct.result.Status = result.Status // compatibility with PyMilvus existing implementation @@ -645,7 +645,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { return errors.New("failed to show collections") } - if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success { + if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(respFromRootCoord.Status.Reason) } @@ -683,7 +683,7 @@ func (sct *showCollectionsTask) Execute(ctx context.Context) error { return errors.New("failed to show collections") } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { // update collectionID to collection name, and return new error info to sdk newErrorReason := resp.Status.Reason for _, collectionID := range collectionIDs { @@ -1059,7 +1059,7 @@ func (hpt *hasPartitionTask) Execute(ctx context.Context) (err error) { if err != nil { return err } - if hpt.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if hpt.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(hpt.result.Status.Reason) } return err @@ -1144,7 +1144,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { return errors.New("failed to show partitions") } - if respFromRootCoord.Status.ErrorCode != commonpb.ErrorCode_Success { + if respFromRootCoord.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(respFromRootCoord.Status.Reason) } @@ -1188,7 +1188,7 @@ func (spt *showPartitionsTask) Execute(ctx context.Context) error { return errors.New("failed to show partitions") } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(resp.Status.Reason) } @@ -1303,7 +1303,7 @@ func (ft *flushTask) Execute(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to call flush to data coordinator: %s", err.Error()) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(resp.Status.Reason) } coll2Segments[collName] = &schemapb.LongArray{Data: resp.GetSegmentIDs()} @@ -1417,7 +1417,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) { if err != nil { return err } - if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success { + if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(indexResponse.Status.Reason) } @@ -1645,7 +1645,7 @@ func (lpt *loadPartitionsTask) Execute(ctx context.Context) error { if err != nil { return err } - if indexResponse.Status.ErrorCode != commonpb.ErrorCode_Success { + if indexResponse.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(indexResponse.Status.Reason) } @@ -2230,7 +2230,7 @@ func (t *DescribeResourceGroupTask) Execute(ctx context.Context) error { return ret, nil } - if resp.Status.ErrorCode == commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { rgInfo := resp.GetResourceGroup() numLoadedReplica, err := getCollectionName(rgInfo.NumLoadedReplica) diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index f366ff761f..c0345d2745 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -495,7 +495,7 @@ func (dit *describeIndexTask) Execute(ctx context.Context) error { } dit.result = &milvuspb.DescribeIndexResponse{} dit.result.Status = resp.GetStatus() - if dit.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(dit.result.Status.Reason) } for _, indexInfo := range resp.IndexInfos { @@ -614,7 +614,7 @@ func (dit *getIndexStatisticsTask) Execute(ctx context.Context) error { } dit.result = &milvuspb.GetIndexStatisticsResponse{} dit.result.Status = resp.GetStatus() - if dit.result.Status.ErrorCode != commonpb.ErrorCode_Success { + if dit.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(dit.result.Status.Reason) } for _, indexInfo := range resp.IndexInfos { diff --git a/internal/proxy/task_statistic.go b/internal/proxy/task_statistic.go index efe7a42a5a..874332255c 100644 --- a/internal/proxy/task_statistic.go +++ b/internal/proxy/task_statistic.go @@ -241,7 +241,7 @@ func (g *getStatisticsTask) getStatisticsFromDataCoord(ctx context.Context) erro if err != nil { return err } - if result.Status.ErrorCode != commonpb.ErrorCode_Success { + if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } if g.resultBuf == nil { @@ -335,7 +335,7 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, dbName string, co if err != nil { return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason()) } @@ -360,7 +360,7 @@ func checkFullLoaded(ctx context.Context, qc types.QueryCoord, dbName string, co if err != nil { return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, err = %s", collectionID, searchPartitionIDs, err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, nil, fmt.Errorf("showPartitions failed, collection = %d, partitionIDs = %v, reason = %s", collectionID, searchPartitionIDs, resp.GetStatus().GetReason()) } @@ -462,7 +462,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP // if err != nil { // return err // } -// if result.Status.ErrorCode != commonpb.ErrorCode_Success { +// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ @@ -534,7 +534,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP // if err != nil { // return err // } -// if result.Status.ErrorCode != commonpb.ErrorCode_Success { +// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ @@ -557,7 +557,7 @@ func reduceStatisticResponse(results []map[string]string) ([]*commonpb.KeyValueP // if err != nil { // return err // } -// if result.Status.ErrorCode != commonpb.ErrorCode_Success { +// if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { // return errors.New(result.Status.Reason) // } // g.toReduceResults = append(g.toReduceResults, &internalpb.GetStatisticsResponse{ @@ -656,7 +656,7 @@ func (g *getCollectionStatisticsTask) Execute(ctx context.Context) error { if err != nil { return err } - if result.Status.ErrorCode != commonpb.ErrorCode_Success { + if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } g.result = &milvuspb.GetCollectionStatisticsResponse{ @@ -746,7 +746,7 @@ func (g *getPartitionStatisticsTask) Execute(ctx context.Context) error { if result == nil { return errors.New("get partition statistics resp is nil") } - if result.Status.ErrorCode != commonpb.ErrorCode_Success { + if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(result.Status.Reason) } g.result = &milvuspb.GetPartitionStatisticsResponse{ diff --git a/internal/proxy/timestamp.go b/internal/proxy/timestamp.go index b5850ab2a1..2fbf41127c 100644 --- a/internal/proxy/timestamp.go +++ b/internal/proxy/timestamp.go @@ -66,7 +66,7 @@ func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timest if err != nil { return nil, fmt.Errorf("syncTimestamp Failed:%w", err) } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, fmt.Errorf("syncTimeStamp Failed:%s", resp.Status.Reason) } start, cnt := resp.Timestamp, resp.Count diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 569cdc88cd..d2895859ea 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1011,7 +1011,7 @@ func isCollectionLoaded(ctx context.Context, qc types.QueryCoord, collID int64) if err != nil { return false, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return false, errors.New(resp.Status.Reason) } @@ -1032,7 +1032,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoord, collID int64, p if err != nil { return false, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return false, errors.New(resp.Status.Reason) } @@ -1203,13 +1203,13 @@ func getCollectionProgress( return } - if resp.Status.ErrorCode == commonpb.ErrorCode_InsufficientMemoryToLoad { + if resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_InsufficientMemoryToLoad { err = ErrInsufficientMemory log.Warn("detected insufficientMemoryError when getCollectionProgress", zap.Int64("collection_id", collectionID), zap.String("reason", resp.GetStatus().GetReason())) return } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { err = merr.Error(resp.GetStatus()) log.Warn("fail to show collections", zap.Int64("collection_id", collectionID), zap.String("reason", resp.Status.Reason)) diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index dd7bea4f92..4bd0f01fbe 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -149,10 +149,10 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque zap.String("scope", req.GetScope().String()), ) - failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "") + var err error metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc() defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if err != nil { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc() } }() @@ -170,18 +170,16 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque // get delegator sd, ok := node.delegators.Get(channel) if !ok { - err := merr.WrapErrServiceUnavailable("failed to get shard delegator for query") + err := merr.WrapErrChannelNotFound(channel) log.Warn("Query failed, failed to get shard delegator for query", zap.Error(err)) - failRet.Status = merr.Status(err) - return failRet, nil + return nil, err } // do query results, err := sd.Query(queryCtx, req) if err != nil { log.Warn("failed to query on delegator", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, nil + return nil, err } // reduce result @@ -196,16 +194,14 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque if collection == nil { err := merr.WrapErrCollectionNotFound(req.Req.GetCollectionID()) log.Warn("Query failed, failed to get collection", zap.Error(err)) - failRet.Status = merr.Status(err) - return failRet, nil + return nil, err } reducer := segments.CreateInternalReducer(req, collection.Schema()) - ret, err := reducer.Reduce(ctx, results) + resp, err := reducer.Reduce(ctx, results) if err != nil { - failRet.Status.Reason = err.Error() - return failRet, nil + return nil, err } tr.CtxElapse(ctx, fmt.Sprintf("do query with channel done , vChannel = %s, segmentIDs = %v", @@ -213,17 +209,14 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque req.GetSegmentIDs(), )) - // - ret.Status = merr.Status(nil) latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds())) metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc() - return ret, nil + return resp, nil } func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.QueryRequest, channel string, srv streamrpc.QueryStreamServer) error { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.Leader).Inc() - failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "") msgID := req.Req.Base.GetMsgID() log := log.Ctx(ctx).With( zap.Int64("msgID", msgID), @@ -232,8 +225,9 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer zap.String("scope", req.GetScope().String()), ) + var err error defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if err != nil { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.Leader).Inc() } }() @@ -252,13 +246,13 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer // get delegator sd, ok := node.delegators.Get(channel) if !ok { - err := merr.WrapErrServiceUnavailable("failed to get query shard delegator") + err := merr.WrapErrChannelNotFound(channel) log.Warn("Query failed, failed to get query shard delegator", zap.Error(err)) return err } // do query - err := sd.QueryStream(queryCtx, req, srv) + err = sd.QueryStream(queryCtx, req, srv) if err != nil { return err } @@ -373,10 +367,10 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq } defer node.lifetime.Done() - failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "") + var err error metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.Leader).Inc() defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if err != nil { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.Leader).Inc() } }() @@ -393,23 +387,20 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq // get delegator sd, ok := node.delegators.Get(channel) if !ok { - err := merr.WrapErrServiceUnavailable("failed to get shard delegator for search") + err := merr.WrapErrChannelNotFound(channel) log.Warn("Query failed, failed to get shard delegator for search", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, err + return nil, err } - req, err := node.optimizeSearchParams(ctx, req, sd) + req, err = node.optimizeSearchParams(ctx, req, sd) if err != nil { log.Warn("failed to optimize search params", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, err + return nil, err } // do search results, err := sd.Search(searchCtx, req) if err != nil { log.Warn("failed to search on delegator", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, err + return nil, err } // reduce result @@ -420,10 +411,9 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq req.GetSegmentIDs(), )) - ret, err := segments.ReduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType()) + resp, err := segments.ReduceSearchResults(ctx, results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType()) if err != nil { - failRet.Status.Reason = err.Error() - return failRet, err + return nil, err } tr.CtxElapse(ctx, fmt.Sprintf("do search with channel done , vChannel = %s, segmentIDs = %v", @@ -432,14 +422,13 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq )) // update metric to prometheus - failRet.Status.ErrorCode = commonpb.ErrorCode_Success latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.Leader).Observe(float64(latency.Milliseconds())) metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.Leader).Inc() metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetNq())) metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(req.Req.GetTopk())) - return ret, nil + return resp, nil } func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.GetStatisticsRequest, channel string) (*internalpb.GetStatisticsResponse, error) { @@ -448,11 +437,8 @@ func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.Ge zap.String("channel", channel), zap.String("scope", req.GetScope().String()), ) - failRet := &internalpb.GetStatisticsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - } + + resp := &internalpb.GetStatisticsResponse{} if req.GetFromShardLeader() { var ( @@ -478,23 +464,26 @@ func (node *QueryNode) getChannelStatistics(ctx context.Context, req *querypb.Ge sd, ok := node.delegators.Get(channel) if !ok { - log.Warn("GetStatistics failed, failed to get query shard delegator") - return failRet, nil + err := merr.WrapErrChannelNotFound(channel, "failed to get channel statistics") + log.Warn("GetStatistics failed, failed to get query shard delegator", zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil } results, err := sd.GetStatistics(ctx, req) if err != nil { log.Warn("failed to get statistics from delegator", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } - ret, err := reduceStatisticResponse(results) + resp, err = reduceStatisticResponse(results) if err != nil { - failRet.Status.Reason = err.Error() - return failRet, nil + log.Warn("failed to reduce channel statistics", zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil } - return ret, nil + return resp, nil } func segmentStatsResponse(segStats []segments.SegmentStats) *internalpb.GetStatisticsResponse { diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 8f7b6f2a54..7e5c86d7ea 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -23,7 +23,6 @@ import ( "github.com/samber/lo" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/segments" @@ -163,10 +162,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, quotaMetrics, err := getQuotaMetrics(node) if err != nil { return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: merr.Status(err), ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, paramtable.GetNodeID()), }, nil } @@ -201,10 +197,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) if err != nil { return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: merr.Status(err), Response: "", ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryNodeRole, paramtable.GetNodeID()), }, nil diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index df5c9e5a9d..e4f4d32c62 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -112,16 +112,11 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis } defer node.lifetime.Done() - if !CheckTargetID(req.GetReq()) { - targetID := req.GetReq().GetBase().GetTargetID() - log.Warn("target ID not match", - zap.Int64("targetID", targetID), - zap.Int64("nodeID", paramtable.GetNodeID()), - ) + err := merr.CheckTargetID(req.GetReq().GetBase()) + if err != nil { + log.Warn("target ID check failed", zap.Error(err)) return &internalpb.GetStatisticsResponse{ - Status: util.WrapStatus(commonpb.ErrorCode_NodeIDNotMatch, - common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID()), - ), + Status: merr.Status(err), }, nil } failRet := &internalpb.GetStatisticsResponse{ @@ -145,7 +140,7 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis mu.Lock() defer mu.Unlock() if err != nil { - failRet.Status.Reason = err.Error() + failRet.Status = merr.Status(err) failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError return err } @@ -164,7 +159,7 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis ret, err := reduceStatisticResponse(toReduceResults) if err != nil { - failRet.Status.Reason = err.Error() + failRet.Status = merr.Status(err) return failRet, nil } log.Debug("reduce statistic result done") @@ -486,10 +481,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen req.GetInfos()..., ) if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } node.manager.Collection.Ref(req.GetCollectionID(), uint32(len(loaded))) @@ -686,16 +678,16 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe zap.String("scope", req.GetScope().String()), ) - failRet := WrapSearchResult(commonpb.ErrorCode_UnexpectedError, "") + resp := &internalpb.SearchResults{} if !node.lifetime.Add(commonpbutil.IsHealthy) { - failRet.Status = merr.Status(merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))) - return failRet, nil + resp.Status = merr.Status(merr.WrapErrServiceNotReady(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))) + return resp, nil } defer node.lifetime.Done() metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel, metrics.FromLeader).Inc() defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FailLabel, metrics.FromLeader).Inc() } }() @@ -713,22 +705,22 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe if collection == nil { err := merr.WrapErrCollectionNotLoaded(req.GetReq().GetCollectionID()) log.Warn("failed to search segments", zap.Error(err)) - failRet.Status = merr.Status(err) - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } task := tasks.NewSearchTask(searchCtx, collection, node.manager, req) if err := node.scheduler.Add(task); err != nil { log.Warn("failed to search channel", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } err := task.Wait() if err != nil { log.Warn("failed to search segments", zap.Error(err)) - failRet.Status.Reason = err.Error() - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } tr.CtxElapse(ctx, fmt.Sprintf("search segments done, channel = %s, segmentIDs = %v", @@ -736,16 +728,14 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe req.GetSegmentIDs(), )) - // TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency - failRet.Status.ErrorCode = commonpb.ErrorCode_Success latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds())) metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel, metrics.FromLeader).Inc() - result := task.Result() - result.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds() - result.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ() - return result, nil + resp = task.Result() + resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds() + resp.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ() + return resp, nil } // Search performs replica search tasks. @@ -776,14 +766,12 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( } defer node.lifetime.Done() - if !CheckTargetID(req.GetReq()) { - targetID := req.GetReq().GetBase().GetTargetID() - log.Warn("target ID not match", - zap.Int64("targetID", targetID), - zap.Int64("nodeID", paramtable.GetNodeID()), - ) - return WrapSearchResult(commonpb.ErrorCode_NodeIDNotMatch, - common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())), nil + err := merr.CheckTargetID(req.GetReq().GetBase()) + if err != nil { + log.Warn("target ID check failed", zap.Error(err)) + return &internalpb.SearchResults{ + Status: merr.Status(err), + }, nil } failRet := &internalpb.SearchResults{ @@ -828,8 +816,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( mu.Lock() defer mu.Unlock() if err != nil { - failRet.Status.Reason = err.Error() - failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + failRet.Status = merr.Status(err) return err } if ret.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { @@ -848,7 +835,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( if err != nil { log.Warn("failed to reduce search results", zap.Error(err)) failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - failRet.Status.Reason = err.Error() + failRet.Status = merr.Status(err) return failRet, nil } reduceLatency := tr.RecordSpan() @@ -868,7 +855,11 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( // only used for delegator query segments from worker func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { - failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "") + failRet := &internalpb.RetrieveResults{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + } msgID := req.Req.Base.GetMsgID() traceID := trace.SpanFromContext(ctx).SpanContext().TraceID() channel := req.GetDmlChannels()[0] @@ -888,7 +879,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc() defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if failRet.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc() } }() @@ -941,7 +932,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ } func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, streamer streamrpc.QueryStreamer) error { - failRet := WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "") msgID := req.Req.Base.GetMsgID() traceID := trace.SpanFromContext(ctx).SpanContext().TraceID() channel := req.GetDmlChannels()[0] @@ -954,16 +944,17 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que zap.String("scope", req.GetScope().String()), ) + resp := &internalpb.RetrieveResults{} metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc() defer func() { - if failRet.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc() } }() if !node.lifetime.Add(commonpbutil.IsHealthy) { - failRet.Status = merr.Status(merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))) - srv.Send(failRet) + resp.Status = merr.Status(merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))) + srv.Send(resp) return nil } defer node.lifetime.Done() @@ -977,8 +968,8 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que err := node.queryStreamSegments(ctx, req, srv) if err != nil { - failRet.Status = merr.Status(err) - srv.Send(failRet) + resp.Status = merr.Status(err) + srv.Send(resp) return nil } @@ -989,7 +980,6 @@ func (node *QueryNode) QueryStreamSegments(ctx context.Context, req *querypb.Que req.GetSegmentIDs(), )) - failRet.Status.ErrorCode = commonpb.ErrorCode_Success // TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds())) @@ -1027,14 +1017,12 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i } defer node.lifetime.Done() - if !CheckTargetID(req.GetReq()) { - targetID := req.GetReq().GetBase().GetTargetID() - log.Warn("target ID not match", - zap.Int64("targetID", targetID), - zap.Int64("nodeID", paramtable.GetNodeID()), - ) - return WrapRetrieveResult(commonpb.ErrorCode_NodeIDNotMatch, - common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID())), nil + err := merr.CheckTargetID(req.GetReq().GetBase()) + if err != nil { + log.Warn("target ID check failed", zap.Error(err)) + return &internalpb.RetrieveResults{ + Status: merr.Status(err), + }, nil } toMergeResults := make([]*internalpb.RetrieveResults, len(req.GetDmlChannels())) @@ -1064,14 +1052,18 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i }) } if err := runningGp.Wait(); err != nil { - return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err), nil + return &internalpb.RetrieveResults{ + Status: merr.Status(err), + }, nil } tr.RecordSpan() reducer := segments.CreateInternalReducer(req, node.manager.Collection.Get(req.GetReq().GetCollectionID()).Schema()) ret, err := reducer.Reduce(ctx, toMergeResults) if err != nil { - return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err), nil + return &internalpb.RetrieveResults{ + Status: merr.Status(err), + }, nil } reduceLatency := tr.RecordSpan() metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.ReduceShards). @@ -1112,15 +1104,10 @@ func (node *QueryNode) QueryStream(ctx context.Context, req *querypb.QueryReques } defer node.lifetime.Done() - if !CheckTargetID(req.GetReq()) { - targetID := req.GetReq().GetBase().GetTargetID() - log.Warn("target ID not match", - zap.Int64("targetID", targetID), - zap.Int64("nodeID", paramtable.GetNodeID()), - ) - srv.Send(WrapRetrieveResult(commonpb.ErrorCode_NodeIDNotMatch, - common.WrapNodeIDNotMatchMsg(targetID, paramtable.GetNodeID()))) - return nil + err := merr.CheckTargetID(req.GetReq().GetBase()) + if err != nil { + log.Warn("target ID check failed", zap.Error(err)) + return err } runningGp, runningCtx := errgroup.WithContext(ctx) @@ -1145,7 +1132,9 @@ func (node *QueryNode) QueryStream(ctx context.Context, req *querypb.QueryReques } if err := runningGp.Wait(); err != nil { - srv.Send(WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, "failed to query channel", err)) + srv.Send(&internalpb.RetrieveResults{ + Status: merr.Status(err), + }) return nil } @@ -1214,10 +1203,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: merr.Status(err), }, nil } @@ -1230,10 +1216,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR zap.String("metricType", metricType), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: merr.Status(err), }, nil } log.RatedDebug(50, "QueryNode.GetMetrics", @@ -1251,11 +1234,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR zap.String("metricType", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - Response: "", + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } @@ -1378,11 +1357,9 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi // get shard delegator shardDelegator, ok := node.delegators.Get(req.GetChannel()) if !ok { + err := merr.WrapErrChannelNotFound(req.GetChannel()) log.Warn("failed to find shard cluster when sync") - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "shard not exist", - }, nil + return merr.Status(err), nil } // translate segment action @@ -1416,10 +1393,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(), action.GetSealedInTarget(), action.GetDroppedInTarget()) default: - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "unexpected action type", - }, nil + return merr.Status(merr.WrapErrServiceInternal("unknown action type", action.GetType().String())), nil } } @@ -1503,10 +1477,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( err := segment.Delete(pks, req.GetTimestamps()) if err != nil { log.Warn("segment delete failed", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("delete on segment %d failed, %s", req.GetSegmentId(), err.Error()), - }, nil + return merr.Status(err), nil } } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 6789bfa185..60cdd51c25 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1189,8 +1189,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { // Delegator not found resp, err = suite.node.Search(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - suite.Contains(resp.GetStatus().GetReason(), merr.ErrServiceUnavailable.Error()) + suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrChannelNotFound) suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() @@ -1335,8 +1334,7 @@ func (suite *ServiceSuite) TestQuery_Failed() { // Delegator not found resp, err := suite.node.Query(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - suite.Contains(resp.GetStatus().GetReason(), merr.ErrServiceUnavailable.Error()) + suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrChannelNotFound) suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() @@ -1467,8 +1465,7 @@ func (suite *ServiceSuite) TestQueryStream_Failed() { err = merr.Error(result.GetStatus()) // Check result if err != nil { - suite.Equal(commonpb.ErrorCode_UnexpectedError, result.GetStatus().GetErrorCode()) - suite.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) + suite.ErrorIs(err, merr.ErrChannelNotFound) } } @@ -1674,8 +1671,8 @@ func (suite *ServiceSuite) TestGetMetric_Failed() { resp, err := suite.node.GetMetrics(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) - suite.Equal(metricsinfo.MsgUnimplementedMetric, resp.Status.Reason) + err = merr.Error(resp.GetStatus()) + suite.ErrorIs(err, merr.ErrMetricNotFound) // metric parse failed req.Request = "---" diff --git a/internal/querynodev2/utils.go b/internal/querynodev2/utils.go deleted file mode 100644 index d63a94a76f..0000000000 --- a/internal/querynodev2/utils.go +++ /dev/null @@ -1,27 +0,0 @@ -package querynodev2 - -import ( - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/util" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -func WrapRetrieveResult(code commonpb.ErrorCode, msg string, errs ...error) *internalpb.RetrieveResults { - return &internalpb.RetrieveResults{ - Status: util.WrapStatus(code, msg, errs...), - } -} - -func WrapSearchResult(code commonpb.ErrorCode, msg string, errs ...error) *internalpb.SearchResults { - return &internalpb.SearchResults{ - Status: util.WrapStatus(code, msg, errs...), - } -} - -// CheckTargetID checks whether the target ID of request is the server itself, -// returns true if matched, -// returns false otherwise -func CheckTargetID[R interface{ GetBase() *commonpb.MsgBase }](req R) bool { - return req.GetBase().GetTargetID() == paramtable.GetNodeID() -} diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index d9f4eab464..2767acc08b 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -250,7 +250,7 @@ func (b *ServerBroker) GetSegmentIndexState(ctx context.Context, collID UniqueID if err != nil { return nil, err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return nil, errors.New(resp.Status.Reason) } diff --git a/internal/util/componentutil/componentutil.go b/internal/util/componentutil/componentutil.go index 0219c69d2a..7133c6d5c0 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -36,7 +36,7 @@ func WaitForComponentStates(ctx context.Context, service types.Component, servic return err } - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { return errors.New(resp.Status.Reason) } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 79fa1ea639..5a9dd8c40e 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) var ( @@ -148,6 +149,14 @@ func CheckHealthy(state commonpb.StateCode) error { return nil } +func CheckTargetID(msg *commonpb.MsgBase) error { + if msg.GetTargetID() != paramtable.GetNodeID() { + return WrapErrNodeNotMatch(paramtable.GetNodeID(), msg.GetTargetID()) + } + + return nil +} + // Service related func WrapErrServiceNotReady(stage string, msg ...string) error { err := errors.Wrapf(ErrServiceNotReady, "stage=%s", stage)