mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
fix: enable tls integration test after tls fixed at streaming arch (#43025)
issue: #42680 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
9886d7d4b6
commit
906430f291
@ -138,3 +138,9 @@ Some utility methods are provided in `MiniClusterSuite` to interact with the clu
|
||||
It's a known issue that integration test cases run in same process might affect due to some singleton component not fully cleaned.
|
||||
|
||||
As a temp solution, test cases are separated into different packages to run independently.
|
||||
|
||||
### Some tips
|
||||
|
||||
1. Sometimes, if the test case is killed by some SIGKILL, it will leave some orphan milvus process running in the background. You could use `killall milvus` to kill all milvus process, or `killall -9 milvus` to kill all milvus process forcefully.
|
||||
2. Because the test framework use some determined port (such as 53100 for coord, 19530 for proxy), it will be failed to start a new milvus process if the port is already in use. You could use `lsof -i :53100` to check if the port is already in use.
|
||||
3. The test coverage of milvus can not be generated by the integration test, because that the integration test use multi-process. the test coverage only cover the code that is executed by the integration test itself.
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -33,6 +34,10 @@ import (
|
||||
"github.com/milvus-io/milvus/tests/integration/cluster/process"
|
||||
)
|
||||
|
||||
const (
|
||||
MilvusWorkDirEnvKey = "MILVUS_WORK_DIR"
|
||||
)
|
||||
|
||||
type (
|
||||
MiniClusterV3Option func(*MiniClusterV3)
|
||||
ClusterOperationOpt func(*clusterOperationOpt)
|
||||
@ -56,6 +61,12 @@ func WithRootPath(rootPath string) MiniClusterV3Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithWorkDir(workDir string) MiniClusterV3Option {
|
||||
return func(c *MiniClusterV3) {
|
||||
c.workDir = workDir
|
||||
}
|
||||
}
|
||||
|
||||
func WithEtcdCli(etcdCli *clientv3.Client) MiniClusterV3Option {
|
||||
return func(c *MiniClusterV3) {
|
||||
c.EtcdCli = etcdCli
|
||||
@ -77,6 +88,7 @@ func NewMiniClusterV3(
|
||||
ctx: ctx,
|
||||
mu: sync.Mutex{},
|
||||
nodeID: 0,
|
||||
workDir: os.Getenv(MilvusWorkDirEnvKey),
|
||||
configRefreshInterval: 100 * time.Millisecond,
|
||||
extraEnv: make(map[string]string),
|
||||
mixcoord: make(map[int64]*process.MixcoordProcess),
|
||||
@ -109,6 +121,7 @@ type MiniClusterV3 struct {
|
||||
defaultStreamingNode *process.StreamingNodeProcess
|
||||
|
||||
nodeID int64
|
||||
workDir string
|
||||
mixcoord map[int64]*process.MixcoordProcess
|
||||
proxy map[int64]*process.ProxyProcess
|
||||
datanode map[int64]*process.DataNodeProcess
|
||||
@ -129,6 +142,9 @@ type MiniClusterV3 struct {
|
||||
}
|
||||
|
||||
func (c *MiniClusterV3) init() {
|
||||
if c.workDir == "" {
|
||||
panic("work dir is not set")
|
||||
}
|
||||
if c.rootPath == "" {
|
||||
c.rootPath = fmt.Sprintf("integration-%s", uuid.New())
|
||||
}
|
||||
@ -646,6 +662,7 @@ func (c *MiniClusterV3) getOptions() []process.Option {
|
||||
env[k] = v
|
||||
}
|
||||
return []process.Option{
|
||||
process.WithWorkDir(c.workDir),
|
||||
process.WithServerID(c.nodeID),
|
||||
process.WithRootPath(c.rootPath),
|
||||
process.WithETCDClient(c.EtcdCli),
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -37,6 +38,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@ -51,12 +53,12 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/crypto"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/interceptor"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
|
||||
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
MilvusWorkDirEnvKey = "MILVUS_WORK_DIR"
|
||||
MilvusClusterComponent = "cluster-manager"
|
||||
)
|
||||
|
||||
@ -153,7 +155,6 @@ func NewMilvusProcess(opts ...Option) *MilvusProcess {
|
||||
notifier: syncutil.NewAsyncTaskNotifier[error](),
|
||||
graceful: lifetime.NewSafeChan(),
|
||||
client: syncutil.NewFuture[io.Closer](),
|
||||
workDir: os.Getenv(MilvusWorkDirEnvKey),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -452,14 +453,28 @@ func (mp *MilvusProcess) GetAddress(ctx context.Context) (string, error) {
|
||||
}
|
||||
|
||||
func (mp *MilvusProcess) getGrpcClient(ctx context.Context, addr string) (*grpc.ClientConn, error) {
|
||||
if mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSEnabled.Key) == "true" {
|
||||
caPemPath := mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key)
|
||||
sni := mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSSNI.Key)
|
||||
creds, err := credentials.NewClientTLSFromFile(caPemPath, sni)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to create client tls from file: %w", err))
|
||||
}
|
||||
mp.Logger().Info("create grpc client with tls")
|
||||
return DailGRPClient(ctx, addr, mp.rootPath, mp.nodeID, grpc.WithTransportCredentials(creds))
|
||||
}
|
||||
return DailGRPClient(ctx, addr, mp.rootPath, mp.nodeID)
|
||||
}
|
||||
|
||||
// getConfigValueFromEnv gets the value of the environment variable from the process.
|
||||
func (mp *MilvusProcess) getConfigValueFromEnv(key string) string {
|
||||
envKey := strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
|
||||
return mp.env[envKey]
|
||||
}
|
||||
|
||||
// DailGRPClient dials a grpc client with the given address, root path and node id.
|
||||
func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int64) (*grpc.ClientConn, error) {
|
||||
return grpc.DialContext(
|
||||
ctx,
|
||||
addr,
|
||||
func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int64, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
|
||||
@ -488,6 +503,12 @@ func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int
|
||||
grpc.FailOnNonTempDialError(true),
|
||||
grpc.WithReturnConnectionError(),
|
||||
grpc.WithDisableRetry(),
|
||||
}
|
||||
opts = append(opts, extraOpts...)
|
||||
return grpc.DialContext(
|
||||
ctx,
|
||||
addr,
|
||||
opts...,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -46,10 +46,11 @@ type InternaltlsTestSuit struct {
|
||||
}
|
||||
|
||||
func (s *InternaltlsTestSuit) SetupSuite() {
|
||||
workDir := s.WorkDir()
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSEnabled.Key, "true")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerPemPath.Key, "../../../configs/cert/server.pem")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerKeyPath.Key, "../../../configs/cert/server.key")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key, "../../../configs/cert/ca.pem")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerPemPath.Key, workDir+"/configs/cert/server.pem")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerKeyPath.Key, workDir+"/configs/cert/server.key")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key, workDir+"/configs/cert/ca.pem")
|
||||
s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSSNI.Key, "localhost")
|
||||
|
||||
s.MiniClusterSuite.SetupSuite()
|
||||
@ -217,6 +218,5 @@ func (s *InternaltlsTestSuit) TestHelloMilvus_basic() {
|
||||
}
|
||||
|
||||
func TestInternalTLS(t *testing.T) {
|
||||
t.Skip("skip until we fix the issue https://github.com/milvus-io/milvus/issues/42930")
|
||||
suite.Run(t, new(InternaltlsTestSuit))
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package integration
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -48,6 +49,11 @@ type MiniClusterSuite struct {
|
||||
opt clusterSuiteOption
|
||||
}
|
||||
|
||||
// WorkDir returns the work directory of the cluster.
|
||||
func (s *MiniClusterSuite) WorkDir() string {
|
||||
return os.Getenv(cluster.MilvusWorkDirEnvKey)
|
||||
}
|
||||
|
||||
// WithMilvusConfig sets the environment variable for the given key.
|
||||
// The key can be got from the paramtable package, such as "common.QuotaConfigPath".
|
||||
func (s *MiniClusterSuite) WithMilvusConfig(key string, value string) {
|
||||
@ -79,7 +85,7 @@ func (s *MiniClusterSuite) SetupSuite() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), caseTimeout)
|
||||
s.cancelFunc = cancel
|
||||
|
||||
s.Cluster = cluster.NewMiniClusterV3(ctx, cluster.WithExtraEnv(s.envConfigs))
|
||||
s.Cluster = cluster.NewMiniClusterV3(ctx, cluster.WithExtraEnv(s.envConfigs), cluster.WithWorkDir(s.WorkDir()))
|
||||
}
|
||||
|
||||
func (s *MiniClusterSuite) SetupTest() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user