diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 94b0873ac5..518390eba9 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -304,7 +304,9 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno func TestProxy(t *testing.T) { var err error - err = os.Setenv("ROCKSMQ_PATH", "/tmp/milvus/rocksmq") + path := "/tmp/milvus/rocksmq" + err = os.Setenv("ROCKSMQ_PATH", path) + defer os.RemoveAll(path) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -645,7 +647,11 @@ func TestProxy(t *testing.T) { } } + var wg sync.WaitGroup + + wg.Add(1) t.Run("create collection", func(t *testing.T) { + defer wg.Done() req := createCollectionReq resp, err := proxy.CreateCollection(ctx, req) assert.NoError(t, err) @@ -658,7 +664,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("create alias", func(t *testing.T) { + defer wg.Done() // create alias aliasReq := &milvuspb.CreateAliasRequest{ Base: nil, @@ -691,7 +699,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("alter alias", func(t *testing.T) { + defer wg.Done() // alter alias alterReq := &milvuspb.AlterAliasRequest{ Base: nil, @@ -724,7 +734,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("drop alias", func(t *testing.T) { + defer wg.Done() // drop alias resp, err := proxy.DropAlias(ctx, &milvuspb.DropAliasRequest{ Base: nil, @@ -755,7 +767,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("has collection", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ Base: nil, DbName: dbName, @@ -778,7 +792,9 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) + wg.Add(1) t.Run("describe collection", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -808,7 +824,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("get collection statistics", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{ Base: nil, DbName: dbName, @@ -828,7 +846,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("show collections", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -841,7 +861,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, 1, len(resp.CollectionNames)) }) + wg.Add(1) t.Run("create partition", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{ Base: nil, DbName: dbName, @@ -872,7 +894,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("has partition", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ Base: nil, DbName: dbName, @@ -904,7 +928,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("get partition statistics", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{ Base: nil, DbName: dbName, @@ -935,7 +961,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("show partitions", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -965,7 +993,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("insert", func(t *testing.T) { + defer wg.Done() req := constructInsertRequest() resp, err := proxy.Insert(ctx, req) @@ -979,7 +1009,9 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): proxy.Delete() flushed := true // fortunately, no task depends on this state, maybe CreateIndex? + wg.Add(1) t.Run("flush", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{ Base: nil, DbName: dbName, @@ -1031,7 +1063,9 @@ func TestProxy(t *testing.T) { log.Warn("flush operation was not sure to be done") } + wg.Add(1) t.Run("create index", func(t *testing.T) { + defer wg.Done() req := constructCreateIndexRequest() resp, err := proxy.CreateIndex(ctx, req) @@ -1039,7 +1073,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("describe index", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ Base: nil, DbName: dbName, @@ -1052,7 +1088,9 @@ func TestProxy(t *testing.T) { indexName = resp.IndexDescriptions[0].IndexName }) + wg.Add(1) t.Run("get index build progress", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{ Base: nil, DbName: dbName, @@ -1064,7 +1102,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("get index state", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{ Base: nil, DbName: dbName, @@ -1077,7 +1117,9 @@ func TestProxy(t *testing.T) { }) loaded := true + wg.Add(1) t.Run("load collection", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ Base: nil, DbName: dbName, @@ -1131,7 +1173,9 @@ func TestProxy(t *testing.T) { log.Warn("load operation was not sure to be done") } + wg.Add(1) t.Run("show in-memory collections", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -1169,7 +1213,9 @@ func TestProxy(t *testing.T) { }) if loaded { + wg.Add(1) t.Run("search", func(t *testing.T) { + defer wg.Done() req := constructSearchRequest() //resp, err := proxy.Search(ctx, req) @@ -1180,7 +1226,9 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): compare search result }) + wg.Add(1) t.Run("query", func(t *testing.T) { + defer wg.Done() //resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{ _, err := proxy.Query(ctx, &milvuspb.QueryRequest{ Base: nil, @@ -1199,7 +1247,9 @@ func TestProxy(t *testing.T) { }) } + wg.Add(1) t.Run("calculate distance", func(t *testing.T) { + defer wg.Done() opLeft := &milvuspb.VectorsArray{ Array: &milvuspb.VectorsArray_DataArray{ DataArray: &schemapb.VectorField{ @@ -1252,7 +1302,9 @@ func TestProxy(t *testing.T) { assert.Panics(t, f) }) + wg.Add(1) t.Run("get persistent segment info", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{ Base: nil, DbName: dbName, @@ -1262,7 +1314,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("get query segment info", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{ Base: nil, DbName: dbName, @@ -1274,13 +1328,17 @@ func TestProxy(t *testing.T) { // TODO(dragondriver): dummy + wg.Add(1) t.Run("register link", func(t *testing.T) { + defer wg.Done() resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{}) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("get metrics", func(t *testing.T) { + defer wg.Done() req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) assert.NoError(t, err) resp, err := proxy.GetMetrics(ctx, req) @@ -1308,7 +1366,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("release collection", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -1330,7 +1390,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("show in-memory collections after release", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -1343,7 +1405,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, 0, len(resp.CollectionNames)) }) + wg.Add(1) t.Run("load partitions", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{ Base: nil, DbName: dbName, @@ -1374,7 +1438,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("show in-memory partitions", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -1416,7 +1482,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("release partition", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{ Base: nil, DbName: dbName, @@ -1427,7 +1495,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("show in-memory partitions after release partition", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -1456,7 +1526,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("drop partition", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{ Base: nil, DbName: dbName, @@ -1505,7 +1577,9 @@ func TestProxy(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("has partition after drop partition", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{ Base: nil, DbName: dbName, @@ -1517,7 +1591,9 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) + wg.Add(1) t.Run("show partitions after drop partition", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -1535,7 +1611,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, 1, len(resp.PartitionNames)) }) + wg.Add(1) t.Run("drop index", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{ Base: nil, DbName: dbName, @@ -1547,7 +1625,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("Delete", func(t *testing.T) { + defer wg.Done() _, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{ Base: nil, DbName: dbName, @@ -1558,7 +1638,9 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) }) + wg.Add(1) t.Run("drop collection", func(t *testing.T) { + defer wg.Done() collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) @@ -1588,7 +1670,9 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("has collection after drop collection", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{ Base: nil, DbName: dbName, @@ -1600,7 +1684,9 @@ func TestProxy(t *testing.T) { assert.False(t, resp.Value) }) + wg.Add(1) t.Run("show all collections after drop collection", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ Base: nil, DbName: dbName, @@ -1625,181 +1711,241 @@ func TestProxy(t *testing.T) { proxy.UpdateStateCode(internalpb.StateCode_Abnormal) + wg.Add(1) t.Run("ReleaseDQLMessageStream fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleaseDQLMessageStream(ctx, &proxypb.ReleaseDQLMessageStreamRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("CreateCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleaseCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeCollection fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetCollectionStatistics fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowCollections fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreatePartition fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropPartition fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasPartition fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadPartitions fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleasePartitions fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetPartitionStatistics fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowPartitions fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreateIndex fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeIndex fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("DropIndex fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetIndexBuildProgress fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetIndexState fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Insert fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Insert(ctx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Delete fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Search fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Search(ctx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Flush fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Query fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetPersistentSegmentInfo fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetQuerySegmentInfo fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("RegisterLink fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetMetrics fail, unhealthy", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetMetrics(ctx, &milvuspb.GetMetricsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -1812,127 +1958,169 @@ func TestProxy(t *testing.T) { ddParallel := proxy.sched.ddQueue.getMaxTaskNum() proxy.sched.ddQueue.setMaxTaskNum(0) + wg.Add(1) t.Run("CreateCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasCollection(ctx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleaseCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeCollection fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetCollectionStatistics fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetCollectionStatistics(ctx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowCollections fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreatePartition fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropPartition fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropPartition(ctx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasPartition fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasPartition(ctx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadPartitions fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleasePartitions fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleasePartitions(ctx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetPartitionStatistics fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowPartitions fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreateIndex fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeIndex fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("DropIndex fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetIndexBuildProgress fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetIndexState fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Flush fail, dd queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -1943,13 +2131,17 @@ func TestProxy(t *testing.T) { dmParallelism := proxy.sched.dmQueue.getMaxTaskNum() proxy.sched.dmQueue.setMaxTaskNum(0) + wg.Add(1) t.Run("Insert fail, dm queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Insert(ctx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Delete fail, dm queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Delete(ctx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -1960,13 +2152,17 @@ func TestProxy(t *testing.T) { dqParallelism := proxy.sched.dqQueue.getMaxTaskNum() proxy.sched.dqQueue.setMaxTaskNum(0) + wg.Add(1) t.Run("Search fail, dq queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Search(ctx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Query fail, dq queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -1981,156 +2177,207 @@ func TestProxy(t *testing.T) { defer shortCancel() time.Sleep(timeout) + wg.Add(1) t.Run("CreateCollection, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateCollection(shortCtx, &milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropCollection fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropCollection(shortCtx, &milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasCollection fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasCollection(shortCtx, &milvuspb.HasCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadCollection fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadCollection(shortCtx, &milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleaseCollection fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleaseCollection(shortCtx, &milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeCollection fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeCollection(shortCtx, &milvuspb.DescribeCollectionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetCollectionStatistics fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetCollectionStatistics(shortCtx, &milvuspb.GetCollectionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowCollections fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowCollections(shortCtx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreatePartition fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreatePartition(shortCtx, &milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DropPartition fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropPartition(shortCtx, &milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("HasPartition fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.HasPartition(shortCtx, &milvuspb.HasPartitionRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("LoadPartitions fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.LoadPartitions(shortCtx, &milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("ReleasePartitions fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ReleasePartitions(shortCtx, &milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetPartitionStatistics fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetPartitionStatistics(shortCtx, &milvuspb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("ShowPartitions fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.ShowPartitions(shortCtx, &milvuspb.ShowPartitionsRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("CreateIndex fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.CreateIndex(shortCtx, &milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("DescribeIndex fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DescribeIndex(shortCtx, &milvuspb.DescribeIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("DropIndex fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.DropIndex(shortCtx, &milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) t.Run("GetIndexBuildProgress fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexBuildProgress(shortCtx, &milvuspb.GetIndexBuildProgressRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("GetIndexState fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.GetIndexState(shortCtx, &milvuspb.GetIndexStateRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Flush fail, timeout", func(t *testing.T) { + defer wg.Done() _, err := proxy.Flush(shortCtx, &milvuspb.FlushRequest{}) assert.NoError(t, err) // FIXME(dragondriver) // assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Insert fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Insert(shortCtx, &milvuspb.InsertRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Delete fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Delete(shortCtx, &milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Search fail, timeout", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Search(shortCtx, &milvuspb.SearchRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Add(1) t.Run("Query fail, dq queue full", func(t *testing.T) { + defer wg.Done() resp, err := proxy.Query(shortCtx, &milvuspb.QueryRequest{}) assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + wg.Wait() cancel() }