fix: streamingcoord and streamingnode client support internal tls (#42685)

https://github.com/milvus-io/milvus/issues/42680

streamingnode/streamingcoord support internal tls

Signed-off-by: rhys <sdbwlr@163.com>
This commit is contained in:
rhys 2025-06-27 17:50:42 +08:00 committed by GitHub
parent 8367e4ec6a
commit 48661655d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 37 additions and 6 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mix "github.com/milvus-io/milvus/internal/distributed/mixcoord/client"
"github.com/milvus-io/milvus/internal/distributed/utils"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
tikvkv "github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/storage"
@ -357,6 +358,7 @@ func (s *Server) initGRPCServer() {
interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
utils.EnableInternalTLS("StreamingNode"),
)
streamingpb.RegisterStreamingNodeStateServiceServer(s.grpcServer, s.componentState)
}

View File

@ -7,7 +7,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/streamingcoord/client/assignment"
@ -97,6 +96,7 @@ func NewClient(etcdCli *clientv3.Client) Client {
// getDialOptions returns grpc dial options.
func getDialOptions(rb resolver.Builder) []grpc.DialOption {
cfg := &paramtable.Get().StreamingCoordGrpcClientCfg
tlsCfg := &paramtable.Get().InternalTLSCfg
retryPolicy := cfg.GetDefaultRetryPolicy()
retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"}
defaultServiceConfig := map[string]interface{}{
@ -117,11 +117,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption {
if err != nil {
panic(err)
}
creds, err := tlsCfg.GetClientCreds(context.Background())
if err != nil {
panic(err)
}
dialOptions := cfg.GetDialOptionsFromConfig()
dialOptions = append(dialOptions,
grpc.WithBlock(),
grpc.WithResolvers(rb),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTransportCredentials(creds),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...),
interceptor.ClusterInjectionUnaryClientInterceptor(),

View File

@ -7,7 +7,6 @@ import (
"github.com/cockroachdb/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/assignment"
@ -118,6 +117,7 @@ func NewHandlerClient(w types.AssignmentDiscoverWatcher) HandlerClient {
// getDialOptions returns grpc dial options.
func getDialOptions(rb resolver.Builder) []grpc.DialOption {
cfg := &paramtable.Get().StreamingNodeGrpcClientCfg
tlsCfg := &paramtable.Get().InternalTLSCfg
retryPolicy := cfg.GetDefaultRetryPolicy()
retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"}
defaultServiceConfig := map[string]interface{}{
@ -138,11 +138,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption {
if err != nil {
panic(err)
}
creds, err := tlsCfg.GetClientCreds(context.Background())
if err != nil {
panic(err)
}
dialOptions := cfg.GetDialOptionsFromConfig()
dialOptions = append(dialOptions,
grpc.WithBlock(),
grpc.WithResolvers(rb),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTransportCredentials(creds),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...),
interceptor.ClusterInjectionUnaryClientInterceptor(),

View File

@ -7,7 +7,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -70,6 +69,7 @@ func NewManagerClient(etcdCli *clientv3.Client) ManagerClient {
// getDialOptions returns grpc dial options.
func getDialOptions(rb resolver.Builder) []grpc.DialOption {
cfg := &paramtable.Get().StreamingNodeGrpcClientCfg
tlsCfg := &paramtable.Get().InternalTLSCfg
retryPolicy := cfg.GetDefaultRetryPolicy()
retryPolicy["retryableStatusCodes"] = []string{"UNAVAILABLE"}
defaultServiceConfig := map[string]interface{}{
@ -90,11 +90,15 @@ func getDialOptions(rb resolver.Builder) []grpc.DialOption {
if err != nil {
panic(err)
}
creds, err := tlsCfg.GetClientCreds(context.Background())
if err != nil {
panic(err)
}
dialOptions := cfg.GetDialOptionsFromConfig()
dialOptions = append(dialOptions,
grpc.WithBlock(),
grpc.WithResolvers(rb),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithTransportCredentials(creds),
grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(tracer.GetInterceptorOpts()...),
interceptor.ClusterInjectionUnaryClientInterceptor(),

View File

@ -17,6 +17,7 @@
package paramtable
import (
"context"
"fmt"
"strconv"
"time"
@ -24,6 +25,8 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -582,3 +585,17 @@ func (p *InternalTLSConfig) Init(base *BaseTable) {
}
p.InternalTLSSNI.Init(base.mgr)
}
func (p *InternalTLSConfig) GetClientCreds(ctx context.Context) (credentials.TransportCredentials, error) {
if !p.InternalTLSEnabled.GetAsBool() {
return insecure.NewCredentials(), nil
}
caPemPath := p.InternalTLSCaPemPath.GetValue()
sni := p.InternalTLSSNI.GetValue()
creds, err := credentials.NewClientTLSFromFile(caPemPath, sni)
if err != nil {
log.Ctx(ctx).Error("Failed to create internal TLS credentials", zap.Error(err))
return nil, fmt.Errorf("failed to create internal TLS credentials: %w", err)
}
return creds, nil
}