diff --git a/tests/integration/README.md b/tests/integration/README.md index db9711c58a..e6a01928f2 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -138,3 +138,9 @@ Some utility methods are provided in `MiniClusterSuite` to interact with the clu It's a known issue that integration test cases run in same process might affect due to some singleton component not fully cleaned. As a temp solution, test cases are separated into different packages to run independently. + +### Some tips + +1. Sometimes, if the test case is killed by some SIGKILL, it will leave some orphan milvus process running in the background. You could use `killall milvus` to kill all milvus process, or `killall -9 milvus` to kill all milvus process forcefully. +2. Because the test framework use some determined port (such as 53100 for coord, 19530 for proxy), it will be failed to start a new milvus process if the port is already in use. You could use `lsof -i :53100` to check if the port is already in use. +3. The test coverage of milvus can not be generated by the integration test, because that the integration test use multi-process. the test coverage only cover the code that is executed by the integration test itself. diff --git a/tests/integration/cluster/cluster.go b/tests/integration/cluster/cluster.go index bf6c020548..d1b5d5f10e 100644 --- a/tests/integration/cluster/cluster.go +++ b/tests/integration/cluster/cluster.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "os" "path" "strings" "sync" @@ -33,6 +34,10 @@ import ( "github.com/milvus-io/milvus/tests/integration/cluster/process" ) +const ( + MilvusWorkDirEnvKey = "MILVUS_WORK_DIR" +) + type ( MiniClusterV3Option func(*MiniClusterV3) ClusterOperationOpt func(*clusterOperationOpt) @@ -56,6 +61,12 @@ func WithRootPath(rootPath string) MiniClusterV3Option { } } +func WithWorkDir(workDir string) MiniClusterV3Option { + return func(c *MiniClusterV3) { + c.workDir = workDir + } +} + func WithEtcdCli(etcdCli *clientv3.Client) MiniClusterV3Option { return func(c *MiniClusterV3) { c.EtcdCli = etcdCli @@ -77,6 +88,7 @@ func NewMiniClusterV3( ctx: ctx, mu: sync.Mutex{}, nodeID: 0, + workDir: os.Getenv(MilvusWorkDirEnvKey), configRefreshInterval: 100 * time.Millisecond, extraEnv: make(map[string]string), mixcoord: make(map[int64]*process.MixcoordProcess), @@ -109,6 +121,7 @@ type MiniClusterV3 struct { defaultStreamingNode *process.StreamingNodeProcess nodeID int64 + workDir string mixcoord map[int64]*process.MixcoordProcess proxy map[int64]*process.ProxyProcess datanode map[int64]*process.DataNodeProcess @@ -129,6 +142,9 @@ type MiniClusterV3 struct { } func (c *MiniClusterV3) init() { + if c.workDir == "" { + panic("work dir is not set") + } if c.rootPath == "" { c.rootPath = fmt.Sprintf("integration-%s", uuid.New()) } @@ -646,6 +662,7 @@ func (c *MiniClusterV3) getOptions() []process.Option { env[k] = v } return []process.Option{ + process.WithWorkDir(c.workDir), process.WithServerID(c.nodeID), process.WithRootPath(c.rootPath), process.WithETCDClient(c.EtcdCli), diff --git a/tests/integration/cluster/process/milvus_process.go b/tests/integration/cluster/process/milvus_process.go index 799b5bc325..e0ffe7b972 100644 --- a/tests/integration/cluster/process/milvus_process.go +++ b/tests/integration/cluster/process/milvus_process.go @@ -27,6 +27,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "sync" "syscall" "time" @@ -37,6 +38,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -51,12 +53,12 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/crypto" "github.com/milvus-io/milvus/pkg/v2/util/interceptor" "github.com/milvus-io/milvus/pkg/v2/util/lifetime" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const ( - MilvusWorkDirEnvKey = "MILVUS_WORK_DIR" MilvusClusterComponent = "cluster-manager" ) @@ -153,7 +155,6 @@ func NewMilvusProcess(opts ...Option) *MilvusProcess { notifier: syncutil.NewAsyncTaskNotifier[error](), graceful: lifetime.NewSafeChan(), client: syncutil.NewFuture[io.Closer](), - workDir: os.Getenv(MilvusWorkDirEnvKey), } for _, opt := range opts { @@ -452,14 +453,28 @@ func (mp *MilvusProcess) GetAddress(ctx context.Context) (string, error) { } func (mp *MilvusProcess) getGrpcClient(ctx context.Context, addr string) (*grpc.ClientConn, error) { + if mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSEnabled.Key) == "true" { + caPemPath := mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key) + sni := mp.getConfigValueFromEnv(paramtable.Get().InternalTLSCfg.InternalTLSSNI.Key) + creds, err := credentials.NewClientTLSFromFile(caPemPath, sni) + if err != nil { + panic(fmt.Errorf("failed to create client tls from file: %w", err)) + } + mp.Logger().Info("create grpc client with tls") + return DailGRPClient(ctx, addr, mp.rootPath, mp.nodeID, grpc.WithTransportCredentials(creds)) + } return DailGRPClient(ctx, addr, mp.rootPath, mp.nodeID) } +// getConfigValueFromEnv gets the value of the environment variable from the process. +func (mp *MilvusProcess) getConfigValueFromEnv(key string) string { + envKey := strings.ToUpper(strings.ReplaceAll(key, ".", "_")) + return mp.env[envKey] +} + // DailGRPClient dials a grpc client with the given address, root path and node id. -func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int64) (*grpc.ClientConn, error) { - return grpc.DialContext( - ctx, - addr, +func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int64, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) { + opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( @@ -488,6 +503,12 @@ func DailGRPClient(ctx context.Context, addr string, rootPath string, nodeID int grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(), + } + opts = append(opts, extraOpts...) + return grpc.DialContext( + ctx, + addr, + opts..., ) } diff --git a/tests/integration/internaltls/internaltls_test.go b/tests/integration/internaltls/internaltls_test.go index 97dd66fd6d..ed8b79fd44 100644 --- a/tests/integration/internaltls/internaltls_test.go +++ b/tests/integration/internaltls/internaltls_test.go @@ -46,10 +46,11 @@ type InternaltlsTestSuit struct { } func (s *InternaltlsTestSuit) SetupSuite() { + workDir := s.WorkDir() s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSEnabled.Key, "true") - s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerPemPath.Key, "../../../configs/cert/server.pem") - s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerKeyPath.Key, "../../../configs/cert/server.key") - s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key, "../../../configs/cert/ca.pem") + s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerPemPath.Key, workDir+"/configs/cert/server.pem") + s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSServerKeyPath.Key, workDir+"/configs/cert/server.key") + s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSCaPemPath.Key, workDir+"/configs/cert/ca.pem") s.WithMilvusConfig(paramtable.Get().InternalTLSCfg.InternalTLSSNI.Key, "localhost") s.MiniClusterSuite.SetupSuite() @@ -217,6 +218,5 @@ func (s *InternaltlsTestSuit) TestHelloMilvus_basic() { } func TestInternalTLS(t *testing.T) { - t.Skip("skip until we fix the issue https://github.com/milvus-io/milvus/issues/42930") suite.Run(t, new(InternaltlsTestSuit)) } diff --git a/tests/integration/suite.go b/tests/integration/suite.go index 24d81032fb..6c931f222a 100644 --- a/tests/integration/suite.go +++ b/tests/integration/suite.go @@ -19,6 +19,7 @@ package integration import ( "context" "flag" + "os" "strings" "sync" "time" @@ -48,6 +49,11 @@ type MiniClusterSuite struct { opt clusterSuiteOption } +// WorkDir returns the work directory of the cluster. +func (s *MiniClusterSuite) WorkDir() string { + return os.Getenv(cluster.MilvusWorkDirEnvKey) +} + // WithMilvusConfig sets the environment variable for the given key. // The key can be got from the paramtable package, such as "common.QuotaConfigPath". func (s *MiniClusterSuite) WithMilvusConfig(key string, value string) { @@ -79,7 +85,7 @@ func (s *MiniClusterSuite) SetupSuite() { ctx, cancel := context.WithTimeout(context.Background(), caseTimeout) s.cancelFunc = cancel - s.Cluster = cluster.NewMiniClusterV3(ctx, cluster.WithExtraEnv(s.envConfigs)) + s.Cluster = cluster.NewMiniClusterV3(ctx, cluster.WithExtraEnv(s.envConfigs), cluster.WithWorkDir(s.WorkDir())) } func (s *MiniClusterSuite) SetupTest() {