From 710e2ca185aa1554022ff3945e47a6fe0c2c2944 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Thu, 24 Jun 2021 16:00:15 +0800 Subject: [PATCH] return error before queryCoord ready (#6067) Signed-off-by: xige-16 --- internal/querycoord/impl.go | 137 ++++++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 28 deletions(-) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index f5e4e5cdd5..66edd5970c 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -72,12 +72,22 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { dbID := req.DbID log.Debug("show collection start", zap.Int64("dbID", dbID)) + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("show collection end with query coordinator not healthy") + return &querypb.ShowCollectionsResponse{ + Status: status, + }, err + } collectionIDs := qc.meta.showCollections() log.Debug("show collection end", zap.Int64s("collections", collectionIDs)) return &querypb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: status, CollectionIDs: collectionIDs, }, nil } @@ -90,6 +100,13 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("load collection end with query coordinator not healthy") + return status, err + } hasCollection := qc.meta.hasCollection(collectionID) if hasCollection { @@ -132,6 +149,14 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("release collection end with query coordinator not healthy") + return status, err + } + hasCollection := qc.meta.hasCollection(collectionID) if !hasCollection { log.Warn("release collection end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID))) @@ -165,22 +190,32 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { collectionID := req.CollectionID log.Debug("show partitions start, ", zap.Int64("collectionID", collectionID)) + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("show partition end with query coordinator not healthy") + return &querypb.ShowPartitionsResponse{ + Status: status, + }, err + } + partitionIDs, err := qc.meta.showPartitions(collectionID) if err != nil { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + status.Reason = err.Error() return &querypb.ShowPartitionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: status, }, err } log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) return &querypb.ShowPartitionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: status, PartitionIDs: partitionIDs, }, nil } @@ -192,6 +227,13 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("load partition end with query coordinator not healthy") + return status, err + } if len(partitionIDs) == 0 { status.ErrorCode = commonpb.ErrorCode_UnexpectedError @@ -261,6 +303,14 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("release partition end with query coordinator not healthy") + return status, err + } + hasCollection := qc.meta.hasCollection(collectionID) if !hasCollection { log.Warn("release partitions end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID))) @@ -299,29 +349,52 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas } func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("createQueryChannel end with query coordinator not healthy") + return &querypb.CreateQueryChannelResponse{ + Status: status, + }, err + } + collectionID := req.CollectionID queryChannel, queryResultChannel := qc.meta.GetQueryChannel(collectionID) return &querypb.CreateQueryChannelResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: status, RequestChannel: queryChannel, ResultChannel: queryResultChannel, }, nil } func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("getPartitionStates end with query coordinator not healthy") + return &querypb.GetPartitionStatesResponse{ + Status: status, + }, err + } + partitionIDs := req.PartitionIDs partitionStates := make([]*querypb.PartitionStates, 0) for _, partitionID := range partitionIDs { state, err := qc.meta.getPartitionStateByID(partitionID) if err != nil { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + status.Reason = err.Error() return &querypb.GetPartitionStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: status, }, err } partitionState := &querypb.PartitionStates{ @@ -332,14 +405,25 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa } return &querypb.GetPartitionStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: status, PartitionDescriptions: partitionStates, }, nil } func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + } + if qc.stateCode.Load() != internalpb.StateCode_Healthy { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + err := errors.New("query coordinator is not healthy") + status.Reason = err.Error() + log.Debug("getSegmentInfo end with query coordinator not healthy") + return &querypb.GetSegmentInfoResponse{ + Status: status, + }, err + } + totalMemSize := int64(0) totalNumRows := int64(0) //TODO::get segment infos from meta @@ -347,11 +431,10 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen //segmentInfos, err := qs.meta.getSegmentInfos(segmentIDs) segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req) if err != nil { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + status.Reason = err.Error() return &querypb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: status, }, err } for _, info := range segmentInfos { @@ -360,9 +443,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen } log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize)) return &querypb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - Infos: segmentInfos, + Status: status, + Infos: segmentInfos, }, nil }