diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index a3e1166b34..4920719df7 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -63,25 +63,12 @@ const moduleName = "Proxy" const SlowReadSpan = time.Second * 5 -// UpdateStateCode updates the state code of Proxy. -func (node *Proxy) UpdateStateCode(code commonpb.StateCode) { - node.stateCode.Store(code) -} - // GetComponentStates gets the state of Proxy. func (node *Proxy) GetComponentStates(ctx context.Context, req *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { stats := &milvuspb.ComponentStates{ Status: merr.Status(nil), } - code, ok := node.stateCode.Load().(commonpb.StateCode) - if !ok { - errMsg := "unexpected error in type assertion" - stats.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: errMsg, - } - return stats, nil - } + code := node.GetStateCode() nodeID := common.NotRegisteredID if node.session != nil && node.session.Registered() { nodeID = node.session.ServerID @@ -106,8 +93,8 @@ func (node *Proxy) GetStatisticsChannel(ctx context.Context, req *internalpb.Get // InvalidateCollectionMetaCache invalidate the meta cache of specific collection. func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx = logutil.WithModule(ctx, moduleName) @@ -148,8 +135,8 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p } func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateDatabase") @@ -194,8 +181,8 @@ func (node *Proxy) CreateDatabase(ctx context.Context, request *milvuspb.CreateD } func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatabaseRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropDatabase") @@ -238,7 +225,7 @@ func (node *Proxy) DropDatabase(ctx context.Context, request *milvuspb.DropDatab func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListDatabasesRequest) (*milvuspb.ListDatabasesResponse, error) { resp := &milvuspb.ListDatabasesResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -286,8 +273,8 @@ func (node *Proxy) ListDatabases(ctx context.Context, request *milvuspb.ListData // CreateCollection create a collection by the schema. // TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy? func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateCollection") @@ -355,8 +342,8 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat // DropCollection drop a collection. func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropCollection") @@ -414,9 +401,9 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol // HasCollection check if the specific collection exists in Milvus. func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.BoolResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -481,8 +468,8 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle // LoadCollection load a collection into query nodes. func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadCollection") @@ -542,8 +529,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol // ReleaseCollection remove the loaded collection from query nodes. func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleaseCollection") @@ -607,9 +594,9 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele // DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc. func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.DescribeCollectionResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -679,9 +666,9 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des // GetStatistics get the statistics, such as `num_rows`. // WARNING: It is an experimental API func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.GetStatisticsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -759,9 +746,9 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati // GetCollectionStatistics get the collection statistics, such as `num_rows`. func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.GetCollectionStatisticsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -831,9 +818,9 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp // ShowCollections list all collections in Milvus. func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.ShowCollectionsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ShowCollections") @@ -897,8 +884,8 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo } func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollection") @@ -961,8 +948,8 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC // CreatePartition create a partition in specific collection. func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreatePartition") @@ -1026,8 +1013,8 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create // DropPartition drop a partition in specific collection. func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropPartition") @@ -1092,9 +1079,9 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart // HasPartition check if partition exist. func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.BoolResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -1170,8 +1157,8 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit // LoadPartitions load specific partitions into query nodes. func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadPartitions") @@ -1239,8 +1226,8 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar // ReleasePartitions release specific partitions from query nodes. func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleasePartitions") @@ -1308,9 +1295,9 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele // GetPartitionStatistics get the statistics of partition, such as num_rows. func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.GetPartitionStatisticsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -1382,9 +1369,9 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb // ShowPartitions list all partitions in the specific collection. func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.ShowPartitionsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -1467,8 +1454,8 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar } func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.GetLoadingProgressResponse{Status: merr.Status(err)}, nil } method := "GetLoadingProgress" tr := timerecord.NewTimeRecorder(method) @@ -1545,8 +1532,8 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get } func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.GetLoadStateResponse{Status: merr.Status(err)}, nil } method := "GetLoadState" tr := timerecord.NewTimeRecorder(method) @@ -1643,8 +1630,8 @@ func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadSt // CreateIndex create index for collection. func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateIndex") @@ -1714,9 +1701,9 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde // DescribeIndex get the meta information of index, such as index state, index id and etc. func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.DescribeIndexResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -1798,7 +1785,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe // GetIndexStatistics get the information of index. func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.GetIndexStatisticsRequest) (*milvuspb.GetIndexStatisticsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID())) return &milvuspb.GetIndexStatisticsResponse{ Status: merr.Status(err), @@ -1876,8 +1863,8 @@ func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.Get // DropIndex drop the index of collection. func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropIndex") @@ -1948,9 +1935,9 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq // IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows. // Deprecated: use DescribeIndex instead func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.GetIndexBuildProgressResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -2024,9 +2011,9 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. // GetIndexState get the build-state of index. // Deprecated: use DescribeIndex instead func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.GetIndexStateResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -2103,9 +2090,9 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert") defer sp.End() - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.MutationResult{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } log := log.Ctx(ctx).With( @@ -2229,9 +2216,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.MutationResult{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -2298,9 +2285,9 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) ) log.Debug("Start processing upsert request in Proxy") - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.MutationResult{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } method := "Upsert" @@ -2416,9 +2403,9 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq())) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } method := "Search" @@ -2524,12 +2511,9 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) // Flush notify data nodes to persist the data of collection. func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) { resp := &milvuspb.FlushResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - }, + Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -2607,9 +2591,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* rateCol.Add(internalpb.RateType_DQLQuery.String(), 1) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.QueryResults{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -2712,8 +2696,8 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* // CreateAlias create alias for collection, then you can search the collection with alias. func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateAlias") @@ -2776,26 +2760,20 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia func (node *Proxy) DescribeAlias(ctx context.Context, request *milvuspb.DescribeAliasRequest) (*milvuspb.DescribeAliasResponse, error) { return &milvuspb.DescribeAliasResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("DescribeAlias unimplemented")), }, nil } func (node *Proxy) ListAliases(ctx context.Context, request *milvuspb.ListAliasesRequest) (*milvuspb.ListAliasesResponse, error) { return &milvuspb.ListAliasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("ListAliases unimplemented")), }, nil } // DropAlias alter the alias of collection. func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropAlias") @@ -2857,8 +2835,8 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq // AlterAlias alter alias of collection. func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterAlias") @@ -2922,10 +2900,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR // CalcDistance calculates the distances between vectors. func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) { return &milvuspb.CalcDistanceResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "interface obsolete", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("CalcDistance deprecated")), }, nil } @@ -2938,7 +2913,7 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) resp := &milvuspb.FlushAllResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3027,10 +3002,7 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) // GetDdChannel returns the used channel for dd operations. func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("unimp")), }, nil } @@ -3049,7 +3021,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G resp := &milvuspb.GetPersistentSegmentInfoResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3137,7 +3109,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue resp := &milvuspb.GetQuerySegmentInfoResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3248,33 +3220,25 @@ func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milv // RegisterLink registers a link func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) { - code := node.stateCode.Load().(commonpb.StateCode) + code := node.GetStateCode() ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RegisterLink") defer sp.End() log := log.Ctx(ctx).With( zap.String("role", typeutil.ProxyRole), - zap.Any("state code of proxy", code)) + zap.String("state", code.String())) log.Debug("RegisterLink") - if code != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(code); err != nil { return &milvuspb.RegisterLinkResponse{ - Address: nil, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "proxy not healthy", - }, + Status: merr.Status(err), }, nil } // metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc() return &milvuspb.RegisterLinkResponse{ - Address: nil, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: os.Getenv(metricsinfo.DeployModeEnvKey), - }, + Status: merr.Success(os.Getenv(metricsinfo.DeployModeEnvKey)), }, nil } @@ -3290,7 +3254,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("req", req.Request)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID())) log.Warn("Proxy.GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), @@ -3345,11 +3309,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque zap.String("metricType", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - Response: "", + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } @@ -3363,7 +3323,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("req", req.Request)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID())) log.Warn("Proxy.GetProxyMetrics failed", zap.Error(err)) @@ -3410,10 +3370,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics zap.String("metricType", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } @@ -3428,8 +3385,8 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq zap.Int64("proxy_id", paramtable.GetNodeID()), zap.Any("req", req)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } status := merr.Status(nil) @@ -3484,7 +3441,7 @@ func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReq zap.Int64("collection", req.GetCollectionID()), zap.Bool("with shard nodes", req.GetWithShardNodes())) resp := &milvuspb.GetReplicasResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3524,7 +3481,7 @@ func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetComp log.Debug("received GetCompactionState request") resp := &milvuspb.GetCompactionStateResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3546,7 +3503,7 @@ func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCom log.Info("received ManualCompaction request") resp := &milvuspb.ManualCompactionResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3568,7 +3525,7 @@ func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvusp log.Debug("received GetCompactionStateWithPlans request") resp := &milvuspb.GetCompactionPlansResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3591,7 +3548,7 @@ func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStat zap.Any("request", req)) var err error failResp := &milvuspb.GetFlushStateResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { failResp.Status = merr.Status(err) log.Warn("unable to get flush state because of closed server") return failResp, nil @@ -3638,7 +3595,7 @@ func (node *Proxy) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushA var err error resp := &milvuspb.GetFlushAllStateResponse{} - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) log.Warn("GetFlushAllState failed, closed server") return resp, nil @@ -3656,23 +3613,10 @@ func (node *Proxy) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushA // checkHealthy checks proxy state is Healthy func (node *Proxy) checkHealthy() bool { - code := node.stateCode.Load().(commonpb.StateCode) + code := node.GetStateCode() return code == commonpb.StateCode_Healthy } -func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) { - code := node.stateCode.Load().(commonpb.StateCode) - return code, code == commonpb.StateCode_Healthy -} - -// unhealthyStatus returns the proxy not healthy status -func unhealthyStatus() *commonpb.Status { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "proxy not healthy", - } -} - // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Import") @@ -3687,7 +3631,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi resp := &milvuspb.ImportResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3732,7 +3676,7 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt resp := &milvuspb.GetImportStateResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3769,7 +3713,7 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport resp := &milvuspb.ListImportTasksResponse{ Status: merr.Status(nil), } - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { resp.Status = merr.Status(err) return resp, nil } @@ -3804,8 +3748,8 @@ func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxy zap.String("username", request.Username)) log.Debug("received request to invalidate credential cache") - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } username := request.Username @@ -3827,8 +3771,8 @@ func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.U zap.String("username", request.Username)) log.Debug("received request to update credential cache") - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } credInfo := &internalpb.CredentialInfo{ @@ -3852,42 +3796,32 @@ func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCre log.Debug("CreateCredential", zap.String("role", typeutil.ProxyRole)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } // validate params username := req.Username if err := ValidateUsername(username); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } rawPassword, err := crypto.Base64Decode(req.Password) if err != nil { log.Error("decode password fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CreateCredentialFailure, - Reason: "decode password fail key:" + req.Username, - }, nil + err = errors.Wrap(err, "decode password fail") + return merr.Status(err), nil } if err = ValidatePassword(rawPassword); err != nil { log.Error("illegal password", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } encryptedPassword, err := crypto.PasswordEncrypt(rawPassword) if err != nil { log.Error("encrypt password fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_CreateCredentialFailure, - Reason: "encrypt password fail key:" + req.Username, - }, nil + err = errors.Wrap(err, "encrypt password failed") + return merr.Status(err), nil } credInfo := &internalpb.CredentialInfo{ @@ -3913,35 +3847,28 @@ func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCre log.Debug("UpdateCredential", zap.String("role", typeutil.ProxyRole)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } rawOldPassword, err := crypto.Base64Decode(req.OldPassword) if err != nil { log.Error("decode old password fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure, - Reason: "decode old password fail when updating:" + req.Username, - }, nil + err = errors.Wrap(err, "decode old password failed") + return merr.Status(err), nil } rawNewPassword, err := crypto.Base64Decode(req.NewPassword) if err != nil { log.Error("decode password fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure, - Reason: "decode password fail when updating:" + req.Username, - }, nil + err = errors.Wrap(err, "decode password failed") + return merr.Status(err), nil } // valid new password if err = ValidatePassword(rawNewPassword); err != nil { log.Error("illegal password", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } skipPasswordVerify := false @@ -3954,20 +3881,16 @@ func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCre } if !skipPasswordVerify && !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure, - Reason: "old password is not correct:" + req.Username, - }, nil + err := merr.WrapErrPrivilegeNotAuthenticated("old password not correct for %s", req.GetUsername()) + return merr.Status(err), nil } // update meta data encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword) if err != nil { log.Error("encrypt password fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure, - Reason: "encrypt password fail when updating:" + req.Username, - }, nil + err = errors.Wrap(err, "encrypt password failed") + return merr.Status(err), nil } updateCredReq := &internalpb.CredentialInfo{ Username: req.Username, @@ -3992,15 +3915,13 @@ func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCre log.Debug("DeleteCredential", zap.String("role", typeutil.ProxyRole)) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if req.Username == util.UserRoot { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure, - Reason: "user root cannot be deleted", - }, nil + err := merr.WrapErrPrivilegeNotPermitted("root user cannot be deleted") + return merr.Status(err), nil } result, err := node.rootCoord.DeleteCredential(ctx, req) if err != nil { // for error like conntext timeout etc. @@ -4019,8 +3940,8 @@ func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUser zap.String("role", typeutil.ProxyRole)) log.Debug("ListCredUsers") - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.ListCredUsersResponse{Status: merr.Status(err)}, nil } rootCoordReq := &milvuspb.ListCredUsersRequest{ Base: commonpbutil.NewMsgBase( @@ -4046,8 +3967,8 @@ func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleReque log := log.Ctx(ctx) log.Debug("CreateRole", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return errorutil.UnhealthyStatus(code), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } var roleName string @@ -4055,10 +3976,7 @@ func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleReque roleName = req.Entity.Name } if err := ValidateRoleName(roleName); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } result, err := node.rootCoord.CreateRole(ctx, req) @@ -4077,21 +3995,15 @@ func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) log.Debug("DropRole", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return errorutil.UnhealthyStatus(code), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if err := ValidateRoleName(req.RoleName); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } if IsDefaultRole(req.RoleName) { - errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: errMsg, - }, nil + err := merr.WrapErrPrivilegeNotPermitted("the role[%s] is a default role, which can't be droped", req.GetRoleName()) + return merr.Status(err), nil } result, err := node.rootCoord.DropRole(ctx, req) if err != nil { @@ -4110,20 +4022,14 @@ func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUse log := log.Ctx(ctx) log.Debug("OperateUserRole", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return errorutil.UnhealthyStatus(code), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if err := ValidateUsername(req.Username); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } if err := ValidateRoleName(req.RoleName); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } result, err := node.rootCoord.OperateUserRole(ctx, req) @@ -4141,8 +4047,8 @@ func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleReque log := log.Ctx(ctx) log.Debug("SelectRole", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.SelectRoleResponse{Status: merr.Status(err)}, nil } if req.Role != nil { @@ -4170,8 +4076,8 @@ func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserReque log := log.Ctx(ctx) log.Debug("SelectUser", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.SelectUserResponse{Status: merr.Status(err)}, nil } if req.User != nil { @@ -4233,22 +4139,16 @@ func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePr log.Debug("OperatePrivilege", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return errorutil.UnhealthyStatus(code), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if err := node.validPrivilegeParams(req); err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } curUser, err := GetCurUserFromContext(ctx) if err != nil { log.Warn("fail to get current user", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail to get current user, please make sure the authorizationEnabled setting in the milvus.yaml is true", - }, nil + return merr.Status(err), nil } req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser} result, err := node.rootCoord.OperatePrivilege(ctx, req) @@ -4261,7 +4161,7 @@ func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePr func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error { if req.Entity == nil { - return fmt.Errorf("the grant entity in the request is nil") + return merr.WrapErrParameterInvalidMsg("the grant entity in the request is nil") } if req.Entity.Object != nil { @@ -4275,7 +4175,7 @@ func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error { } if req.Entity.Role == nil { - return fmt.Errorf("the role entity in the grant entity is nil") + return merr.WrapErrParameterInvalidMsg("the role entity in the grant entity is nil") } if err := ValidateRoleName(req.Entity.Role.Name); err != nil { @@ -4293,16 +4193,13 @@ func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantReq log.Debug("SelectGrant", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.SelectGrantResponse{Status: merr.Status(err)}, nil } if err := node.validGrantParams(req); err != nil { return &milvuspb.SelectGrantResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - }, + Status: merr.Status(err), }, nil } @@ -4324,8 +4221,8 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr log.Debug("RefreshPrivilegeInfoCache", zap.Any("req", req)) - if code, ok := node.checkHealthyAndReturnCode(); !ok { - return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if globalMetaCache != nil { @@ -4334,12 +4231,9 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr OpKey: req.OpKey, }) if err != nil { - log.Error("fail to refresh policy info", + log.Warn("fail to refresh policy info", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure, - Reason: err.Error(), - }, err + return merr.Status(err), nil } } log.Debug("RefreshPrivilegeInfoCache success") @@ -4350,8 +4244,8 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr // SetRates limits the rates of requests. func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) { resp := merr.Status(nil) - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - resp = unhealthyStatus() + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + resp = merr.Status(err) return resp, nil } @@ -4366,10 +4260,10 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques } func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy") return &milvuspb.CheckHealthResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), IsHealthy: false, Reasons: []string{reason}, }, nil @@ -4447,16 +4341,13 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol log.Info("received rename collection request") var err error - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } if err := validateCollectionName(req.GetNewName()); err != nil { log.Warn("validate new collection name fail", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalCollectionName, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } req.Base = commonpbutil.NewMsgBase( @@ -4474,8 +4365,8 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol } func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } method := "CreateResourceGroup" @@ -4535,15 +4426,12 @@ func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.Cr func getErrResponse(err error, method string) *commonpb.Status { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: err.Error(), - } + return merr.Status(err) } func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } method := "DropResourceGroup" @@ -4595,8 +4483,8 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop } func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferNodeRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } method := "TransferNode" @@ -4662,8 +4550,8 @@ func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferN } func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.TransferReplicaRequest) (*commonpb.Status, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return unhealthyStatus(), nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil } method := "TransferReplica" @@ -4729,9 +4617,9 @@ func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.Transf } func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.ListResourceGroupsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -4792,9 +4680,9 @@ func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.Lis } func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.DescribeResourceGroupRequest) (*milvuspb.DescribeResourceGroupResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.DescribeResourceGroupResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } @@ -4856,25 +4744,19 @@ func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb. func (node *Proxy) ListIndexedSegment(ctx context.Context, request *federpb.ListIndexedSegmentRequest) (*federpb.ListIndexedSegmentResponse, error) { return &federpb.ListIndexedSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("unimp")), }, nil } func (node *Proxy) DescribeSegmentIndexData(ctx context.Context, request *federpb.DescribeSegmentIndexDataRequest) (*federpb.DescribeSegmentIndexDataResponse, error) { return &federpb.DescribeSegmentIndexDataResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "TODO: implement me", - }, + Status: merr.Status(merr.WrapErrServiceUnavailable("unimp")), }, nil } func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &milvuspb.ConnectResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.ConnectResponse{Status: merr.Status(err)}, nil } db := GetCurDBNameFromContextOrDefault(ctx) @@ -4902,10 +4784,7 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest if !funcutil.SliceContain(resp.GetDbNames(), db) { log.Info("connect failed, target database not exist") return &milvuspb.ConnectResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, // DatabaseNotExist? - Reason: fmt.Sprintf("database not found: %s", db), - }, + Status: merr.Status(merr.WrapErrDatabaseNotFound(db)), }, nil } @@ -4936,8 +4815,8 @@ func (node *Proxy) Connect(ctx context.Context, request *milvuspb.ConnectRequest } func (node *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &proxypb.ListClientInfosResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &proxypb.ListClientInfosResponse{Status: merr.Status(err)}, nil } clients := GetConnectionManager().list() @@ -4949,8 +4828,8 @@ func (node *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientI } func (node *Proxy) AllocTimestamp(ctx context.Context, req *milvuspb.AllocTimestampRequest) (*milvuspb.AllocTimestampResponse, error) { - if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - return &milvuspb.AllocTimestampResponse{Status: unhealthyStatus()}, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.AllocTimestampResponse{Status: merr.Status(err)}, nil } log.Info("AllocTimestamp request receive") diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index d810b58525..c83e3cb77b 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -49,7 +49,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) { chMgr.EXPECT().removeDMLStream(mock.Anything).Return() node := &Proxy{chMgr: chMgr} - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() req := &proxypb.InvalidateCollMetaCacheRequest{ @@ -65,7 +65,7 @@ func TestProxy_CheckHealth(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -83,7 +83,7 @@ func TestProxy_CheckHealth(t *testing.T) { session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -116,7 +116,7 @@ func TestProxy_CheckHealth(t *testing.T) { dataCoord: dataCoordMock, } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -133,7 +133,7 @@ func TestProxy_CheckHealth(t *testing.T) { queryCoord: qc, } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) assert.Equal(t, true, resp.IsHealthy) @@ -160,20 +160,20 @@ func TestProxy_CheckHealth(t *testing.T) { func TestProxyRenameCollection(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) t.Run("rename with illegal new collection name", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "$#^%#&#$*!)#@!"}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid) }) t.Run("rename fail", func(t *testing.T) { @@ -184,7 +184,7 @@ func TestProxyRenameCollection(t *testing.T) { session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, rootCoord: rc, } - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "new"}) @@ -200,7 +200,7 @@ func TestProxyRenameCollection(t *testing.T) { session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, rootCoord: rc, } - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.RenameCollection(ctx, &milvuspb.RenameCollectionRequest{NewName: "new"}) @@ -216,7 +216,7 @@ func TestProxy_ResourceGroup(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) qc := mocks.NewMockQueryCoordClient(t) node.SetQueryCoordClient(qc) @@ -308,7 +308,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) qc := mocks.NewMockQueryCoordClient(t) node.SetQueryCoordClient(qc) @@ -329,7 +329,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { ResourceGroup: "...", }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) + assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid) }) t.Run("drop resource group", func(t *testing.T) { @@ -347,7 +347,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { NumNode: 1, }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) + assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid) }) t.Run("transfer replica", func(t *testing.T) { @@ -358,7 +358,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { CollectionName: "collection1", }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) + assert.ErrorIs(t, merr.Error(resp), merr.ErrParameterInvalid) }) } @@ -398,7 +398,7 @@ func TestProxy_FlushAll_DbCollection(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } @@ -437,7 +437,7 @@ func TestProxy_FlushAll(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } @@ -483,11 +483,11 @@ func TestProxy_FlushAll(t *testing.T) { }) t.Run("FlushAll failed, server is abnormal", func(t *testing.T) { - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := node.FlushAll(ctx, &milvuspb.FlushAllRequest{}) assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) }) t.Run("FlushAll failed, get id failed", func(t *testing.T) { @@ -557,7 +557,7 @@ func TestProxy_GetFlushAllState(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } @@ -576,11 +576,11 @@ func TestProxy_GetFlushAllState(t *testing.T) { }) t.Run("GetFlushAllState failed, server is abnormal", func(t *testing.T) { - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := node.GetFlushAllState(ctx, &milvuspb.GetFlushAllStateRequest{}) assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) }) t.Run("DataCoord GetFlushAllState failed", func(t *testing.T) { @@ -604,7 +604,7 @@ func TestProxy_GetFlushState(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } @@ -623,11 +623,11 @@ func TestProxy_GetFlushState(t *testing.T) { }) t.Run("GetFlushState failed, server is abnormal", func(t *testing.T) { - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := node.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{}) assert.NoError(t, err) assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_NotReadyServe) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) }) t.Run("GetFlushState with collection name", func(t *testing.T) { @@ -635,7 +635,7 @@ func TestProxy_GetFlushState(t *testing.T) { CollectionName: "*", }) assert.NoError(t, err) - assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid) cacheBak := globalMetaCache defer func() { globalMetaCache = cacheBak }() @@ -684,7 +684,7 @@ func TestProxy_GetReplicas(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } @@ -705,13 +705,13 @@ func TestProxy_GetReplicas(t *testing.T) { }) t.Run("proxy_not_healthy", func(t *testing.T) { - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := node.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ CollectionID: 1000, }) assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) }) t.Run("QueryCoordClient_returnsError", func(t *testing.T) { @@ -757,7 +757,7 @@ func TestProxy_Connect(t *testing.T) { mock.Anything, mock.Anything, ).Return(&milvuspb.ListDatabasesResponse{ - Status: unhealthyStatus(), + Status: merr.Status(merr.WrapErrServiceNotReady("initialization")), }, nil) node := &Proxy{rootCoord: r} @@ -885,11 +885,11 @@ func TestProxyCreateDatabase(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) factory := dependency.NewDefaultFactory(true) @@ -901,7 +901,7 @@ func TestProxyCreateDatabase(t *testing.T) { tso: newMockTimestampAllocatorInterface(), } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) assert.NoError(t, err) @@ -925,7 +925,7 @@ func TestProxyCreateDatabase(t *testing.T) { rc.On("CreateDatabase", mock.Anything, mock.Anything). Return(merr.Status(nil), nil) node.rootCoord = rc - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{DbName: "db"}) @@ -939,11 +939,11 @@ func TestProxyDropDatabase(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) factory := dependency.NewDefaultFactory(true) @@ -955,7 +955,7 @@ func TestProxyDropDatabase(t *testing.T) { tso: newMockTimestampAllocatorInterface(), } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) assert.NoError(t, err) @@ -979,7 +979,7 @@ func TestProxyDropDatabase(t *testing.T) { rc.On("DropDatabase", mock.Anything, mock.Anything). Return(merr.Status(nil), nil) node.rootCoord = rc - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{DbName: "db"}) @@ -993,7 +993,7 @@ func TestProxyListDatabase(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.stateCode.Store(commonpb.StateCode_Abnormal) + node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{}) assert.NoError(t, err) @@ -1009,7 +1009,7 @@ func TestProxyListDatabase(t *testing.T) { tso: newMockTimestampAllocatorInterface(), } node.multiRateLimiter = NewMultiRateLimiter() - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) assert.NoError(t, err) @@ -1035,7 +1035,7 @@ func TestProxyListDatabase(t *testing.T) { Status: merr.Status(nil), }, nil) node.rootCoord = rc - node.stateCode.Store(commonpb.StateCode_Healthy) + node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{}) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 4bc941775d..0ca41dfa84 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -23,12 +23,12 @@ import ( "os" "strconv" "sync" - "sync/atomic" "syscall" "time" "github.com/cockroachdb/errors" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -76,7 +76,7 @@ type Proxy struct { ip string port int - stateCode atomic.Value + stateCode atomic.Int32 etcdCli *clientv3.Client address string @@ -135,6 +135,15 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) { return node, nil } +// UpdateStateCode updates the state code of Proxy. +func (node *Proxy) UpdateStateCode(code commonpb.StateCode) { + node.stateCode.Store(int32(code)) +} + +func (node *Proxy) GetStateCode() commonpb.StateCode { + return commonpb.StateCode(node.stateCode.Load()) +} + // Register registers proxy at etcd func (node *Proxy) Register() error { node.session.Register() diff --git a/internal/proxy/proxy_rpc_test.go b/internal/proxy/proxy_rpc_test.go index e260e1a9ed..ae49378025 100644 --- a/internal/proxy/proxy_rpc_test.go +++ b/internal/proxy/proxy_rpc_test.go @@ -56,7 +56,7 @@ func TestProxyRpcLimit(t *testing.T) { defer testServer.grpcServer.Stop() client, err := grpcproxyclient.NewClient(ctx, "localhost:"+p.Port.GetValue(), 1) assert.NoError(t, err) - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) rates := make([]*internalpb.Rate, 0) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 2ef6717bfa..b8b3a9d641 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -479,7 +479,7 @@ func TestProxy(t *testing.T) { err = proxy.Start() assert.NoError(t, err) - assert.Equal(t, commonpb.StateCode_Healthy, proxy.stateCode.Load().(commonpb.StateCode)) + assert.Equal(t, commonpb.StateCode_Healthy, proxy.GetStateCode()) // register proxy err = proxy.Register() @@ -496,7 +496,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, states.GetStatus().GetErrorCode()) assert.Equal(t, paramtable.GetNodeID(), states.State.NodeID) assert.Equal(t, typeutil.ProxyRole, states.State.Role) - assert.Equal(t, proxy.stateCode.Load().(commonpb.StateCode), states.State.StateCode) + assert.Equal(t, proxy.GetStateCode(), states.State.StateCode) }) t.Run("get statistics channel", func(t *testing.T) { @@ -1608,7 +1608,7 @@ func TestProxy(t *testing.T) { CollectionName: collectionName, Files: []string{"f1.json"}, } - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.Import(context.TODO(), req) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.NoError(t, err) @@ -1623,7 +1623,7 @@ func TestProxy(t *testing.T) { CollectionName: "bad_collection_name", Files: []string{"f1.json"}, } - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.Import(context.TODO(), req) assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) @@ -1636,7 +1636,7 @@ func TestProxy(t *testing.T) { CollectionName: "bad_collection_name", Files: []string{"f1.json"}, } - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.Import(context.TODO(), req) assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) @@ -2295,7 +2295,7 @@ func TestProxy(t *testing.T) { // proxy unhealthy // //notStateCode := "not state code" - //proxy.stateCode.Store(notStateCode) + //proxy.UpdateStateCode(notStateCode) // //t.Run("GetComponentStates fail", func(t *testing.T) { // _, err := proxy.GetComponentStates(ctx) @@ -3942,14 +3942,16 @@ func testProxyRefreshPolicyInfoCache(ctx context.Context, t *testing.T, proxy *P }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) - _, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{}) - assert.Error(t, err) + resp, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{}) + assert.NoError(t, err) + assert.Error(t, merr.Error(resp)) - _, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{ + resp, err = proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{ OpType: 100, OpKey: funcutil.EncodeUserRoleCache("foo", "public"), }) - assert.Error(t, err) + assert.NoError(t, err) + assert.Error(t, merr.Error(resp)) }) wg.Wait() } @@ -3976,7 +3978,7 @@ func Test_GetCompactionState(t *testing.T) { t.Run("get compaction state", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.GetCompactionState(context.TODO(), nil) assert.EqualValues(t, &milvuspb.GetCompactionStateResponse{}, resp) assert.NoError(t, err) @@ -3985,7 +3987,7 @@ func Test_GetCompactionState(t *testing.T) { t.Run("get compaction state with unhealthy proxy", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.GetCompactionState(context.TODO(), nil) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -3996,7 +3998,7 @@ func Test_ManualCompaction(t *testing.T) { t.Run("test manual compaction", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.ManualCompaction(context.TODO(), nil) assert.EqualValues(t, &milvuspb.ManualCompactionResponse{}, resp) assert.NoError(t, err) @@ -4004,7 +4006,7 @@ func Test_ManualCompaction(t *testing.T) { t.Run("test manual compaction with unhealthy", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.ManualCompaction(context.TODO(), nil) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -4015,7 +4017,7 @@ func Test_GetCompactionStateWithPlans(t *testing.T) { t.Run("test get compaction state with plans", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.GetCompactionStateWithPlans(context.TODO(), nil) assert.EqualValues(t, &milvuspb.GetCompactionPlansResponse{}, resp) assert.NoError(t, err) @@ -4023,7 +4025,7 @@ func Test_GetCompactionStateWithPlans(t *testing.T) { t.Run("test get compaction state with plans with unhealthy proxy", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.GetCompactionStateWithPlans(context.TODO(), nil) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -4046,7 +4048,7 @@ func Test_GetFlushState(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{ CollectionName: "coll", }) @@ -4057,7 +4059,7 @@ func Test_GetFlushState(t *testing.T) { t.Run("test get flush state with unhealthy proxy", func(t *testing.T) { datacoord := &DataCoordMock{} proxy := &Proxy{dataCoord: datacoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.GetFlushState(context.TODO(), nil) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -4066,7 +4068,7 @@ func Test_GetFlushState(t *testing.T) { func TestProxy_GetComponentStates(t *testing.T) { n := &Proxy{} - n.stateCode.Store(commonpb.StateCode_Healthy) + n.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := n.GetComponentStates(context.Background(), nil) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -4078,14 +4080,6 @@ func TestProxy_GetComponentStates(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) } -func TestProxy_GetComponentStates_state_code(t *testing.T) { - p := &Proxy{} - p.stateCode.Store("not commonpb.StateCode") - states, err := p.GetComponentStates(context.Background(), nil) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, states.GetStatus().GetErrorCode()) -} - func TestProxy_Import(t *testing.T) { var wg sync.WaitGroup @@ -4182,7 +4176,7 @@ func TestProxy_GetImportState(t *testing.T) { rootCoord.state.Store(commonpb.StateCode_Healthy) t.Run("test get import state", func(t *testing.T) { proxy := &Proxy{rootCoord: rootCoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.GetImportState(context.TODO(), req) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -4190,7 +4184,7 @@ func TestProxy_GetImportState(t *testing.T) { }) t.Run("test get import state with unhealthy", func(t *testing.T) { proxy := &Proxy{rootCoord: rootCoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.GetImportState(context.TODO(), req) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -4203,7 +4197,7 @@ func TestProxy_ListImportTasks(t *testing.T) { rootCoord.state.Store(commonpb.StateCode_Healthy) t.Run("test list import tasks", func(t *testing.T) { proxy := &Proxy{rootCoord: rootCoord} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := proxy.ListImportTasks(context.TODO(), req) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -4211,7 +4205,7 @@ func TestProxy_ListImportTasks(t *testing.T) { }) t.Run("test list import tasks with unhealthy", func(t *testing.T) { proxy := &Proxy{rootCoord: rootCoord} - proxy.stateCode.Store(commonpb.StateCode_Abnormal) + proxy.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := proxy.ListImportTasks(context.TODO(), req) assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) assert.NoError(t, err) @@ -4248,7 +4242,7 @@ func TestProxy_GetLoadState(t *testing.T) { InMemoryPercentages: []int64{}, }, nil) proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) assert.NoError(t, err) assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrServiceNotReady) @@ -4263,7 +4257,7 @@ func TestProxy_GetLoadState(t *testing.T) { qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(nil, merr.WrapErrCollectionNotLoaded("foo")) qc.EXPECT().ShowPartitions(mock.Anything, mock.Anything).Return(nil, merr.WrapErrPartitionNotLoaded("p1")) proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) assert.NoError(t, err) @@ -4304,7 +4298,7 @@ func TestProxy_GetLoadState(t *testing.T) { InMemoryPercentages: []int64{100}, }, nil) proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo", Base: &commonpb.MsgBase{}}) assert.NoError(t, err) @@ -4313,7 +4307,7 @@ func TestProxy_GetLoadState(t *testing.T) { stateResp, err = proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: ""}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, stateResp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(stateResp.GetStatus()), merr.ErrParameterInvalid) progressResp, err := proxy.GetLoadingProgress(context.Background(), &milvuspb.GetLoadingProgressRequest{CollectionName: "foo"}) assert.NoError(t, err) @@ -4339,7 +4333,7 @@ func TestProxy_GetLoadState(t *testing.T) { InMemoryPercentages: []int64{50}, }, nil) proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) assert.NoError(t, err) @@ -4373,7 +4367,7 @@ func TestProxy_GetLoadState(t *testing.T) { Status: merr.Status(mockErr), }, nil) proxy := &Proxy{queryCoord: qc} - proxy.stateCode.Store(commonpb.StateCode_Healthy) + proxy.UpdateStateCode(commonpb.StateCode_Healthy) stateResp, err := proxy.GetLoadState(context.Background(), &milvuspb.GetLoadStateRequest{CollectionName: "foo"}) assert.NoError(t, err) diff --git a/internal/proxy/task_database.go b/internal/proxy/task_database.go index d5216d36f5..d9f3803e13 100644 --- a/internal/proxy/task_database.go +++ b/internal/proxy/task_database.go @@ -63,7 +63,6 @@ func (cdt *createDatabaseTask) PreExecute(ctx context.Context) error { func (cdt *createDatabaseTask) Execute(ctx context.Context) error { var err error - cdt.result = &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} cdt.result, err = cdt.rootCoord.CreateDatabase(ctx, cdt.CreateDatabaseRequest) return err } @@ -125,7 +124,6 @@ func (ddt *dropDatabaseTask) PreExecute(ctx context.Context) error { func (ddt *dropDatabaseTask) Execute(ctx context.Context) error { var err error - ddt.result = &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} ddt.result, err = ddt.rootCoord.DropDatabase(ctx, ddt.DropDatabaseRequest) if ddt.result != nil && ddt.result.ErrorCode == commonpb.ErrorCode_Success { @@ -191,11 +189,6 @@ func (ldt *listDatabaseTask) PreExecute(ctx context.Context) error { func (ldt *listDatabaseTask) Execute(ctx context.Context) error { var err error - ldt.result = &milvuspb.ListDatabasesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - } ldt.result, err = ldt.rootCoord.ListDatabases(ctx, ldt.ListDatabasesRequest) return err } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index a407ec93e6..d247fba405 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -639,10 +639,7 @@ func (t *searchTask) Requery() error { func (t *searchTask) fillInEmptyResult(numQueries int64) { t.result = &milvuspb.SearchResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "search result is empty", - }, + Status: merr.Success("search result is empty"), CollectionName: t.collectionName, Results: &schemapb.SearchResultData{ NumQueries: numQueries, diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 18d518f1ca..21ddf12872 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -127,24 +127,24 @@ func validateCollectionNameOrAlias(entity, entityType string) error { entity = strings.TrimSpace(entity) if entity == "" { - return fmt.Errorf("collection %s should not be empty", entityType) + return merr.WrapErrParameterInvalidMsg("collection %s should not be empty", entityType) } invalidMsg := fmt.Sprintf("Invalid collection %s: %s. ", entityType, entity) if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() { - return fmt.Errorf("%s the length of a collection %s must be less than %s characters", invalidMsg, entityType, + return merr.WrapErrParameterInvalidMsg("%s the length of a collection %s must be less than %s characters", invalidMsg, entityType, Params.ProxyCfg.MaxNameLength.GetValue()) } firstChar := entity[0] if firstChar != '_' && !isAlpha(firstChar) { - return fmt.Errorf("%s the first character of a collection %s must be an underscore or letter", invalidMsg, entityType) + return merr.WrapErrParameterInvalidMsg("%s the first character of a collection %s must be an underscore or letter", invalidMsg, entityType) } for i := 1; i < len(entity); i++ { c := entity[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - return fmt.Errorf("%s collection %s can only contain numbers, letters and underscores", invalidMsg, entityType) + return merr.WrapErrParameterInvalidMsg("%s collection %s can only contain numbers, letters and underscores", invalidMsg, entityType) } } return nil @@ -157,19 +157,19 @@ func ValidateResourceGroupName(entity string) error { invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity) if len(entity) > Params.ProxyCfg.MaxNameLength.GetAsInt() { - return fmt.Errorf("%s the length of a resource group name must be less than %s characters", + return merr.WrapErrParameterInvalidMsg("%s the length of a resource group name must be less than %s characters", invalidMsg, Params.ProxyCfg.MaxNameLength.GetValue()) } firstChar := entity[0] if firstChar != '_' && !isAlpha(firstChar) { - return fmt.Errorf("%s the first character of a resource group name must be an underscore or letter", invalidMsg) + return merr.WrapErrParameterInvalidMsg("%s the first character of a resource group name must be an underscore or letter", invalidMsg) } for i := 1; i < len(entity); i++ { c := entity[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - return fmt.Errorf("%s resource group name can only contain numbers, letters and underscores", invalidMsg) + return merr.WrapErrParameterInvalidMsg("%s resource group name can only contain numbers, letters and underscores", invalidMsg) } } return nil @@ -722,27 +722,23 @@ func ValidateUsername(username string) error { username = strings.TrimSpace(username) if username == "" { - return errors.New("username should not be empty") + return merr.WrapErrParameterInvalidMsg("username must be not empty") } - invalidMsg := "Invalid username: " + username + ". " if len(username) > Params.ProxyCfg.MaxUsernameLength.GetAsInt() { - msg := invalidMsg + "The length of username must be less than " + Params.ProxyCfg.MaxUsernameLength.GetValue() + " characters." - return errors.New(msg) + return merr.WrapErrParameterInvalidMsg("invalid username %s with length %d, the length of username must be less than %d", username, len(username), Params.ProxyCfg.MaxUsernameLength.GetValue()) } firstChar := username[0] if !isAlpha(firstChar) { - msg := invalidMsg + "The first character of username must be a letter." - return errors.New(msg) + return merr.WrapErrParameterInvalidMsg("invalid user name %s, the first character must be a letter, but got %s", username, firstChar) } usernameSize := len(username) for i := 1; i < usernameSize; i++ { c := username[i] if c != '_' && !isAlpha(c) && !isNumber(c) { - msg := invalidMsg + "Username should only contain numbers, letters, and underscores." - return errors.New(msg) + return merr.WrapErrParameterInvalidMsg("invalid user name %s, username must contain only numbers, letters and underscores, but got %s", username, c) } } return nil @@ -750,9 +746,9 @@ func ValidateUsername(username string) error { func ValidatePassword(password string) error { if len(password) < Params.ProxyCfg.MinPasswordLength.GetAsInt() || len(password) > Params.ProxyCfg.MaxPasswordLength.GetAsInt() { - msg := "The length of password must be great than " + Params.ProxyCfg.MinPasswordLength.GetValue() + - " and less than " + Params.ProxyCfg.MaxPasswordLength.GetValue() + " characters." - return errors.New(msg) + return merr.WrapErrParameterInvalidRange(Params.ProxyCfg.MinPasswordLength.GetAsInt(), + Params.ProxyCfg.MaxPasswordLength.GetAsInt(), + len(password), "invalid password length") } return nil } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 41e214683f..147e7f5d9e 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -105,6 +105,12 @@ var ( ErrMqTopicNotEmpty = newMilvusError("topic not empty", 1301, false) ErrMqInternal = newMilvusError("message queue internal error", 1302, false) + // Privilege related + // this operation is denied because the user not authorized, user need to login in first + ErrPrivilegeNotAuthenticated = newMilvusError("not authenticated", 1400, false) + // this operation is denied because the user has no permission to do this, user need higher privilege + ErrPrivilegeNotPermitted = newMilvusError("privilege not permitted", 1401, false) + // field related ErrFieldNotFound = newMilvusError("field not found", 1700, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 7f4bb6b49a..ec312d754f 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -18,7 +18,6 @@ package merr import ( "context" - "fmt" "strings" "github.com/cockroachdb/errors" @@ -94,6 +93,13 @@ func CheckRPCCall(resp any, err error) error { return nil } +func Success(reason ...string) *commonpb.Status { + status := Status(nil) + // NOLINT + status.Reason = strings.Join(reason, " ") + return status +} + // Deprecated func StatusWithErrorCode(err error, code commonpb.ErrorCode) *commonpb.Status { if err == nil { @@ -198,7 +204,7 @@ func Error(status *commonpb.Status) error { // use code first code := status.GetCode() if code == 0 { - return newMilvusError(fmt.Sprintf("legacy error code:%d, reason: %s", status.GetErrorCode(), status.GetReason()), errUnexpected.errCode, false) + return newMilvusError(status.GetReason(), errUnexpected.errCode, false) } return newMilvusError(status.GetReason(), code, code&retryableFlag != 0) @@ -643,6 +649,16 @@ func WrapErrMqInternal(err error, msg ...string) error { return err } +func WrapErrPrivilegeNotAuthenticated(fmt string, args ...any) error { + err := errors.Wrapf(ErrPrivilegeNotAuthenticated, fmt, args...) + return err +} + +func WrapErrPrivilegeNotPermitted(fmt string, args ...any) error { + err := errors.Wrapf(ErrPrivilegeNotPermitted, fmt, args...) + return err +} + // Segcore related func WrapErrSegcore(code int32, msg ...string) error { err := errors.Wrapf(ErrSegcore, "internal code=%v", code)