mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
proxy tls support one-way authentication (#17348)
Signed-off-by: xiyichan <2863768433@qq.com> Co-authored-by: SimFG <bang.fu@zilliz.com> Signed-off-by: SimFG <bang.fu@zilliz.com> Co-authored-by: xiyichan <2863768433@qq.com>
This commit is contained in:
parent
f5e63177d2
commit
0986c29d7f
@ -297,6 +297,9 @@ common:
|
|||||||
# please adjust in embedded Milvus: local
|
# please adjust in embedded Milvus: local
|
||||||
storageType: minio
|
storageType: minio
|
||||||
|
|
||||||
|
|
||||||
security:
|
security:
|
||||||
authorizationEnabled: false
|
authorizationEnabled: false
|
||||||
tlsEnabled: false
|
# tls mode values [0, 1, 2]
|
||||||
|
# 0 is close, 1 is one-way authentication, 2 is two-way authentication.
|
||||||
|
tlsMode: 0
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
# How to enable use TLS proxy
|
# How to enable use TLS proxy
|
||||||
|
|
||||||
Milvus proxy uses TLS mutual authentication.
|
Milvus proxy uses TLS two-way and one-way authentication.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -489,8 +489,14 @@ tls:
|
|||||||
|
|
||||||
common:
|
common:
|
||||||
security:
|
security:
|
||||||
tlsEnabled: true
|
tlsMode: 2
|
||||||
```
|
```
|
||||||
|
### One-way authentication
|
||||||
|
Server need server.pem and server.key. Client-side need server.pem.
|
||||||
|
### Two-way authentication
|
||||||
|
Server-side need server.pem, server.key and ca.pem. Client-side need client.pem, client.key, ca.pem.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## Connect to the Milvus server with TLS
|
## Connect to the Milvus server with TLS
|
||||||
|
|||||||
@ -188,22 +188,33 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
|
|||||||
grpc_auth.StreamServerInterceptor(proxy.AuthenticationInterceptor))),
|
grpc_auth.StreamServerInterceptor(proxy.AuthenticationInterceptor))),
|
||||||
}
|
}
|
||||||
|
|
||||||
if Params.TLSEnabled {
|
if Params.TLSMode == 1 {
|
||||||
|
creds, err := credentials.NewServerTLSFromFile(Params.ServerPemPath, Params.ServerKeyPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("proxy can't create creds", zap.Error(err))
|
||||||
|
errChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
grpcOpts = append(grpcOpts, grpc.Creds(creds))
|
||||||
|
} else if Params.TLSMode == 2 {
|
||||||
cert, err := tls.LoadX509KeyPair(Params.ServerPemPath, Params.ServerKeyPath)
|
cert, err := tls.LoadX509KeyPair(Params.ServerPemPath, Params.ServerKeyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("proxy cant load x509 key pair", zap.Error(err))
|
log.Warn("proxy cant load x509 key pair", zap.Error(err))
|
||||||
panic(err)
|
errChan <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
certPool := x509.NewCertPool()
|
certPool := x509.NewCertPool()
|
||||||
rootBuf, err := ioutil.ReadFile(Params.CaPemPath)
|
rootBuf, err := ioutil.ReadFile(Params.CaPemPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed read ca pem", zap.Error(err))
|
log.Warn("failed read ca pem", zap.Error(err))
|
||||||
panic(err)
|
errChan <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if !certPool.AppendCertsFromPEM(rootBuf) {
|
if !certPool.AppendCertsFromPEM(rootBuf) {
|
||||||
log.Warn("fail to append ca to cert")
|
log.Warn("fail to append ca to cert")
|
||||||
panic("fail to append ca to cert")
|
errChan <- fmt.Errorf("fail to append ca to cert")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tlsConf := &tls.Config{
|
tlsConf := &tls.Config{
|
||||||
@ -226,7 +237,8 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
|
|||||||
|
|
||||||
if err := s.grpcExternalServer.Serve(lis); err != nil {
|
if err := s.grpcExternalServer.Serve(lis); err != nil {
|
||||||
log.Error("failed to serve on Proxy's listener", zap.Error(err))
|
log.Error("failed to serve on Proxy's listener", zap.Error(err))
|
||||||
panic(err)
|
errChan <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,7 +289,8 @@ func (s *Server) startInternalGrpc(grpcPort int, errChan chan error) {
|
|||||||
|
|
||||||
if err := s.grpcInternalServer.Serve(lis); err != nil {
|
if err := s.grpcInternalServer.Serve(lis); err != nil {
|
||||||
log.Error("failed to internal serve on Proxy's listener", zap.Error(err))
|
log.Error("failed to internal serve on Proxy's listener", zap.Error(err))
|
||||||
panic(err)
|
errChan <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -805,7 +805,7 @@ func (m *MockProxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUse
|
|||||||
type WaitOption struct {
|
type WaitOption struct {
|
||||||
Duration time.Duration `json:"duration"`
|
Duration time.Duration `json:"duration"`
|
||||||
Port int `json:"port"`
|
Port int `json:"port"`
|
||||||
TLSEnabled bool `json:"tls_enabled"`
|
TLSMode int `json:"tls_mode"`
|
||||||
ClientPemPath string `json:"client_pem_path"`
|
ClientPemPath string `json:"client_pem_path"`
|
||||||
ClientKeyPath string `json:"client_key_path"`
|
ClientKeyPath string `json:"client_key_path"`
|
||||||
CaPath string `json:"ca_path"`
|
CaPath string `json:"ca_path"`
|
||||||
@ -819,11 +819,11 @@ func (opt *WaitOption) String() string {
|
|||||||
return string(s)
|
return string(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWaitOption(duration time.Duration, Port int, tlsEnabled bool, clientPemPath, clientKeyPath, clientCaPath string) *WaitOption {
|
func newWaitOption(duration time.Duration, Port int, tlsMode int, clientPemPath, clientKeyPath, clientCaPath string) *WaitOption {
|
||||||
return &WaitOption{
|
return &WaitOption{
|
||||||
Duration: duration,
|
Duration: duration,
|
||||||
Port: Port,
|
Port: Port,
|
||||||
TLSEnabled: tlsEnabled,
|
TLSMode: tlsMode,
|
||||||
ClientPemPath: clientPemPath,
|
ClientPemPath: clientPemPath,
|
||||||
ClientKeyPath: clientKeyPath,
|
ClientKeyPath: clientKeyPath,
|
||||||
CaPath: clientCaPath,
|
CaPath: clientCaPath,
|
||||||
@ -860,16 +860,25 @@ func waitForGrpcReady(opt *WaitOption) {
|
|||||||
go func() {
|
go func() {
|
||||||
// just used in UT to self-check service is available.
|
// just used in UT to self-check service is available.
|
||||||
address := "localhost:" + strconv.Itoa(opt.Port)
|
address := "localhost:" + strconv.Itoa(opt.Port)
|
||||||
if !opt.TLSEnabled {
|
var err error
|
||||||
_, err := grpc.Dial(address, grpc.WithBlock(), grpc.WithInsecure())
|
|
||||||
ch <- err
|
if opt.TLSMode == 1 || opt.TLSMode == 2 {
|
||||||
} else {
|
var creds credentials.TransportCredentials
|
||||||
creds, err := withCredential(opt.ClientPemPath, opt.ClientKeyPath, opt.CaPath)
|
if opt.TLSMode == 1 {
|
||||||
|
creds, err = credentials.NewClientTLSFromFile(Params.ServerPemPath, "localhost")
|
||||||
|
} else {
|
||||||
|
creds, err = withCredential(opt.ClientPemPath, opt.ClientKeyPath, opt.CaPath)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ch <- err
|
ch <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
_, err = grpc.Dial(address, grpc.WithBlock(), grpc.WithTransportCredentials(creds))
|
_, err = grpc.Dial(address, grpc.WithBlock(), grpc.WithTransportCredentials(creds))
|
||||||
ch <- err
|
ch <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := grpc.Dial(address, grpc.WithBlock(), grpc.WithInsecure()); true {
|
||||||
|
ch <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -895,8 +904,8 @@ var clientKeyPath = "../../../configs/cert/client.key"
|
|||||||
|
|
||||||
// waitForServerReady wait for internal grpc service and external service to be ready, according to the params.
|
// waitForServerReady wait for internal grpc service and external service to be ready, according to the params.
|
||||||
func waitForServerReady() {
|
func waitForServerReady() {
|
||||||
waitForGrpcReady(newWaitOption(waitDuration, Params.InternalPort, false, "", "", ""))
|
waitForGrpcReady(newWaitOption(waitDuration, Params.InternalPort, 0, "", "", ""))
|
||||||
waitForGrpcReady(newWaitOption(waitDuration, Params.Port, Params.TLSEnabled, clientPemPath, clientKeyPath, Params.CaPemPath))
|
waitForGrpcReady(newWaitOption(waitDuration, Params.Port, Params.TLSMode, clientPemPath, clientKeyPath, Params.CaPemPath))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runAndWaitForServerReady(server *Server) error {
|
func runAndWaitForServerReady(server *Server) error {
|
||||||
@ -1184,7 +1193,6 @@ func Test_NewServer(t *testing.T) {
|
|||||||
proxy.Params.ProxyCfg.GinLogging = false
|
proxy.Params.ProxyCfg.GinLogging = false
|
||||||
err = runAndWaitForServerReady(server)
|
err = runAndWaitForServerReady(server)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
err = server.Stop()
|
err = server.Stop()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
@ -1329,11 +1337,11 @@ func Test_NewServer_HTTPServerDisabled(t *testing.T) {
|
|||||||
err = runAndWaitForServerReady(server)
|
err = runAndWaitForServerReady(server)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Nil(t, server.httpServer)
|
assert.Nil(t, server.httpServer)
|
||||||
|
|
||||||
err = server.Stop()
|
err = server.Stop()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
func Test_NewServer_TLS(t *testing.T) {
|
|
||||||
|
func getServer(t *testing.T) *Server {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
server, err := NewServer(ctx, nil)
|
server, err := NewServer(ctx, nil)
|
||||||
assert.NotNil(t, server)
|
assert.NotNil(t, server)
|
||||||
@ -1344,15 +1352,67 @@ func Test_NewServer_TLS(t *testing.T) {
|
|||||||
server.indexCoordClient = &MockIndexCoord{}
|
server.indexCoordClient = &MockIndexCoord{}
|
||||||
server.queryCoordClient = &MockQueryCoord{}
|
server.queryCoordClient = &MockQueryCoord{}
|
||||||
server.dataCoordClient = &MockDataCoord{}
|
server.dataCoordClient = &MockDataCoord{}
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
Params.TLSEnabled = true
|
func Test_NewServer_TLS_TwoWay(t *testing.T) {
|
||||||
|
server := getServer(t)
|
||||||
|
|
||||||
|
Params.InitOnce("proxy")
|
||||||
|
Params.TLSMode = 2
|
||||||
Params.ServerPemPath = "../../../configs/cert/server.pem"
|
Params.ServerPemPath = "../../../configs/cert/server.pem"
|
||||||
Params.ServerKeyPath = "../../../configs/cert/server.key"
|
Params.ServerKeyPath = "../../../configs/cert/server.key"
|
||||||
Params.CaPemPath = "../../../configs/cert/ca.pem"
|
Params.CaPemPath = "../../../configs/cert/ca.pem"
|
||||||
|
|
||||||
err = runAndWaitForServerReady(server)
|
err := runAndWaitForServerReady(server)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
assert.NotNil(t, server.grpcExternalServer)
|
||||||
err = server.Stop()
|
err = server.Stop()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_NewServer_TLS_OneWay(t *testing.T) {
|
||||||
|
server := getServer(t)
|
||||||
|
|
||||||
|
Params.InitOnce("proxy")
|
||||||
|
Params.TLSMode = 1
|
||||||
|
Params.ServerPemPath = "../../../configs/cert/server.pem"
|
||||||
|
Params.ServerKeyPath = "../../../configs/cert/server.key"
|
||||||
|
|
||||||
|
err := runAndWaitForServerReady(server)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.NotNil(t, server.grpcExternalServer)
|
||||||
|
err = server.Stop()
|
||||||
|
assert.Nil(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_NewServer_TLS_FileNotExisted(t *testing.T) {
|
||||||
|
server := getServer(t)
|
||||||
|
|
||||||
|
Params.InitOnce("proxy")
|
||||||
|
Params.TLSMode = 1
|
||||||
|
Params.ServerPemPath = "../not/existed/server.pem"
|
||||||
|
Params.ServerKeyPath = "../../../configs/cert/server.key"
|
||||||
|
err := runAndWaitForServerReady(server)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
server.Stop()
|
||||||
|
|
||||||
|
Params.TLSMode = 2
|
||||||
|
Params.ServerPemPath = "../not/existed/server.pem"
|
||||||
|
Params.CaPemPath = "../../../configs/cert/ca.pem"
|
||||||
|
err = runAndWaitForServerReady(server)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
server.Stop()
|
||||||
|
|
||||||
|
Params.ServerPemPath = "../../../configs/cert/server.pem"
|
||||||
|
Params.CaPemPath = "../not/existed/ca.pem"
|
||||||
|
err = runAndWaitForServerReady(server)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
server.Stop()
|
||||||
|
|
||||||
|
Params.ServerPemPath = "../../../configs/cert/server.pem"
|
||||||
|
Params.CaPemPath = "service.go"
|
||||||
|
err = runAndWaitForServerReady(server)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
server.Stop()
|
||||||
|
}
|
||||||
|
|||||||
@ -55,7 +55,7 @@ type grpcConfig struct {
|
|||||||
once sync.Once
|
once sync.Once
|
||||||
Domain string
|
Domain string
|
||||||
IP string
|
IP string
|
||||||
TLSEnabled bool
|
TLSMode int
|
||||||
Port int
|
Port int
|
||||||
InternalPort int
|
InternalPort int
|
||||||
ServerPemPath string
|
ServerPemPath string
|
||||||
@ -89,7 +89,7 @@ func (p *grpcConfig) initPort() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *grpcConfig) initTLSPath() {
|
func (p *grpcConfig) initTLSPath() {
|
||||||
p.TLSEnabled = p.ParseBool("common.security.tlsEnabled", false)
|
p.TLSMode = p.ParseIntWithDefault("common.security.tlsMode", 0)
|
||||||
p.ServerPemPath = p.Get("tls.serverPemPath")
|
p.ServerPemPath = p.Get("tls.serverPemPath")
|
||||||
p.ServerKeyPath = p.Get("tls.serverKeyPath")
|
p.ServerKeyPath = p.Get("tls.serverKeyPath")
|
||||||
p.CaPemPath = p.Get("tls.caPemPath")
|
p.CaPemPath = p.Get("tls.caPemPath")
|
||||||
|
|||||||
@ -117,4 +117,13 @@ func TestGrpcClientParams(t *testing.T) {
|
|||||||
Params.initKeepAliveTimeout()
|
Params.initKeepAliveTimeout()
|
||||||
assert.Equal(t, Params.KeepAliveTimeout, 500*time.Millisecond)
|
assert.Equal(t, Params.KeepAliveTimeout, 500*time.Millisecond)
|
||||||
|
|
||||||
|
Params.Save("common.security.tlsMode", "1")
|
||||||
|
Params.Save("tls.serverPemPath", "/pem")
|
||||||
|
Params.Save("tls.serverKeyPath", "/key")
|
||||||
|
Params.Save("tls.caPemPath", "/ca")
|
||||||
|
Params.initTLSPath()
|
||||||
|
assert.Equal(t, Params.TLSMode, 1)
|
||||||
|
assert.Equal(t, Params.ServerPemPath, "/pem")
|
||||||
|
assert.Equal(t, Params.ServerKeyPath, "/key")
|
||||||
|
assert.Equal(t, Params.CaPemPath, "/ca")
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user