From 48661655d68d57a9cec5c5be38b8c26b2a02b5f8 Mon Sep 17 00:00:00 2001 From: rhys Date: Fri, 27 Jun 2025 17:50:42 +0800 Subject: [PATCH] fix: streamingcoord and streamingnode client support internal tls (#42685) https://github.com/milvus-io/milvus/issues/42680 streamingnode/streamingcoord support internal tls Signed-off-by: rhys --- internal/distributed/streamingnode/service.go | 2 ++ internal/streamingcoord/client/client.go | 8 ++++++-- .../client/handler/handler_client.go | 8 ++++++-- .../client/manager/manager_client.go | 8 ++++++-- pkg/util/paramtable/grpc_param.go | 17 +++++++++++++++++ 5 files changed, 37 insertions(+), 6 deletions(-) diff --git a/internal/distributed/streamingnode/service.go b/internal/distributed/streamingnode/service.go index 964e98bd2e..22807c193a 100644 --- a/internal/distributed/streamingnode/service.go +++ b/internal/distributed/streamingnode/service.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" mix "github.com/milvus-io/milvus/internal/distributed/mixcoord/client" + "github.com/milvus-io/milvus/internal/distributed/utils" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" tikvkv "github.com/milvus-io/milvus/internal/kv/tikv" "github.com/milvus-io/milvus/internal/storage" @@ -357,6 +358,7 @@ func (s *Server) initGRPCServer() { interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter), )), grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + utils.EnableInternalTLS("StreamingNode"), ) streamingpb.RegisterStreamingNodeStateServiceServer(s.grpcServer, s.componentState) } diff --git a/internal/streamingcoord/client/client.go b/internal/streamingcoord/client/client.go index 7ede0ce9a4..7b3e95ee63 100644 --- a/internal/streamingcoord/client/client.go +++ b/internal/streamingcoord/client/client.go @@ -7,7 +7,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/streamingcoord/client/assignment" @@ -97,6 +96,7 @@ func NewClient(etcdCli *clientv3.Client) Client { // getDialOptions returns grpc dial options. func getDialOptions(rb resolver.Builder) []grpc.DialOption { cfg := ¶mtable.Get().StreamingCoordGrpcClientCfg + tlsCfg := ¶mtable.Get().InternalTLSCfg retryPolicy := cfg.GetDefaultRetryPolicy() retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"} defaultServiceConfig := map[string]interface{}{ @@ -117,11 +117,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption { if err != nil { panic(err) } + creds, err := tlsCfg.GetClientCreds(context.Background()) + if err != nil { + panic(err) + } dialOptions := cfg.GetDialOptionsFromConfig() dialOptions = append(dialOptions, grpc.WithBlock(), grpc.WithResolvers(rb), - grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTransportCredentials(creds), grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...), interceptor.ClusterInjectionUnaryClientInterceptor(), diff --git a/internal/streamingnode/client/handler/handler_client.go b/internal/streamingnode/client/handler/handler_client.go index e3db2e6b18..00384d5ea6 100644 --- a/internal/streamingnode/client/handler/handler_client.go +++ b/internal/streamingnode/client/handler/handler_client.go @@ -7,7 +7,6 @@ import ( "github.com/cockroachdb/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/streamingnode/client/handler/assignment" @@ -118,6 +117,7 @@ func NewHandlerClient(w types.AssignmentDiscoverWatcher) HandlerClient { // getDialOptions returns grpc dial options. func getDialOptions(rb resolver.Builder) []grpc.DialOption { cfg := ¶mtable.Get().StreamingNodeGrpcClientCfg + tlsCfg := ¶mtable.Get().InternalTLSCfg retryPolicy := cfg.GetDefaultRetryPolicy() retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"} defaultServiceConfig := map[string]interface{}{ @@ -138,11 +138,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption { if err != nil { panic(err) } + creds, err := tlsCfg.GetClientCreds(context.Background()) + if err != nil { + panic(err) + } dialOptions := cfg.GetDialOptionsFromConfig() dialOptions = append(dialOptions, grpc.WithBlock(), grpc.WithResolvers(rb), - grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTransportCredentials(creds), grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...), interceptor.ClusterInjectionUnaryClientInterceptor(), diff --git a/internal/streamingnode/client/manager/manager_client.go b/internal/streamingnode/client/manager/manager_client.go index 530773ed62..bdea8f20e8 100644 --- a/internal/streamingnode/client/manager/manager_client.go +++ b/internal/streamingnode/client/manager/manager_client.go @@ -7,7 +7,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -70,6 +69,7 @@ func NewManagerClient(etcdCli *clientv3.Client) ManagerClient { // getDialOptions returns grpc dial options. func getDialOptions(rb resolver.Builder) []grpc.DialOption { cfg := ¶mtable.Get().StreamingNodeGrpcClientCfg + tlsCfg := ¶mtable.Get().InternalTLSCfg retryPolicy := cfg.GetDefaultRetryPolicy() retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"} defaultServiceConfig := map[string]interface{}{ @@ -90,11 +90,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption { if err != nil { panic(err) } + creds, err := tlsCfg.GetClientCreds(context.Background()) + if err != nil { + panic(err) + } dialOptions := cfg.GetDialOptionsFromConfig() dialOptions = append(dialOptions, grpc.WithBlock(), grpc.WithResolvers(rb), - grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTransportCredentials(creds), grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...), interceptor.ClusterInjectionUnaryClientInterceptor(), diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go index 0f26def585..fe219be15a 100644 --- a/pkg/util/paramtable/grpc_param.go +++ b/pkg/util/paramtable/grpc_param.go @@ -17,6 +17,7 @@ package paramtable import ( + "context" "fmt" "strconv" "time" @@ -24,6 +25,8 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "github.com/milvus-io/milvus/pkg/v2/log" @@ -582,3 +585,17 @@ func (p *InternalTLSConfig) Init(base *BaseTable) { } p.InternalTLSSNI.Init(base.mgr) } + +func (p *InternalTLSConfig) GetClientCreds(ctx context.Context) (credentials.TransportCredentials, error) { + if !p.InternalTLSEnabled.GetAsBool() { + return insecure.NewCredentials(), nil + } + caPemPath := p.InternalTLSCaPemPath.GetValue() + sni := p.InternalTLSSNI.GetValue() + creds, err := credentials.NewClientTLSFromFile(caPemPath, sni) + if err != nil { + log.Ctx(ctx).Error("Failed to create internal TLS credentials", zap.Error(err)) + return nil, fmt.Errorf("failed to create internal TLS credentials: %w", err) + } + return creds, nil +}