From 0983d1fac797547ab774144acc7048ba520fda27 Mon Sep 17 00:00:00 2001 From: Preetham <9149028+preetham@users.noreply.github.com> Date: Tue, 30 May 2023 07:03:28 +0530 Subject: [PATCH] Update: Wrap common calls datanode client (#24494) Signed-off-by: Preetham --- .../distributed/datanode/client/client.go | 114 ++++-------------- 1 file changed, 21 insertions(+), 93 deletions(-) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 00d9944f11..cb88db7524 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -100,33 +100,32 @@ func (c *Client) getAddr() (string, error) { return c.addr, nil } -// GetComponentStates returns ComponentStates -func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { +func wrapGrpcCall[T any](ctx context.Context, c *Client, call func(grpcClient datapb.DataNodeClient) (*T, error)) (*T, error) { ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() } - return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + return call(client) }) if err != nil || ret == nil { return nil, err } - return ret.(*milvuspb.ComponentStates), err + return ret.(*T), err +} + +// GetComponentStates returns ComponentStates +func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.ComponentStates, error) { + return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + }) } // GetStatisticsChannel return the statistics channel in string // Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.StringResponse, error) { return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*milvuspb.StringResponse), err } // Deprecated @@ -136,16 +135,9 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { return client.WatchDmChannels(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // FlushSegments notifies DataNode to flush the segments req provids. The flush tasks are async to this @@ -165,16 +157,9 @@ func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsReq commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { return client.FlushSegments(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // ShowConfigurations gets specified configurations para of DataNode @@ -183,17 +168,9 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*internalpb.ShowConfigurationsResponse, error) { return client.ShowConfigurations(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - - return ret.(*internalpb.ShowConfigurationsResponse), err } // GetMetrics returns metrics @@ -202,30 +179,16 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.GetMetricsResponse, error) { return client.GetMetrics(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*milvuspb.GetMetricsResponse), err } // Compaction return compaction by given plan func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { return client.Compaction(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) { @@ -233,16 +196,9 @@ func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionS commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*datapb.CompactionStateResponse, error) { return client.GetCompactionState(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*datapb.CompactionStateResponse), err } // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments @@ -251,16 +207,9 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*co commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { return client.Import(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) { @@ -268,16 +217,9 @@ func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegme commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*datapb.ResendSegmentStatsResponse, error) { return client.ResendSegmentStats(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*datapb.ResendSegmentStatsResponse), err } // AddImportSegment is the DataNode client side code for AddImportSegment call. @@ -286,28 +228,14 @@ func (c *Client) AddImportSegment(ctx context.Context, req *datapb.AddImportSegm commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*datapb.AddImportSegmentResponse, error) { return client.AddImportSegment(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*datapb.AddImportSegmentResponse), err } // SyncSegments is the DataNode client side code for SyncSegments call. func (c *Client) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*commonpb.Status, error) { return client.SyncSegments(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err }