From bb913dd837573b860290960cc80b9b64f83cbf8c Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Wed, 31 Dec 2025 16:07:22 +0800 Subject: [PATCH] fix: simplify go ut (#46606) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue: #46500 - simplify the run_go_codecov.sh to make sure the set -e to protect any sub command failure. - remove all embed etcd in test to make full test can be run at local. ## PR Summary: Simplify Go Unit Tests by Removing Embedded etcd and Async Startup Scaffolding **Core Invariant:** This PR assumes that unit tests can be simplified by running without embedded etcd servers (delegating to environment-based or external etcd instances via `kvfactory.GetEtcdAndPath()` or `ETCD_ENDPOINTS`) and by removing goroutine-based async startup scaffolding in favor of synchronous component initialization. Tests remain functionally equivalent while becoming simpler to run and debug locally. **What is Removed or Simplified:** 1. **Embedded etcd test infrastructure deleted**: Removes `EmbedEtcdUtil` type and its public methods (SetupEtcd, TearDownEmbedEtcd) from `pkg/util/testutils/embed_etcd.go`, removes the `StartTestEmbedEtcdServer()` helper from `pkg/util/etcd/etcd_util.go`, and removes etcd embedding from test suites (e.g., `TaskSuite`, `EtcdSourceSuite`, `mixcoord/client_test.go`). Tests now either skip etcd-dependent tests (via `MILVUS_UT_WITHOUT_KAFKA=1` environment flag in `kafka_test.go`) or source etcd from external configuration (via `kvfactory.GetEtcdAndPath()` in `task_test.go`, or `ETCD_ENDPOINTS` environment variable in `etcd_source_test.go`). This eliminates the overhead of spinning up temporary etcd servers for unit tests. 2. **Async startup scaffolding replaced with synchronous initialization**: In `internal/proxy/proxy_test.go` and `proxy_rpc_test.go`, the `startGrpc()` method signature removes the `sync.WaitGroup` parameter; components are now created, prepared, and run synchronously in-place rather than in goroutines (e.g., `go testServer.startGrpc(ctx, &p)` becomes `testServer.startGrpc(ctx, &p)` running synchronously). Readiness checks (e.g., `waitForGrpcReady()`) remain in place to ensure startup safety without concurrency constructs. This simplifies control flow and reduces debugging complexity. 3. **Shell script orchestration unified with proper error handling**: In `scripts/run_go_codecov.sh` and `scripts/run_intergration_test.sh`, per-package inline test invocations are consolidated into a single `test_cmd()` function with unified `TEST_CMD_WITH_ARGS` array containing race, coverage, verbose, and other flags. The problematic `set -ex` is replaced with `set -e` alone (removing debug output noise while preserving strict error semantics), ensuring the scripts fail fast on any command failure. **Why No Regression:** - Test assertions and code paths remain unchanged; only deployment source of etcd (embedded → external) and startup orchestration (async → sync) change. - Readiness verification (e.g., `waitForGrpcReady()`) is retained, ensuring components are initialized before test execution. - Test flags (race detection, coverage, verbosity) are uniformly applied across all packages via unified `TEST_CMD_WITH_ARGS`, preserving test coverage and quality. - `set -e` alone is sufficient for strict failure detection without the `-x` flag's verbose output. --------- Signed-off-by: chyezh --- .../mixcoord/client/client_test.go | 17 - internal/proxy/proxy_rpc_test.go | 5 +- internal/proxy/proxy_test.go | 598 ++---------------- internal/querycoordv2/task/task_test.go | 14 +- pkg/config/etcd_source_test.go | 25 +- .../walimpls/impls/kafka/kafka_test.go | 4 + pkg/util/etcd/etcd_util.go | 14 - pkg/util/testutils/embed_etcd.go | 50 -- scripts/run_go_codecov.sh | 89 +-- scripts/run_intergration_test.sh | 69 +- 10 files changed, 159 insertions(+), 726 deletions(-) delete mode 100644 pkg/util/testutils/embed_etcd.go diff --git a/internal/distributed/mixcoord/client/client_test.go b/internal/distributed/mixcoord/client/client_test.go index e4717c2a60..3b54a322ca 100644 --- a/internal/distributed/mixcoord/client/client_test.go +++ b/internal/distributed/mixcoord/client/client_test.go @@ -20,25 +20,21 @@ import ( "context" "math/rand" "os" - "strings" "testing" "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" mock1 "github.com/stretchr/testify/mock" - "go.uber.org/zap" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/util/mock" - "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" - "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -46,19 +42,6 @@ import ( var mockErr = errors.New("mock grpc err") func TestMain(m *testing.M) { - // init embed etcd - embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer() - if err != nil { - log.Fatal("failed to start embed etcd server", zap.Error(err)) - } - defer os.RemoveAll(tempDir) - defer embedetcdServer.Close() - - addrs := etcd.GetEmbedEtcdEndpoints(embedetcdServer) - - paramtable.Init() - paramtable.Get().Save(Params.EtcdCfg.Endpoints.Key, strings.Join(addrs, ",")) - rand.Seed(time.Now().UnixNano()) os.Exit(m.Run()) } diff --git a/internal/proxy/proxy_rpc_test.go b/internal/proxy/proxy_rpc_test.go index d7f745dc03..80df4c42bf 100644 --- a/internal/proxy/proxy_rpc_test.go +++ b/internal/proxy/proxy_rpc_test.go @@ -4,7 +4,6 @@ import ( "context" "os" "strings" - "sync" "testing" "github.com/stretchr/testify/assert" @@ -23,7 +22,6 @@ import ( func TestProxyRpcLimit(t *testing.T) { var err error - var wg sync.WaitGroup path := "/tmp/milvus/rocksmq" + funcutil.GenRandomStr() t.Setenv("ROCKSMQ_PATH", path) @@ -50,8 +48,7 @@ func TestProxyRpcLimit(t *testing.T) { testServer := newProxyTestServer(proxy) testServer.Proxy.SetAddress(p.GetAddress()) - wg.Add(1) - go testServer.startGrpc(ctx, &wg, &p) + go testServer.startGrpc(ctx, &p) assert.NoError(t, testServer.waitForGrpcReady()) defer testServer.grpcServer.Stop() client, err := grpcproxyclient.NewClient(ctx, "localhost:"+p.Port.GetValue(), 1) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index e704eb0b2e..4ef4906411 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -126,26 +126,20 @@ func init() { func runMixCoord(ctx context.Context, localMsg bool) *grpcmixcoord.Server { var rc *grpcmixcoord.Server - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - factory := dependency.NewDefaultFactory(localMsg) - var err error - rc, err = grpcmixcoord.NewServer(ctx, factory) - if err != nil { - panic(err) - } - if err = rc.Prepare(); err != nil { - panic(err) - } - err = rc.Run() - if err != nil { - panic(err) - } - }() - wg.Wait() + factory := dependency.NewDefaultFactory(localMsg) + var err error + rc, err = grpcmixcoord.NewServer(ctx, factory) + if err != nil { + panic(err) + } + if err = rc.Prepare(); err != nil { + panic(err) + } + err = rc.Run() + if err != nil { + panic(err) + } metrics.RegisterMixCoord(Registry) return rc @@ -153,26 +147,20 @@ func runMixCoord(ctx context.Context, localMsg bool) *grpcmixcoord.Server { func runStreamingNode(ctx context.Context, localMsg bool, alias string) *grpcstreamingnode.Server { var sn *grpcstreamingnode.Server - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - factory := dependency.MockDefaultFactory(localMsg, Params) - var err error - sn, err = grpcstreamingnode.NewServer(ctx, factory) - if err != nil { - panic(err) - } - if err = sn.Prepare(); err != nil { - panic(err) - } - err = sn.Run() - if err != nil { - panic(err) - } - }() - wg.Wait() + factory := dependency.MockDefaultFactory(localMsg, Params) + var err error + sn, err = grpcstreamingnode.NewServer(ctx, factory) + if err != nil { + panic(err) + } + if err = sn.Prepare(); err != nil { + panic(err) + } + err = sn.Run() + if err != nil { + panic(err) + } metrics.RegisterStreamingNode(Registry) return sn @@ -180,26 +168,20 @@ func runStreamingNode(ctx context.Context, localMsg bool, alias string) *grpcstr func runQueryNode(ctx context.Context, localMsg bool, alias string) *grpcquerynode.Server { var qn *grpcquerynode.Server - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - factory := dependency.MockDefaultFactory(localMsg, Params) - var err error - qn, err = grpcquerynode.NewServer(ctx, factory) - if err != nil { - panic(err) - } - if err = qn.Prepare(); err != nil { - panic(err) - } - err = qn.Run() - if err != nil { - panic(err) - } - }() - wg.Wait() + factory := dependency.MockDefaultFactory(localMsg, Params) + var err error + qn, err = grpcquerynode.NewServer(ctx, factory) + if err != nil { + panic(err) + } + if err = qn.Prepare(); err != nil { + panic(err) + } + err = qn.Run() + if err != nil { + panic(err) + } metrics.RegisterQueryNode(Registry) return qn @@ -207,26 +189,20 @@ func runQueryNode(ctx context.Context, localMsg bool, alias string) *grpcqueryno func runDataNode(ctx context.Context, localMsg bool, alias string) *grpcdatanode.Server { var dn *grpcdatanode.Server - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - factory := dependency.MockDefaultFactory(localMsg, Params) - var err error - dn, err = grpcdatanode.NewServer(ctx, factory) - if err != nil { - panic(err) - } - if err = dn.Prepare(); err != nil { - panic(err) - } - err = dn.Run() - if err != nil { - panic(err) - } - }() - wg.Wait() + factory := dependency.MockDefaultFactory(localMsg, Params) + var err error + dn, err = grpcdatanode.NewServer(ctx, factory) + if err != nil { + panic(err) + } + if err = dn.Prepare(); err != nil { + panic(err) + } + err = dn.Run() + if err != nil { + panic(err) + } metrics.RegisterDataNode(Registry) return dn @@ -264,9 +240,7 @@ func (s *proxyTestServer) GetStatisticsChannel(ctx context.Context, request *int return s.Proxy.GetStatisticsChannel(ctx, request) } -func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup, p *paramtable.GrpcServerConfig) { - defer wg.Done() - +func (s *proxyTestServer) startGrpc(ctx context.Context, p *paramtable.GrpcServerConfig) { kaep := keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection PermitWithoutStream: true, // Allow pings even when there are no active streams @@ -935,7 +909,6 @@ func checkPartitionInMemory(t *testing.T, ctx context.Context, proxy *Proxy, dbN func TestProxy(t *testing.T) { var err error - var wg sync.WaitGroup paramtable.Init() params := paramtable.Get() testutil.ResetEnvironment() @@ -1006,7 +979,6 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) testServer := newProxyTestServer(proxy) - wg.Add(1) bt := paramtable.NewBaseTable(paramtable.SkipRemote(true)) base := ¶mtable.ComponentParam{} @@ -1016,7 +988,7 @@ func TestProxy(t *testing.T) { testServer.Proxy.SetAddress(p.GetAddress()) assert.Equal(t, p.GetAddress(), testServer.Proxy.GetAddress()) - go testServer.startGrpc(ctx, &wg, &p) + go testServer.startGrpc(ctx, &p) assert.NoError(t, testServer.waitForGrpcReady()) rootCoordClient, err := mixc.NewClient(ctx) @@ -1104,9 +1076,7 @@ func TestProxy(t *testing.T) { schema := constructTestCollectionSchema(collectionName, int64Field, floatVecField, binaryVecField, structField, dim) createCollectionReq := constructTestCreateCollectionRequest(dbName, collectionName, schema, shardsNum) - wg.Add(1) t.Run("create collection", func(t *testing.T) { - defer wg.Done() req := createCollectionReq resp, err := proxy.CreateCollection(ctx, req) assert.NoError(t, err) @@ -1124,9 +1094,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("create alias", func(t *testing.T) { - defer wg.Done() // create alias aliasReq := &milvuspb.CreateAliasRequest{ Base: nil, @@ -1150,9 +1118,7 @@ func TestProxy(t *testing.T) { }) }) - wg.Add(1) t.Run("describe alias", func(t *testing.T) { - defer wg.Done() describeAliasReq := &milvuspb.DescribeAliasRequest{ Base: nil, DbName: dbName, @@ -1163,9 +1129,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("list alias", func(t *testing.T) { - defer wg.Done() listAliasReq := &milvuspb.ListAliasesRequest{ Base: nil, } @@ -1174,9 +1138,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("alter alias", func(t *testing.T) { - defer wg.Done() // alter alias alterReq := &milvuspb.AlterAliasRequest{ Base: nil, @@ -1210,9 +1172,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("drop alias", func(t *testing.T) { - defer wg.Done() // drop alias resp, err := proxy.DropAlias(ctx, &milvuspb.DropAliasRequest{ Base: nil, @@ -1237,9 +1197,7 @@ func TestProxy(t *testing.T) { assert.Error(t, err) }) - wg.Add(1) t.Run("has collection", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ Base: nil, DbName: dbName, @@ -1262,9 +1220,7 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) - wg.Add(1) t.Run("describe collection", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -1316,9 +1272,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get collection statistics", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{ Base: nil, DbName: dbName, @@ -1338,9 +1292,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("show collections", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -1352,9 +1304,7 @@ func TestProxy(t *testing.T) { assert.Contains(t, resp.CollectionNames, collectionName, "collections: %v", resp.CollectionNames) }) - wg.Add(1) t.Run("alter collection", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ Base: nil, DbName: dbName, @@ -1370,9 +1320,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("create partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ Base: nil, DbName: dbName, @@ -1393,9 +1341,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("has partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ Base: nil, DbName: dbName, @@ -1427,9 +1373,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get partition statistics", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{ Base: nil, DbName: dbName, @@ -1460,9 +1404,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("show partitions", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -1504,9 +1446,7 @@ func TestProxy(t *testing.T) { }) var insertedIDs []int64 - wg.Add(1) t.Run("insert", func(t *testing.T) { - defer wg.Done() req := constructTestCollectionInsertRequest(dbName, collectionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) resp, err := proxy.Insert(ctx, req) @@ -1527,9 +1467,7 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): proxy.Delete() flushed := true - wg.Add(1) t.Run("flush", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{ Base: nil, DbName: dbName, @@ -1560,9 +1498,7 @@ func TestProxy(t *testing.T) { log.Warn("flush operation was not sure to be done") } - wg.Add(1) t.Run("get statistics after flush", func(t *testing.T) { - defer wg.Done() if !flushed { t.Skip("flush operation was not done") } @@ -1586,9 +1522,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("create index for floatVec field", func(t *testing.T) { - defer wg.Done() req := constructTestCreateIndexRequest(dbName, collectionName, schemapb.DataType_FloatVector, floatVecField, dim, nlist) resp, err := proxy.CreateIndex(ctx, req) @@ -1596,9 +1530,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("alter_index", func(t *testing.T) { - defer wg.Done() req := &milvuspb.AlterIndexRequest{ DbName: dbName, CollectionName: collectionName, @@ -1616,9 +1548,7 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) - wg.Add(1) t.Run("describe index", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ Base: nil, DbName: dbName, @@ -1649,9 +1579,7 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) - wg.Add(1) t.Run("describe index with indexName", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ Base: nil, DbName: dbName, @@ -1664,9 +1592,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get index statistics", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{ Base: nil, DbName: dbName, @@ -1678,9 +1604,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, testFloatIndexName, resp.IndexDescriptions[0].IndexName) }) - wg.Add(1) t.Run("get index build progress", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{ Base: nil, DbName: dbName, @@ -1692,9 +1616,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get index state", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{ Base: nil, DbName: dbName, @@ -1706,9 +1628,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("load collection not all vecFields with index", func(t *testing.T) { - defer wg.Done() { stateResp, err := proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{ DbName: dbName, @@ -1728,9 +1648,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - wg.Add(1) t.Run("create index for binVec field", func(t *testing.T) { - defer wg.Done() req := constructTestCreateIndexRequest(dbName, collectionName, schemapb.DataType_BinaryVector, binaryVecField, dim, nlist) resp, err := proxy.CreateIndex(ctx, req) @@ -1739,9 +1657,7 @@ func TestProxy(t *testing.T) { }) fieldName := typeutil.ConcatStructFieldName(structField, subFieldFVec) - wg.Add(1) t.Run("create index for embedding list field", func(t *testing.T) { - defer wg.Done() req := constructTestCreateIndexRequest(dbName, collectionName, schemapb.DataType_ArrayOfVector, fieldName, dim, nlist) resp, err := proxy.CreateIndex(ctx, req) @@ -1749,9 +1665,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("alter index for embedding list field", func(t *testing.T) { - defer wg.Done() req := &milvuspb.AlterIndexRequest{ DbName: dbName, CollectionName: collectionName, @@ -1769,9 +1683,7 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) - wg.Add(1) t.Run("describe index for embedding list field", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ Base: nil, DbName: dbName, @@ -1786,9 +1698,7 @@ func TestProxy(t *testing.T) { assert.True(t, enableMmap, "params: %+v", resp.IndexDescriptions[0]) }) - wg.Add(1) t.Run("describe index with indexName for embedding list field", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ Base: nil, DbName: dbName, @@ -1801,9 +1711,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get index statistics for embedding list field", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{ Base: nil, DbName: dbName, @@ -1815,9 +1723,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, testStructFVecIndexName, resp.IndexDescriptions[0].IndexName) }) - wg.Add(1) t.Run("get index build progress for embedding list field", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{ Base: nil, DbName: dbName, @@ -1829,9 +1735,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get index state for embedding list field", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{ Base: nil, DbName: dbName, @@ -1844,9 +1748,7 @@ func TestProxy(t *testing.T) { }) loaded := true - wg.Add(1) t.Run("load collection", func(t *testing.T) { - defer wg.Done() { stateResp, err := proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{ DbName: dbName, @@ -1888,9 +1790,7 @@ func TestProxy(t *testing.T) { assert.True(t, loaded) }) - wg.Add(1) t.Run("show in-memory collections", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -1957,10 +1857,7 @@ func TestProxy(t *testing.T) { } }) - wg.Add(1) t.Run("get replicas", func(t *testing.T) { - defer wg.Done() - collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -1971,9 +1868,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, 1, len(resp.Replicas)) }) - wg.Add(1) t.Run("get collection statistics from shard", func(t *testing.T) { - defer wg.Done() if !loaded { t.Skip("collection not loaded") return @@ -2003,9 +1898,7 @@ func TestProxy(t *testing.T) { roundDecimal := 6 expr := fmt.Sprintf("%s > 0", int64Field) - wg.Add(1) t.Run("search", func(t *testing.T) { - defer wg.Done() req := constructTestSearchRequest(dbName, collectionName, floatVecField, expr, nq, nprobe, topk, roundDecimal, dim) resp, err := proxy.Search(ctx, req) @@ -2021,18 +1914,14 @@ func TestProxy(t *testing.T) { } }) - wg.Add(1) t.Run("advanced search", func(t *testing.T) { - defer wg.Done() req := constructTestAdvancedSearchRequest(dbName, collectionName, floatVecField, expr, nq, nprobe, topk, roundDecimal, dim) resp, err := proxy.Search(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) - wg.Add(1) t.Run("embedding list search", func(t *testing.T) { - defer wg.Done() req := constructTestEmbeddingListSearchRequest(dbName, collectionName, fieldName, expr, nq, nprobe, topk, roundDecimal, dim) resp, err := proxy.Search(ctx, req) @@ -2048,18 +1937,14 @@ func TestProxy(t *testing.T) { } }) - wg.Add(1) t.Run("search by primary keys", func(t *testing.T) { - defer wg.Done() req := constructSearchByPksRequest(t, dbName, collectionName, floatVecField, int64Field, insertedIDs, nprobe, topk, roundDecimal) resp, err := proxy.Search(ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) - wg.Add(1) t.Run("calculate distance", func(t *testing.T) { - defer wg.Done() opLeft := &milvuspb.VectorsArray{ Array: &milvuspb.VectorsArray_DataArray{ DataArray: &schemapb.VectorField{ @@ -2110,9 +1995,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get persistent segment info", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{ Base: nil, DbName: dbName, @@ -2122,9 +2005,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get segment info", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetSegmentsInfo(ctx, &internalpb.GetSegmentsInfoRequest{ DbName: dbName, CollectionID: 1, @@ -2134,9 +2015,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get query segment info", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{ Base: nil, DbName: dbName, @@ -2146,9 +2025,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("loadBalance", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadBalance(ctx, &milvuspb.LoadBalanceRequest{ Base: nil, }) @@ -2158,17 +2035,13 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): dummy - wg.Add(1) t.Run("register link", func(t *testing.T) { - defer wg.Done() resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{}) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get metrics", func(t *testing.T) { - defer wg.Done() req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) assert.NoError(t, err) resp, err := proxy.GetMetrics(ctx, req) @@ -2196,9 +2069,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get proxy metrics", func(t *testing.T) { - defer wg.Done() req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) assert.NoError(t, err) resp, err := proxy.GetProxyMetrics(ctx, req) @@ -2235,9 +2106,7 @@ func TestProxy(t *testing.T) { rateCol.Register(internalpb.RateType_DMLInsert.String()) }) - wg.Add(1) t.Run("release collection", func(t *testing.T) { - defer wg.Done() _, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2251,9 +2120,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, "", resp.Reason) }) - wg.Add(1) t.Run("show in-memory collections after release", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -2267,9 +2134,7 @@ func TestProxy(t *testing.T) { }) pLoaded := true - wg.Add(1) t.Run("load partitions", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2319,9 +2184,7 @@ func TestProxy(t *testing.T) { }) assert.True(t, pLoaded) - wg.Add(1) t.Run("show in-memory partitions", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2385,9 +2248,7 @@ func TestProxy(t *testing.T) { } }) - wg.Add(1) t.Run("insert partition", func(t *testing.T) { - defer wg.Done() req := constructPartitionInsertRequest(dbName, collectionName, partitionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) resp, err := proxy.Insert(ctx, req) @@ -2398,9 +2259,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, int64(rowNum), resp.InsertCnt) }) - wg.Add(1) t.Run("get partition statistics from shard", func(t *testing.T) { - defer wg.Done() if !pLoaded { t.Skip("partition not loaded") } @@ -2436,9 +2295,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("get collection statistics from hybrid", func(t *testing.T) { - defer wg.Done() if !flushed { t.Skip("flush operation was not done") } @@ -2465,9 +2322,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("upsert when autoID == true", func(t *testing.T) { - defer wg.Done() // autoID==true but not pass pk in upsert, failed req := constructCollectionUpsertRequestNoPK(dbName, collectionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) @@ -2489,9 +2344,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, int64(rowNum), resp.UpsertCnt) }) - wg.Add(1) t.Run("release partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{ Base: nil, DbName: dbName, @@ -2502,9 +2355,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("show in-memory partitions after release partition", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2533,9 +2384,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("drop partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ Base: nil, DbName: dbName, @@ -2583,9 +2432,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("has partition after drop partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ Base: nil, DbName: dbName, @@ -2597,9 +2444,7 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) - wg.Add(1) t.Run("show partitions after drop partition", func(t *testing.T) { - defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2617,9 +2462,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, 1, len(resp.PartitionNames)) }) - wg.Add(1) t.Run("drop index", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{ Base: nil, DbName: dbName, @@ -2631,9 +2474,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("Delete", func(t *testing.T) { - defer wg.Done() _, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{ Base: nil, DbName: dbName, @@ -2644,9 +2485,7 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) - wg.Add(1) t.Run("truncate collection", func(t *testing.T) { - defer wg.Done() _, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2669,9 +2508,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, "0", rowNumStr) }) - wg.Add(1) t.Run("drop collection", func(t *testing.T) { - defer wg.Done() _, err := globalMetaCache.GetCollectionID(ctx, dbName, collectionName) assert.NoError(t, err) @@ -2710,9 +2547,7 @@ func TestProxy(t *testing.T) { assert.False(t, hasDatabase) }) - wg.Add(1) t.Run("has collection after drop collection", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ Base: nil, DbName: dbName, @@ -2724,9 +2559,7 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) - wg.Add(1) t.Run("show all collections after drop collection", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -2742,10 +2575,7 @@ func TestProxy(t *testing.T) { username := "test_username_" + funcutil.RandomString(15) password := "password" - wg.Add(1) t.Run("credential CREATE api", func(t *testing.T) { - defer wg.Done() - // 1. create credential createCredentialReq := constructCreateCredentialRequest(username, crypto.Base64Encode(password)) // success @@ -2777,9 +2607,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("credential UPDATE api", func(t *testing.T) { - defer wg.Done() rootCtx := ctx fooCtx := GetContext(context.Background(), "foo:123456") ctx = fooCtx @@ -2840,10 +2668,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, updateResp.ErrorCode) }) - wg.Add(1) t.Run("credential GET api", func(t *testing.T) { - defer wg.Done() - // 3. get credential newPassword := "new_password" getCredentialReq := constructGetCredentialRequest(username) @@ -2858,10 +2683,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, getResp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("credential LIST api", func(t *testing.T) { - defer wg.Done() - // 4. list credential usernames listCredUsersReq := constructListCredUsersRequest() listUsersResp, err := proxy.ListCredUsers(ctx, listCredUsersReq) @@ -2869,10 +2691,7 @@ func TestProxy(t *testing.T) { assert.True(t, len(listUsersResp.Usernames) > 0) }) - wg.Add(1) t.Run("credential DELETE api", func(t *testing.T) { - defer wg.Done() - // 5. delete credential delCredReq := constructDelCredRequest(username) @@ -2881,7 +2700,6 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, deleteResp.ErrorCode) }) - testProxyRole(ctx, t, proxy) testProxyPrivilege(ctx, t, proxy) testProxyOperatePrivilegeV2(ctx, t, proxy) assert.False(t, false, true) @@ -2899,81 +2717,61 @@ func TestProxy(t *testing.T) { proxy.UpdateStateCode(commonpb.StateCode_Abnormal) - wg.Add(1) t.Run("CreateCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("TruncateCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.TruncateCollection(ctx, &milvuspb.TruncateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("HasCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleaseCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeCollection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetCollectionStatistics fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowCollections fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("alter collection fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ Base: nil, DbName: dbName, @@ -2983,297 +2781,223 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("CreatePartition fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropPartition fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("HasPartition fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadPartitions fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleasePartitions fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetPartitionStatistics fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowPartitions fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetLoadingProgress fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetLoadState fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateIndex fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeIndex fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexStatistics fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("DropIndex fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetIndexBuildProgress fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexState fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Insert fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Insert(ctx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Delete fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Upsert fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Upsert(ctx, &milvuspb.UpsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Search fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Search(ctx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Flush fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Query fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateAlias fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateAlias(ctx, &milvuspb.CreateAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropAlias fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropAlias(ctx, &milvuspb.DropAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("AlterAlias fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterAlias(ctx, &milvuspb.AlterAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ListAliases fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ListAliases(ctx, &milvuspb.ListAliasesRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("DescribeAlias fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeAlias(ctx, &milvuspb.DescribeAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetPersistentSegmentInfo fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetQuerySegmentInfo fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadBalance fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadBalance(ctx, &milvuspb.LoadBalanceRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("RegisterLink fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetMetrics fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetMetrics(ctx, &milvuspb.GetMetricsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("InvalidateCredCache fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.InvalidateCredentialCache(ctx, &proxypb.InvalidateCredCacheRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("UpdateCredentialCache fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.UpdateCredentialCache(ctx, &proxypb.UpdateCredCacheRequest{Username: "xxx", Password: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("CreateCredential fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("UpdateCredential fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.UpdateCredential(ctx, &milvuspb.UpdateCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DeleteCredential fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DeleteCredential(ctx, &milvuspb.DeleteCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ListCredUsers fail, unhealthy", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ListCredUsers(ctx, &milvuspb.ListCredUsersRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -3296,81 +3020,61 @@ func TestProxy(t *testing.T) { ddParallel := proxy.sched.ddQueue.getMaxTaskNum() proxy.sched.ddQueue.setMaxTaskNum(0) - wg.Add(1) t.Run("CreateCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("TruncateCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.TruncateCollection(ctx, &milvuspb.TruncateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("HasCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleaseCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeCollection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetCollectionStatistics fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowCollections fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("alter collection fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterCollection(ctx, &milvuspb.AlterCollectionRequest{ Base: nil, DbName: dbName, @@ -3380,154 +3084,116 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("CreatePartition fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropPartition fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("HasPartition fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadPartitions fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleasePartitions fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetPartitionStatistics fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowPartitions fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateIndex fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeIndex fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexStatistics fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("DropIndex fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetIndexBuildProgress fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexState fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Flush fail, dc queue full", func(t *testing.T) { - defer wg.Done() proxy.sched.dcQueue.setMaxTaskNum(0) resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateAlias fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateAlias(ctx, &milvuspb.CreateAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropAlias fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropAlias(ctx, &milvuspb.DropAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("AlterAlias fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterAlias(ctx, &milvuspb.AlterAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeAlias fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeAlias(ctx, &milvuspb.DescribeAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ListAliases fail, dd queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ListAliases(ctx, &milvuspb.ListAliasesRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -3538,25 +3204,19 @@ func TestProxy(t *testing.T) { dmParallelism := proxy.sched.dmQueue.getMaxTaskNum() proxy.sched.dmQueue.setMaxTaskNum(0) - wg.Add(1) t.Run("Insert fail, dm queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Insert(ctx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Delete fail, dm queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Upsert fail, dm queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Upsert(ctx, &milvuspb.UpsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -3567,17 +3227,13 @@ func TestProxy(t *testing.T) { dqParallelism := proxy.sched.dqQueue.getMaxTaskNum() proxy.sched.dqQueue.setMaxTaskNum(0) - wg.Add(1) t.Run("Search fail, dq queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Search(ctx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Query fail, dq queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -3592,81 +3248,61 @@ func TestProxy(t *testing.T) { defer shortCancel() time.Sleep(timeout) - wg.Add(1) t.Run("CreateCollection, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateCollection(shortCtx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropCollection(shortCtx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("TruncateCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.TruncateCollection(shortCtx, &milvuspb.TruncateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("HasCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasCollection(shortCtx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadCollection(shortCtx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleaseCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleaseCollection(shortCtx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeCollection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeCollection(shortCtx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetCollectionStatistics fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetCollectionStatistics(shortCtx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowCollections fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowCollections(shortCtx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("alter collection fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterCollection(shortCtx, &milvuspb.AlterCollectionRequest{ Base: nil, DbName: dbName, @@ -3676,234 +3312,176 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("CreatePartition fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreatePartition(shortCtx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropPartition fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropPartition(shortCtx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("HasPartition fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.HasPartition(shortCtx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("LoadPartitions fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.LoadPartitions(shortCtx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("ReleasePartitions fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ReleasePartitions(shortCtx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetPartitionStatistics fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetPartitionStatistics(shortCtx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ShowPartitions fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ShowPartitions(shortCtx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetLoadingProgress fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetLoadingProgress(shortCtx, &milvuspb.GetLoadingProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateIndex fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateIndex(shortCtx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeIndex fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeIndex(shortCtx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexStatistics fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexStatistics(shortCtx, &milvuspb.GetIndexStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("DropIndex fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropIndex(shortCtx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("GetIndexBuildProgress fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexBuildProgress(shortCtx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("GetIndexState fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.GetIndexState(shortCtx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Flush fail, timeout", func(t *testing.T) { - defer wg.Done() _, err := proxy.Flush(shortCtx, &milvuspb.FlushRequest{}) assert.NoError(t, err) // FIXME(dragondriver) // assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Insert fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Insert(shortCtx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Delete fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Delete(shortCtx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Update fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Upsert(shortCtx, &milvuspb.UpsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Search fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Search(shortCtx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("Query fail, dq queue full", func(t *testing.T) { - defer wg.Done() resp, err := proxy.Query(shortCtx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateAlias fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateAlias(shortCtx, &milvuspb.CreateAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DropAlias fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DropAlias(shortCtx, &milvuspb.DropAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("AlterAlias fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.AlterAlias(shortCtx, &milvuspb.AlterAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DescribeAlias fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DescribeAlias(shortCtx, &milvuspb.DescribeAliasRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("ListAliases fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.ListAliases(shortCtx, &milvuspb.ListAliasesRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run("CreateCredential fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreateCredential(shortCtx, &milvuspb.CreateCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("UpdateCredential fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.UpdateCredential(shortCtx, &milvuspb.UpdateCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DeleteCredential fail, user root cannot be deleted", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DeleteCredential(shortCtx, &milvuspb.DeleteCredentialRequest{Username: "root"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("DeleteCredential fail, timeout", func(t *testing.T) { - defer wg.Done() resp, err := proxy.DeleteCredential(shortCtx, &milvuspb.DeleteCredentialRequest{Username: "xxx"}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) @@ -3915,9 +3493,7 @@ func TestProxy(t *testing.T) { schema = constructTestCollectionSchema(collectionName, int64Field, floatVecField, binaryVecField, structField, dim) createCollectionReq = constructTestCreateCollectionRequest(dbName, collectionName, schema, shardsNum) - wg.Add(1) t.Run("create collection upsert valid", func(t *testing.T) { - defer wg.Done() req := createCollectionReq resp, err := proxy.CreateCollection(ctx, req) assert.NoError(t, err) @@ -3939,9 +3515,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("create partition", func(t *testing.T) { - defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ Base: nil, DbName: dbName, @@ -3962,9 +3536,7 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("upsert partition", func(t *testing.T) { - defer wg.Done() req := constructPartitionReqUpsertRequestValid(dbName, collectionName, partitionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) resp, err := proxy.Upsert(ctx, req) @@ -3975,9 +3547,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, int64(rowNum), resp.UpsertCnt) }) - wg.Add(1) t.Run("upsert when occurs unexpected error like illegal partition name", func(t *testing.T) { - defer wg.Done() req := constructPartitionReqUpsertRequestInvalid(dbName, collectionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) resp, err := proxy.Upsert(ctx, req) @@ -3988,9 +3558,7 @@ func TestProxy(t *testing.T) { assert.Equal(t, int64(0), resp.UpsertCnt) }) - wg.Add(1) t.Run("upsert when autoID == false", func(t *testing.T) { - defer wg.Done() req := constructCollectionUpsertRequestValid(dbName, collectionName, floatVecField, binaryVecField, structField, schema, rowNum, dim) resp, err := proxy.Upsert(ctx, req) @@ -4002,16 +3570,11 @@ func TestProxy(t *testing.T) { }) testServer.gracefulStop() - wg.Wait() log.Info("case done") } func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { - var wg sync.WaitGroup - wg.Add(1) t.Run("Create Role", func(t *testing.T) { - defer wg.Done() - entity := &milvuspb.RoleEntity{Name: " "} resp, _ := proxy.CreateRole(ctx, &milvuspb.CreateRoleRequest{Entity: entity}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) @@ -4027,10 +3590,7 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("Drop Role", func(t *testing.T) { - defer wg.Done() - resp, _ := proxy.DropRole(ctx, &milvuspb.DropRoleRequest{RoleName: " "}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) @@ -4082,10 +3642,7 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, roleNumOfUser, len(userResp.Results[0].Roles)) }) - wg.Add(1) t.Run("Operate User Role", func(t *testing.T) { - defer wg.Done() - username := "root" roleName := "public" // AddUserToRole @@ -4111,10 +3668,7 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run("Select Role", func(t *testing.T) { - defer wg.Done() - resp, _ := proxy.SelectRole(ctx, &milvuspb.SelectRoleRequest{Role: &milvuspb.RoleEntity{Name: " "}}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -4149,10 +3703,7 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, commonpb.ErrorCode_Success, opResp.ErrorCode) }) - wg.Add(1) t.Run("Select User", func(t *testing.T) { - defer wg.Done() - entity := &milvuspb.UserEntity{Name: " "} resp, _ := proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{User: entity}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) @@ -4179,10 +3730,7 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, commonpb.ErrorCode_Success, opResp.ErrorCode) }) - wg.Add(1) t.Run("User Role mapping info", func(t *testing.T) { - defer wg.Done() - ctx := context.Background() username := fmt.Sprintf("user%d", rand.Int31()) roleName := fmt.Sprintf("role%d", rand.Int31()) @@ -4243,8 +3791,6 @@ func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) { assert.Equal(t, 0, len(selectUserResp.Results)) } }) - - wg.Wait() } func testProxyRoleUnhealthy(ctx context.Context, t *testing.T, proxy *Proxy) { @@ -4259,37 +3805,27 @@ func testProxyRoleFail(ctx context.Context, t *testing.T, proxy *Proxy, reason s var wg sync.WaitGroup roleName := "xxx" - wg.Add(1) t.Run(fmt.Sprintf("CreateRole fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.CreateRole(ctx, &milvuspb.CreateRoleRequest{Entity: &milvuspb.RoleEntity{Name: roleName}}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run(fmt.Sprintf("DropRole fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.DropRole(ctx, &milvuspb.DropRoleRequest{RoleName: roleName}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run(fmt.Sprintf("OperateUserRole fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.OperateUserRole(ctx, &milvuspb.OperateUserRoleRequest{Username: "root", RoleName: "public"}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run(fmt.Sprintf("SelectRole fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.SelectRole(ctx, &milvuspb.SelectRoleRequest{}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Add(1) t.Run(fmt.Sprintf("SelectUser fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.SelectUser(ctx, &milvuspb.SelectUserRequest{}) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) @@ -4299,10 +3835,8 @@ func testProxyRoleFail(ctx context.Context, t *testing.T, proxy *Proxy, reason s func testProxyPrivilege(ctx context.Context, t *testing.T, proxy *Proxy) { var wg sync.WaitGroup - wg.Add(1) - t.Run("Operate Privilege, Select Grant", func(t *testing.T) { - defer wg.Done() + t.Run("Operate Privilege, Select Grant", func(t *testing.T) { // GrantPrivilege req := &milvuspb.OperatePrivilegeRequest{} resp, _ := proxy.OperatePrivilege(ctx, req) @@ -4467,11 +4001,7 @@ func testProxyPrivilege(ctx context.Context, t *testing.T, proxy *Proxy) { } func testProxyOperatePrivilegeV2(ctx context.Context, t *testing.T, proxy *Proxy) { - var wg sync.WaitGroup - wg.Add(1) t.Run("Operate Privilege V2, Select Grant", func(t *testing.T) { - defer wg.Done() - // GrantPrivilege req := &milvuspb.OperatePrivilegeV2Request{} resp, _ := proxy.OperatePrivilegeV2(ctx, req) @@ -4562,8 +4092,6 @@ func testProxyOperatePrivilegeV2(ctx context.Context, t *testing.T, proxy *Proxy resp, _ = proxy.OperatePrivilegeV2(ctx, roleReq) assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - - wg.Wait() } func testProxyPrivilegeUnhealthy(ctx context.Context, t *testing.T, proxy *Proxy) { @@ -4575,10 +4103,7 @@ func testProxyPrivilegeTimeout(ctx context.Context, t *testing.T, proxy *Proxy) } func testProxyPrivilegeFail(ctx context.Context, t *testing.T, proxy *Proxy, reason string) { - var wg sync.WaitGroup - wg.Add(1) t.Run(fmt.Sprintf("Operate Grant fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.OperatePrivilege(ctx, &milvuspb.OperatePrivilegeRequest{ Entity: &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: "admin"}, @@ -4590,10 +4115,7 @@ func testProxyPrivilegeFail(ctx context.Context, t *testing.T, proxy *Proxy, rea assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Add(1) t.Run(fmt.Sprintf("SelectGrant fail, %s", reason), func(t *testing.T) { - defer wg.Done() - resp, _ := proxy.SelectGrant(ctx, &milvuspb.SelectGrantRequest{ Entity: &milvuspb.GrantEntity{ Role: &milvuspb.RoleEntity{Name: "admin"}, @@ -4601,15 +4123,10 @@ func testProxyPrivilegeFail(ctx context.Context, t *testing.T, proxy *Proxy, rea }) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) - wg.Wait() } func testProxyRefreshPolicyInfoCache(ctx context.Context, t *testing.T, proxy *Proxy) { - var wg sync.WaitGroup - wg.Add(1) t.Run("RefreshPolicyInfoCache", func(t *testing.T) { - defer wg.Done() - resp, err := proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{ OpType: int32(typeutil.CacheAddUserToRole), OpKey: funcutil.EncodeUserRoleCache("foo", "public"), @@ -4627,7 +4144,6 @@ func testProxyRefreshPolicyInfoCache(ctx context.Context, t *testing.T, proxy *P assert.NoError(t, err) assert.Error(t, merr.Error(resp)) }) - wg.Wait() } func testProxyRefreshPolicyInfoCacheUnhealthy(ctx context.Context, t *testing.T, proxy *Proxy) { @@ -4635,17 +4151,13 @@ func testProxyRefreshPolicyInfoCacheUnhealthy(ctx context.Context, t *testing.T, } func testProxyRefreshPolicyInfoCacheFail(ctx context.Context, t *testing.T, proxy *Proxy, reason string) { - var wg sync.WaitGroup - wg.Add(1) t.Run(fmt.Sprintf("RefreshPolicyInfoCache fail, %s", reason), func(t *testing.T) { - defer wg.Done() resp, _ := proxy.RefreshPolicyInfoCache(ctx, &proxypb.RefreshPolicyInfoCacheRequest{ OpType: int32(typeutil.CacheAddUserToRole), OpKey: funcutil.EncodeUserRoleCache("foo", "public"), }) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) - wg.Wait() } func Test_GetCompactionState(t *testing.T) { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index dd3ec16893..d2cba0b31d 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -39,17 +39,16 @@ import ( . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/kv" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/testutils" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -61,7 +60,6 @@ type distribution struct { type TaskSuite struct { suite.Suite - testutils.EmbedEtcdUtil // Data collection int64 @@ -147,15 +145,7 @@ func (suite *TaskSuite) SetupTest() { config := GenerateEtcdConfig() suite.ctx = context.Background() - cli, err := etcd.GetEtcdClient( - config.UseEmbedEtcd.GetAsBool(), - config.EtcdUseSSL.GetAsBool(), - config.Endpoints.GetAsStrings(), - config.EtcdTLSCert.GetValue(), - config.EtcdTLSKey.GetValue(), - config.EtcdTLSCACert.GetValue(), - config.EtcdTLSMinVersion.GetValue()) - suite.Require().NoError(err) + cli, _ := kvfactory.GetEtcdAndPath() suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) suite.store = querycoord.NewCatalog(suite.kv) diff --git a/pkg/config/etcd_source_test.go b/pkg/config/etcd_source_test.go index 7ec5c860cc..92a2deebb3 100644 --- a/pkg/config/etcd_source_test.go +++ b/pkg/config/etcd_source_test.go @@ -19,11 +19,11 @@ package config import ( "context" "os" + "strings" "testing" "time" "github.com/stretchr/testify/suite" - "go.etcd.io/etcd/server/v3/embed" "go.uber.org/atomic" "github.com/milvus-io/milvus/pkg/v2/util/etcd" @@ -32,29 +32,18 @@ import ( type EtcdSourceSuite struct { suite.Suite - embedEtcdServer *embed.Etcd - tempDir string - endpoints []string + endpoints []string } func (s *EtcdSourceSuite) SetupSuite() { - // init embed etcd - embedServer, tempDir, err := etcd.StartTestEmbedEtcdServer() - - s.Require().NoError(err) - - s.embedEtcdServer = embedServer - s.tempDir = tempDir - s.endpoints = etcd.GetEmbedEtcdEndpoints(embedServer) + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" + } + s.endpoints = strings.Split(endpoints, ",") } func (s *EtcdSourceSuite) TearDownSuite() { - if s.embedEtcdServer != nil { - s.embedEtcdServer.Close() - } - if s.tempDir != "" { - os.RemoveAll(s.tempDir) - } } func (s *EtcdSourceSuite) TestNewSource() { diff --git a/pkg/streaming/walimpls/impls/kafka/kafka_test.go b/pkg/streaming/walimpls/impls/kafka/kafka_test.go index 44ee314279..516baf82e8 100644 --- a/pkg/streaming/walimpls/impls/kafka/kafka_test.go +++ b/pkg/streaming/walimpls/impls/kafka/kafka_test.go @@ -1,6 +1,7 @@ package kafka import ( + "os" "testing" "github.com/stretchr/testify/assert" @@ -28,6 +29,9 @@ func TestRegistry(t *testing.T) { } func TestKafka(t *testing.T) { + if os.Getenv("MILVUS_UT_WITHOUT_KAFKA") != "" { + t.Skip("there's no kafka broker available, skipping kafka test") + } walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() } diff --git a/pkg/util/etcd/etcd_util.go b/pkg/util/etcd/etcd_util.go index 5ad72c7b6d..512a3dac9d 100644 --- a/pkg/util/etcd/etcd_util.go +++ b/pkg/util/etcd/etcd_util.go @@ -253,20 +253,6 @@ func buildKvGroup(keys, values []string) (map[string]string, error) { return ret, nil } -// StartTestEmbedEtcdServer returns a newly created embed etcd server. -// ### USED FOR UNIT TEST ONLY ### -func StartTestEmbedEtcdServer() (*embed.Etcd, string, error) { - dir, err := os.MkdirTemp(os.TempDir(), "milvus_ut") - if err != nil { - return nil, "", err - } - config := embed.NewConfig() - config.Dir = dir - config.LogLevel = "warn" - server, err := embed.StartEtcd(config) - return server, dir, err -} - // GetEmbedEtcdEndpoints returns etcd listener address for endpoint config. func GetEmbedEtcdEndpoints(server *embed.Etcd) []string { addrs := make([]string, 0, len(server.Clients)) diff --git a/pkg/util/testutils/embed_etcd.go b/pkg/util/testutils/embed_etcd.go deleted file mode 100644 index e379125878..0000000000 --- a/pkg/util/testutils/embed_etcd.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package testutils - -import ( - "os" - - "go.etcd.io/etcd/server/v3/embed" - - "github.com/milvus-io/milvus/pkg/v2/util/etcd" -) - -type EmbedEtcdUtil struct { - server *embed.Etcd - tempDir string -} - -func (util *EmbedEtcdUtil) SetupEtcd() ([]string, error) { - // init embed etcd - embedetcdServer, tempDir, err := etcd.StartTestEmbedEtcdServer() - if err != nil { - return nil, err - } - util.server, util.tempDir = embedetcdServer, tempDir - - return etcd.GetEmbedEtcdEndpoints(embedetcdServer), nil -} - -func (util *EmbedEtcdUtil) TearDownEmbedEtcd() { - if util.server != nil { - util.server.Close() - } - if util.tempDir != "" { - os.RemoveAll(util.tempDir) - } -} diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index bd94468459..28a8031ecc 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -16,61 +16,64 @@ # See the License for the specific language governing permissions and # limitations under the License. -FILE_COVERAGE_INFO="go_coverage.txt" -FILE_COVERAGE_HTML="go_coverage.html" +FILE_COVERAGE_INFO="$PWD/go_coverage.txt" +FILE_COVERAGE_HTML="$PWD/go_coverage.html" + BASEDIR=$(dirname "$0") source $BASEDIR/setenv.sh -set -ex -echo "mode: atomic" > ${FILE_COVERAGE_INFO} +set -e -# run unittest -echo "Running unittest under ./internal & ./pkg" +echo "mode: atomic" > ${FILE_COVERAGE_INFO} TEST_CMD=$@ if [ -z "$TEST_CMD" ]; then TEST_CMD="go test" fi -# starting the timer -beginTime=`date +%s` -pushd cmd/tools -$TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic ./... -if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} - rm profile.out -fi -popd -for d in $(go list -buildvcs=false ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" - if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} - rm profile.out - fi -done -pushd pkg -for d in $(go list -buildvcs=false ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" - if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} - rm profile.out - fi -done -popd -# milvusclient -pushd client -for d in $(go list -buildvcs=false ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" - if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} - rm profile.out - fi -done -popd -endTime=`date +%s` +TEST_CMD_WITH_ARGS=( + $TEST_CMD + "-gcflags=all=-N -l" + -race + -tags dynamic,test + -v + -failfast + -buildvcs=false + -coverpkg=./... + -coverprofile=profile.out + -covermode=atomic +) -echo "Total time for go unittest:" $(($endTime-$beginTime)) "s" +function test_cmd() { + mapfile -t PKGS < <(go list -tags dynamic,test ./...) + for pkg in "${PKGS[@]}"; do + echo -e "-----------------------------------\nRunning test cases at $pkg ..." + "${TEST_CMD_WITH_ARGS[@]}" "$pkg" + if [ -f profile.out ]; then + # Skip the per-profile header to keep a single global "mode:" line + # Skip the packages that are not covered by the test + sed '1{/^mode:/d}' profile.out | grep -vE '(planparserv2/generated|mocks)' >> "${FILE_COVERAGE_INFO}" || [ $? -eq 1 ] + rm profile.out + fi + echo -e "-----------------------------------\n" + done +} + +export MILVUS_UT_WITHOUT_KAFKA=1 # kafka is not available in the CI environment, so skip the kafka tests + +# starting the timer +beginTime=$(date +%s) +echo -e "=== Running go unittest ===\n\n" + +for d in cmd/tools internal pkg client; do + pushd "$d" + test_cmd + popd +done + +endTime=$(date +%s) +echo -e "=== Total time for go unittest: $(($endTime-$beginTime)) s ===" # generate html report # go tool cover -html=./${FILE_COVERAGE_INFO} -o ./${FILE_COVERAGE_HTML} diff --git a/scripts/run_intergration_test.sh b/scripts/run_intergration_test.sh index b828c50f5d..529837baf2 100755 --- a/scripts/run_intergration_test.sh +++ b/scripts/run_intergration_test.sh @@ -23,36 +23,55 @@ FILE_COVERAGE_INFO="it_coverage.txt" BASEDIR=$(dirname "$0") source $BASEDIR/setenv.sh -TEST_CMD=$@ -if [ -z "$TEST_CMD" ]; then - TEST_CMD="go test -failfast -count=1" -fi - set -e + echo "mode: atomic" > ${FILE_COVERAGE_INFO} echo "MILVUS_WORK_DIR: $MILVUS_WORK_DIR" -# starting the timer -beginTime=`date +%s` +TEST_CMD=$@ +if [ -z "$TEST_CMD" ]; then + TEST_CMD="go test" +fi -for d in $(go list ./tests/integration/...); do - echo "Start to run integration test under \"$d\" pkg" - if [[ $d == *"coordrecovery"* ]]; then - echo "running coordrecovery" - # simplified command to speed up coord init test since it is large. - $TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m - elif [[ $d == *"import"* ]]; then - $TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=60m - else - $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m - fi - if [ -f profile.out ]; then - grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} - rm profile.out - fi +TEST_CMD_WITH_ARGS=( + $TEST_CMD + "-gcflags=all=-N -l" + -race + -tags dynamic,test + -v + -failfast + -count=1 + -buildvcs=false + -coverpkg=./... + -coverprofile=profile.out + -covermode=atomic + -caseTimeout=20m + -timeout=60m +) + +function test_cmd() { + mapfile -t PKGS < <(go list -tags dynamic,test ./...) + for pkg in "${PKGS[@]}"; do + echo -e "-----------------------------------\nRunning test cases at $pkg ..." + "${TEST_CMD_WITH_ARGS[@]}" "$pkg" + if [ -f profile.out ]; then + # Skip the per-profile header to keep a single global "mode:" line + # Skip the packages that are not covered by the test + sed '1{/^mode:/d}' profile.out | grep -vE '(planparserv2/generated|mocks)' >> "${FILE_COVERAGE_INFO}" || [ $? -eq 1 ] + rm profile.out + fi + echo -e "-----------------------------------\n" + done +} + +beginTime=`date +%s` +printf "=== Running integration test ===\n\n" + +for d in tests/integration; do + pushd "$d" + test_cmd + popd done endTime=`date +%s` - -echo "Total time for go integration test:" $(($endTime-$beginTime)) "s" - +printf "=== Total time for go integration test: $(($endTime-$beginTime)) s==="