diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 8d7d9afcba..6f87909804 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -40,6 +40,8 @@ import ( // ClientParams is the parameters of client singleton var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + var _ types.DataCoord = (*Client)(nil) // Client is the datacoord grpc client @@ -164,7 +166,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // Flush flushes a collection's data func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -216,7 +221,10 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI // error is returned only when some communication issue occurs func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -241,7 +249,10 @@ func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta // error is returned only when some communication issue occurs func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -266,7 +277,10 @@ func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert // error is returned only when some communication issue occurs func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -291,7 +305,10 @@ func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol // error is returned only when some communication issue occurs func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -328,7 +345,10 @@ func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes // error is returned only when some communication issue occurs func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -358,7 +378,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { // use Call here on purpose req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.Call(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -380,7 +403,10 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath // error is returned only when some communication issue occurs func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -404,7 +430,10 @@ func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf // error is returned only when some communication issue occurs func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -427,7 +456,10 @@ func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS // error is returned only when some communication issue occurs func (c *Client) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -443,7 +475,10 @@ func (c *Client) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment // ShowConfigurations gets specified configurations para of DataCoord func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -460,7 +495,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics gets all metrics of datacoord func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -546,7 +584,10 @@ func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR // DropVirtualChannel drops virtual channel in datacoord. func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -562,7 +603,10 @@ func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual // SetSegmentState sets the state of a given segment. func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -578,7 +622,10 @@ func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStat // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -594,7 +641,10 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*da // UpdateSegmentStatistics is the client side caller of UpdateSegmentStatistics. func (c *Client) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -610,7 +660,10 @@ func (c *Client) UpdateSegmentStatistics(ctx context.Context, req *datapb.Update // AcquireSegmentLock acquire the reference lock of the segments. func (c *Client) AcquireSegmentLock(ctx context.Context, req *datapb.AcquireSegmentLockRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -626,7 +679,10 @@ func (c *Client) AcquireSegmentLock(ctx context.Context, req *datapb.AcquireSegm // ReleaseSegmentLock release the reference lock of the segments. func (c *Client) ReleaseSegmentLock(ctx context.Context, req *datapb.ReleaseSegmentLockRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -642,7 +698,10 @@ func (c *Client) ReleaseSegmentLock(ctx context.Context, req *datapb.ReleaseSegm // SaveImportSegment is the DataCoord client side code for SaveImportSegment call. func (c *Client) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -657,7 +716,10 @@ func (c *Client) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -672,7 +734,10 @@ func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -688,7 +753,10 @@ func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen // BroadcastAlteredCollection is the DataCoord client side code for BroadcastAlteredCollection call. func (c *Client) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index c501fa7b48..c88b8f0b18 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -33,6 +34,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client for DataNode type Client struct { grpcClient grpcclient.GrpcClient[datapb.DataNodeClient] @@ -129,6 +132,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // Deprecated // WatchDmChannels create consumers on dmChannels to reveive Incremental data func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -142,14 +149,22 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel } // FlushSegments notifies DataNode to flush the segments req provids. The flush tasks are async to this -// rpc, DataNode will flush the segments in the background. +// +// rpc, DataNode will flush the segments in the background. // // Return UnexpectedError code in status: -// If DataNode isn't in HEALTHY: states not HEALTHY or dynamic checks not HEALTHY -// If DataNode doesn't find the correspounding segmentID in its memeory replica +// +// If DataNode isn't in HEALTHY: states not HEALTHY or dynamic checks not HEALTHY +// If DataNode doesn't find the correspounding segmentID in its memeory replica +// // Return Success code in status and trigers background flush: -// Log an info log if a segment is under flushing +// +// Log an info log if a segment is under flushing func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -164,6 +179,10 @@ func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsReq // ShowConfigurations gets specified configurations para of DataNode func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -179,6 +198,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics returns metrics func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -206,6 +229,10 @@ func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*c } func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -220,6 +247,10 @@ func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionS // Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -233,6 +264,10 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*co } func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -247,6 +282,10 @@ func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegme // AddImportSegment is the DataNode client side code for AddImportSegment call. func (c *Client) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index f241263e4f..668d2e9baf 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -39,6 +39,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client of IndexCoord. type Client struct { grpcClient grpcclient.GrpcClient[indexpb.IndexCoordClient] @@ -246,7 +248,10 @@ func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) ( // ShowConfigurations gets specified configurations para of IndexCoord func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.IndexCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -263,7 +268,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics gets the metrics info of IndexCoord. func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.IndexCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 9601fbdafa..86a8f3f4cc 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -34,6 +35,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client of IndexNode. type Client struct { grpcClient grpcclient.GrpcClient[indexpb.IndexNodeClient] @@ -183,6 +186,10 @@ func (c *Client) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReques // ShowConfigurations gets specified configurations para of IndexNode func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.IndexNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -198,6 +205,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics gets the metrics info of IndexNode. func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.IndexNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 15c3b99627..d86f0ba80c 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -34,6 +34,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client for Proxy type Client struct { grpcClient grpcclient.GrpcClient[proxypb.ProxyClient] @@ -126,7 +128,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // InvalidateCollectionMetaCache invalidate collection meta cache func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -141,7 +146,10 @@ func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb func (c *Client) InvalidateCredentialCache(ctx context.Context, req *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -156,7 +164,10 @@ func (c *Client) InvalidateCredentialCache(ctx context.Context, req *proxypb.Inv func (c *Client) UpdateCredentialCache(ctx context.Context, req *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -171,7 +182,10 @@ func (c *Client) UpdateCredentialCache(ctx context.Context, req *proxypb.UpdateC func (c *Client) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -188,7 +202,10 @@ func (c *Client) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refres // because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster. func (c *Client) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -204,7 +221,10 @@ func (c *Client) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRe // SetRates notifies Proxy to limit rates of requests. func (c *Client) SetRates(ctx context.Context, req *proxypb.SetRatesRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 01c27019fb..a1e8609255 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -38,6 +38,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client of QueryCoord. type Client struct { grpcClient grpcclient.GrpcClient[querypb.QueryCoordClient] @@ -159,7 +161,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // ShowCollections shows the collections in the QueryCoord. func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -175,7 +180,10 @@ func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectio // LoadCollection loads the data of the specified collections in the QueryCoord. func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -191,7 +199,10 @@ func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollection // ReleaseCollection release the data of the specified collections in the QueryCoord. func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -207,7 +218,10 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl // ShowPartitions shows the partitions in the QueryCoord. func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -223,7 +237,10 @@ func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions // LoadPartitions loads the data of the specified partitions in the QueryCoord. func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -239,7 +256,10 @@ func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions // ReleasePartitions release the data of the specified partitions in the QueryCoord. func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -255,7 +275,10 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart // GetPartitionStates gets the states of the specified partition. func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -271,7 +294,10 @@ func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti // GetSegmentInfo gets the information of the specified segment from QueryCoord. func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -287,7 +313,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo // LoadBalance migrate the sealed segments on the source node to the dst nodes. func (c *Client) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -303,7 +332,10 @@ func (c *Client) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques // ShowConfigurations gets specified configurations para of QueryCoord func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -320,7 +352,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics gets the metrics information of QueryCoord. func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -336,7 +371,10 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest // GetReplicas gets the replicas of a certain collection. func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -352,7 +390,10 @@ func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque // GetShardLeaders gets the shard leaders of a certain collection. func (c *Client) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID)) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 13610c6cd3..96d06ebd6a 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -34,6 +35,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client is the grpc client of QueryNode. type Client struct { grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient] @@ -140,6 +143,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // WatchDmChannels watches the channels about data manipulation. func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -154,6 +161,10 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChanne // UnsubDmChannel unsubscribes the channels about data manipulation. func (c *Client) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -168,6 +179,10 @@ func (c *Client) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannel // LoadSegments loads the segments to search. func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -182,6 +197,10 @@ func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequ // ReleaseCollection releases the data of the specified collection in QueryNode. func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -196,6 +215,10 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl // ReleasePartitions releases the data of the specified partitions in QueryNode. func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -210,6 +233,10 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart // ReleaseSegments releases the data of the specified segments in QueryNode. func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -252,6 +279,10 @@ func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*interna // GetSegmentInfo gets the information of the specified segments in QueryNode. func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -266,6 +297,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo // SyncReplicaSegments syncs replica node segments information to shard leaders. func (c *Client) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -280,6 +315,10 @@ func (c *Client) SyncReplicaSegments(ctx context.Context, req *querypb.SyncRepli // ShowConfigurations gets specified configurations para of QueryNode func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -295,6 +334,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics gets the metrics information of QueryNode. func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -321,6 +364,10 @@ func (c *Client) GetStatistics(ctx context.Context, request *querypb.GetStatisti } func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -334,6 +381,10 @@ func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDi } func (c *Client) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID())) ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 9365a9bf05..d5070d3657 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -39,6 +39,8 @@ import ( var ClientParams paramtable.GrpcClientConfig +var Params paramtable.ComponentParam + // Client grpc client type Client struct { grpcClient grpcclient.GrpcClient[rootcoordpb.RootCoordClient] @@ -165,7 +167,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp // CreateCollection create collection func (c *Client) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -181,7 +186,10 @@ func (c *Client) CreateCollection(ctx context.Context, in *milvuspb.CreateCollec // DropCollection drop collection func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -197,7 +205,10 @@ func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollection // HasCollection check collection existence func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -213,7 +224,10 @@ func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRe // DescribeCollection return collection info func (c *Client) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -229,7 +243,10 @@ func (c *Client) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCo // ShowCollections list all collection names func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -244,7 +261,10 @@ func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectio func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) { request = typeutil.Clone(request) - commonpbutil.UpdateMsgBase(request.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + request.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -260,7 +280,10 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol // CreatePartition create partition func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -276,7 +299,10 @@ func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartiti // DropPartition drop partition func (c *Client) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -292,7 +318,10 @@ func (c *Client) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRe // HasPartition check partition existence func (c *Client) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -308,7 +337,10 @@ func (c *Client) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequ // ShowPartitions list all partitions in collection func (c *Client) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -324,7 +356,10 @@ func (c *Client) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitions // AllocTimestamp global timestamp allocator func (c *Client) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -340,7 +375,10 @@ func (c *Client) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimest // AllocID global ID allocator func (c *Client) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -356,7 +394,10 @@ func (c *Client) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (* // UpdateChannelTimeTick used to handle ChannelTimeTickMsg func (c *Client) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -372,7 +413,10 @@ func (c *Client) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Chann // ShowSegments list all segments func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -388,7 +432,10 @@ func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ // InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies. func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -404,7 +451,10 @@ func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb. // ShowConfigurations gets specified configurations para of RootCoord func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -421,7 +471,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics get metrics func (c *Client) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { in = typeutil.Clone(in) - commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + in.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -437,7 +490,10 @@ func (c *Client) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) // CreateAlias create collection alias func (c *Client) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -453,7 +509,10 @@ func (c *Client) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasReque // DropAlias drop collection alias func (c *Client) DropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -469,7 +528,10 @@ func (c *Client) DropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) // AlterAlias alter collection alias func (c *Client) AlterAlias(ctx context.Context, req *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -553,7 +615,10 @@ func (c *Client) CreateCredential(ctx context.Context, req *internalpb.Credentia func (c *Client) GetCredential(ctx context.Context, req *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -581,7 +646,10 @@ func (c *Client) UpdateCredential(ctx context.Context, req *internalpb.Credentia func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -596,7 +664,10 @@ func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCrede func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -611,7 +682,10 @@ func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersR func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -626,7 +700,10 @@ func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -641,7 +718,10 @@ func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (* func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -656,7 +736,10 @@ func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserR func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -671,7 +754,10 @@ func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -686,7 +772,10 @@ func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -701,7 +790,10 @@ func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePriv func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() @@ -716,7 +808,10 @@ func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantReque func (c *Client) ListPolicy(ctx context.Context, req *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) { req = typeutil.Clone(req) - commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID())) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() diff --git a/internal/util/commonpbutil/commonpbutil.go b/internal/util/commonpbutil/commonpbutil.go index 96d45af951..edaf9f0555 100644 --- a/internal/util/commonpbutil/commonpbutil.go +++ b/internal/util/commonpbutil/commonpbutil.go @@ -50,16 +50,27 @@ func WithSourceID(sourceID int64) MsgBaseOptions { } } +func WithTargetID(targetID int64) MsgBaseOptions { + return func(msgBase *commonpb.MsgBase) { + msgBase.TargetID = targetID + } +} + func GetNowTimestamp() uint64 { return uint64(time.Now().Unix()) } -func FillMsgBaseFromClient(targetID int64) MsgBaseOptions { +func FillMsgBaseFromClient(sourceID int64, options ...MsgBaseOptions) MsgBaseOptions { return func(msgBase *commonpb.MsgBase) { if msgBase.Timestamp == 0 { msgBase.Timestamp = GetNowTimestamp() } - msgBase.TargetID = targetID + if msgBase.SourceID == 0 { + msgBase.SourceID = sourceID + } + for _, op := range options { + op(msgBase) + } } } @@ -88,5 +99,4 @@ func UpdateMsgBase(msgBase *commonpb.MsgBase, options ...MsgBaseOptions) *common op(msgBaseRt) } return msgBaseRt - } diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 08df6a31be..8f9db5c69a 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -111,7 +111,7 @@ func (p *ComponentParam) KafkaEnable() bool { return p.KafkaCfg.Address != "" } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct { Base *BaseTable @@ -453,7 +453,7 @@ func (p *commonConfig) initSessionRetryTimes() { p.SessionRetryTimes = p.Base.ParseInt64WithDefault("common.session.retryTimes", 30) } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- rootcoord --- type rootCoordConfig struct { Base *BaseTable @@ -461,6 +461,8 @@ type rootCoordConfig struct { Address string Port int + NodeID atomic.Value + DmlChannelNum int64 MaxPartitionNum int64 MinSegmentSizeToEnableIndex int64 @@ -485,9 +487,22 @@ func (p *rootCoordConfig) init(base *BaseTable) { p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 24*60*60) p.ImportTaskSubPath = "importtask" p.EnableActiveStandby = p.Base.ParseBool("rootCoord.enableActiveStandby", false) + p.NodeID.Store(UniqueID(0)) } -/////////////////////////////////////////////////////////////////////////////// +func (p *rootCoordConfig) SetNodeID(id UniqueID) { + p.NodeID.Store(id) +} + +func (p *rootCoordConfig) GetNodeID() UniqueID { + val := p.NodeID.Load() + if val != nil { + return val.(UniqueID) + } + return 0 +} + +// ///////////////////////////////////////////////////////////////////////////// // --- proxy --- type proxyConfig struct { Base *BaseTable @@ -666,7 +681,7 @@ func (p *proxyConfig) initMaxRoleNum() { p.MaxRoleNum = int(maxRoleNum) } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- querycoord --- type queryCoordConfig struct { Base *BaseTable @@ -855,7 +870,7 @@ func (p *queryCoordConfig) GetNodeID() UniqueID { return 0 } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- querynode --- type queryNodeConfig struct { Base *BaseTable @@ -1102,7 +1117,7 @@ func (p *queryNodeConfig) initDiskCapacity() { p.DiskCapacityLimit = diskSize * 1024 * 1024 * 1024 } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- datacoord --- type dataCoordConfig struct { Base *BaseTable @@ -1319,7 +1334,7 @@ func (p *dataCoordConfig) GetNodeID() UniqueID { return 0 } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- datanode --- type dataNodeConfig struct { Base *BaseTable @@ -1401,7 +1416,7 @@ func (p *dataNodeConfig) GetNodeID() UniqueID { return 0 } -/////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////// // --- indexcoord --- type indexCoordConfig struct { Base *BaseTable @@ -1414,6 +1429,8 @@ type indexCoordConfig struct { WithCredential bool IndexNodeID int64 + NodeID atomic.Value + MinSegmentNumRowsToEnableIndex int64 GCInterval time.Duration @@ -1434,6 +1451,7 @@ func (p *indexCoordConfig) init(base *BaseTable) { p.initWithCredential() p.initIndexNodeID() p.initEnableActiveStandby() + p.NodeID.Store(UniqueID(0)) } func (p *indexCoordConfig) initMinSegmentNumRowsToEnableIndex() { @@ -1464,7 +1482,19 @@ func (p *indexCoordConfig) initEnableActiveStandby() { p.EnableActiveStandby = p.Base.ParseBool("indexCoord.enableActiveStandby", false) } -/////////////////////////////////////////////////////////////////////////////// +func (p *indexCoordConfig) SetNodeID(id UniqueID) { + p.NodeID.Store(id) +} + +func (p *indexCoordConfig) GetNodeID() UniqueID { + val := p.NodeID.Load() + if val != nil { + return val.(UniqueID) + } + return 0 +} + +// ///////////////////////////////////////////////////////////////////////////// // --- indexnode --- type indexNodeConfig struct { Base *BaseTable