diff --git a/scripts/run_intergration_test.sh b/scripts/run_intergration_test.sh index 4600035ef2..98810b9880 100755 --- a/scripts/run_intergration_test.sh +++ b/scripts/run_intergration_test.sh @@ -30,9 +30,10 @@ if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then APPLE_SILICON_FLAG="-tags dynamic" fi -pushd tests/integration -go test -race ${APPLE_SILICON_FLAG} -v -popd +for d in $(go list ./tests/integration/...); do + echo "$d" + go test -race ${APPLE_SILICON_FLAG} -v "$d" +done endTime=`date +%s` diff --git a/tests/integration/bulkinsert_test.go b/tests/integration/bulkinsert/bulkinsert_test.go similarity index 85% rename from tests/integration/bulkinsert_test.go rename to tests/integration/bulkinsert/bulkinsert_test.go index bb57292d96..16a9472800 100644 --- a/tests/integration/bulkinsert_test.go +++ b/tests/integration/bulkinsert/bulkinsert_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package bulkinsert import ( "context" @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/tests/integration" ) const ( @@ -46,7 +47,7 @@ const ( ) type BulkInsertSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } // test bulk insert E2E @@ -67,7 +68,7 @@ func (s *BulkInsertSuite) TestBulkInsert() { //floatVecField := floatVecField dim := 128 - schema := constructSchema(collectionName, dim, true, + schema := integration.ConstructSchema(collectionName, dim, true, &schemapb.FieldSchema{Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true}, &schemapb.FieldSchema{Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}}, &schemapb.FieldSchema{Name: "embeddings", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}}, @@ -75,7 +76,7 @@ func (s *BulkInsertSuite) TestBulkInsert() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -89,19 +90,19 @@ func (s *BulkInsertSuite) TestBulkInsert() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - err = GenerateNumpyFile(c.chunkManager.RootPath()+"/"+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{ + err = GenerateNumpyFile(c.ChunkManager.RootPath()+"/"+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: strconv.Itoa(Dim), }, }) s.NoError(err) - err = GenerateNumpyFile(c.chunkManager.RootPath()+"/"+"image_path.npy", 100, schemapb.DataType_VarChar, []*commonpb.KeyValuePair{ + err = GenerateNumpyFile(c.ChunkManager.RootPath()+"/"+"image_path.npy", 100, schemapb.DataType_VarChar, []*commonpb.KeyValuePair{ { Key: common.MaxLengthKey, Value: strconv.Itoa(65535), @@ -110,14 +111,14 @@ func (s *BulkInsertSuite) TestBulkInsert() { s.NoError(err) bulkInsertFiles := []string{ - c.chunkManager.RootPath() + "/" + "embeddings.npy", - c.chunkManager.RootPath() + "/" + "image_path.npy", + c.ChunkManager.RootPath() + "/" + "embeddings.npy", + c.ChunkManager.RootPath() + "/" + "image_path.npy", } - health1, err := c.dataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + health1, err := c.DataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) s.NoError(err) log.Info("dataCoord health", zap.Any("health1", health1)) - importResp, err := c.proxy.Import(ctx, &milvuspb.ImportRequest{ + importResp, err := c.Proxy.Import(ctx, &milvuspb.ImportRequest{ CollectionName: collectionName, Files: bulkInsertFiles, }) @@ -128,7 +129,7 @@ func (s *BulkInsertSuite) TestBulkInsert() { for _, task := range tasks { loop: for { - importTaskState, err := c.proxy.GetImportState(ctx, &milvuspb.GetImportStateRequest{ + importTaskState, err := c.Proxy.GetImportState(ctx, &milvuspb.GetImportStateRequest{ Task: task, }) s.NoError(err) @@ -147,11 +148,11 @@ func (s *BulkInsertSuite) TestBulkInsert() { } } - health2, err := c.dataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + health2, err := c.DataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) s.NoError(err) log.Info("dataCoord health", zap.Any("health2", health2)) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { @@ -159,11 +160,11 @@ func (s *BulkInsertSuite) TestBulkInsert() { } // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, FieldName: "embeddings", IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexHNSW, distance.L2), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexHNSW, distance.L2), }) if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) @@ -171,10 +172,10 @@ func (s *BulkInsertSuite) TestBulkInsert() { s.NoError(err) s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) - waitingForIndexBuilt(ctx, c, s.T(), collectionName, "embeddings") + s.WaitForIndexBuilt(ctx, collectionName, "embeddings") // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -183,7 +184,7 @@ func (s *BulkInsertSuite) TestBulkInsert() { log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason())) } s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) - waitingForLoad(ctx, c, collectionName) + s.WaitForLoad(ctx, collectionName) // search expr := "" //fmt.Sprintf("%s > 0", int64Field) @@ -191,11 +192,11 @@ func (s *BulkInsertSuite) TestBulkInsert() { topk := 10 roundDecimal := -1 - params := getSearchParams(IndexHNSW, distance.L2) - searchReq := constructSearchRequest("", collectionName, expr, + params := integration.GetSearchParams(integration.IndexHNSW, distance.L2) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, "embeddings", schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, err := c.proxy.Search(ctx, searchReq) + searchResult, err := c.Proxy.Search(ctx, searchReq) if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason())) diff --git a/tests/integration/get_vector_test.go b/tests/integration/getvector/get_vector_test.go similarity index 82% rename from tests/integration/get_vector_test.go rename to tests/integration/getvector/get_vector_test.go index e881991d98..d20fe2fc63 100644 --- a/tests/integration/get_vector_test.go +++ b/tests/integration/getvector/get_vector_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package getvector import ( "context" @@ -32,10 +32,11 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/milvus-io/milvus/tests/integration" ) type TestGetVectorSuite struct { - MiniClusterSuite + integration.MiniClusterSuite // test params nq int @@ -47,7 +48,7 @@ type TestGetVectorSuite struct { } func (s *TestGetVectorSuite) run() { - ctx, cancel := context.WithCancel(s.Cluster.ctx) + ctx, cancel := context.WithCancel(s.Cluster.GetContext()) defer cancel() collection := fmt.Sprintf("TestGetVector_%d_%d_%s_%s_%s", @@ -89,11 +90,11 @@ func (s *TestGetVectorSuite) run() { }, IndexParams: nil, } - schema := constructSchema(collection, dim, false, pk, fVec) + schema := integration.ConstructSchema(collection, dim, false, pk, fVec) marshaledSchema, err := proto.Marshal(schema) s.Require().NoError(err) - createCollectionStatus, err := s.Cluster.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ CollectionName: collection, Schema: marshaledSchema, ShardsNum: 2, @@ -103,19 +104,19 @@ func (s *TestGetVectorSuite) run() { fieldsData := make([]*schemapb.FieldData, 0) if s.pkType == schemapb.DataType_Int64 { - fieldsData = append(fieldsData, newInt64FieldData(pkFieldName, NB)) + fieldsData = append(fieldsData, integration.NewInt64FieldData(pkFieldName, NB)) } else { - fieldsData = append(fieldsData, newStringFieldData(pkFieldName, NB)) + fieldsData = append(fieldsData, integration.NewStringFieldData(pkFieldName, NB)) } var vecFieldData *schemapb.FieldData if s.vecType == schemapb.DataType_FloatVector { - vecFieldData = newFloatVectorFieldData(vecFieldName, NB, dim) + vecFieldData = integration.NewFloatVectorFieldData(vecFieldName, NB, dim) } else { - vecFieldData = newBinaryVectorFieldData(vecFieldName, NB, dim) + vecFieldData = integration.NewBinaryVectorFieldData(vecFieldName, NB, dim) } fieldsData = append(fieldsData, vecFieldData) - hashKeys := generateHashKeys(NB) - _, err = s.Cluster.proxy.Insert(ctx, &milvuspb.InsertRequest{ + hashKeys := integration.GenerateHashKeys(NB) + _, err = s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{ CollectionName: collection, FieldsData: fieldsData, HashKeys: hashKeys, @@ -125,7 +126,7 @@ func (s *TestGetVectorSuite) run() { s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) // flush - flushResp, err := s.Cluster.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ CollectionNames: []string{collection}, }) s.Require().NoError(err) @@ -134,42 +135,42 @@ func (s *TestGetVectorSuite) run() { s.Require().NotEmpty(segmentIDs) s.Require().True(has) - segments, err := s.Cluster.metaWatcher.ShowSegments() + segments, err := s.Cluster.MetaWatcher.ShowSegments() s.Require().NoError(err) s.Require().NotEmpty(segments) - waitingForFlush(ctx, s.Cluster, ids) + s.WaitForFlush(ctx, ids) // create index - _, err = s.Cluster.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + _, err = s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collection, FieldName: vecFieldName, IndexName: "_default", - ExtraParams: constructIndexParam(dim, s.indexType, s.metricType), + ExtraParams: integration.ConstructIndexParam(dim, s.indexType, s.metricType), }) s.Require().NoError(err) s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) - waitingForIndexBuilt(ctx, s.Cluster, s.T(), collection, vecFieldName) + s.WaitForIndexBuilt(ctx, collection, vecFieldName) // load - _, err = s.Cluster.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + _, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ CollectionName: collection, }) s.Require().NoError(err) s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) - waitingForLoad(ctx, s.Cluster, collection) + s.WaitForLoad(ctx, collection) // search nq := s.nq topk := s.topK outputFields := []string{vecFieldName} - params := getSearchParams(s.indexType, s.metricType) - searchReq := constructSearchRequest("", collection, "", + params := integration.GetSearchParams(s.indexType, s.metricType) + searchReq := integration.ConstructSearchRequest("", collection, "", vecFieldName, s.vecType, outputFields, s.metricType, params, nq, dim, topk, -1) - searchResp, err := s.Cluster.proxy.Search(ctx, searchReq) + searchResp, err := s.Cluster.Proxy.Search(ctx, searchReq) s.Require().NoError(err) s.Require().Equal(searchResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) @@ -238,7 +239,7 @@ func (s *TestGetVectorSuite) run() { } } - status, err := s.Cluster.proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + status, err := s.Cluster.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ CollectionName: collection, }) s.Require().NoError(err) @@ -248,7 +249,7 @@ func (s *TestGetVectorSuite) run() { func (s *TestGetVectorSuite) TestGetVector_FLAT() { s.nq = 10 s.topK = 10 - s.indexType = IndexFaissIDMap + s.indexType = integration.IndexFaissIDMap s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -258,7 +259,7 @@ func (s *TestGetVectorSuite) TestGetVector_FLAT() { func (s *TestGetVectorSuite) TestGetVector_IVF_FLAT() { s.nq = 10 s.topK = 10 - s.indexType = IndexFaissIvfFlat + s.indexType = integration.IndexFaissIvfFlat s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -268,7 +269,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_FLAT() { func (s *TestGetVectorSuite) TestGetVector_IVF_PQ() { s.nq = 10 s.topK = 10 - s.indexType = IndexFaissIvfPQ + s.indexType = integration.IndexFaissIvfPQ s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -278,7 +279,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_PQ() { func (s *TestGetVectorSuite) TestGetVector_IVF_SQ8() { s.nq = 10 s.topK = 10 - s.indexType = IndexFaissIvfSQ8 + s.indexType = integration.IndexFaissIvfSQ8 s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -288,7 +289,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_SQ8() { func (s *TestGetVectorSuite) TestGetVector_HNSW() { s.nq = 10 s.topK = 10 - s.indexType = IndexHNSW + s.indexType = integration.IndexHNSW s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -298,7 +299,7 @@ func (s *TestGetVectorSuite) TestGetVector_HNSW() { func (s *TestGetVectorSuite) TestGetVector_IP() { s.nq = 10 s.topK = 10 - s.indexType = IndexHNSW + s.indexType = integration.IndexHNSW s.metricType = distance.IP s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -308,7 +309,7 @@ func (s *TestGetVectorSuite) TestGetVector_IP() { func (s *TestGetVectorSuite) TestGetVector_StringPK() { s.nq = 10 s.topK = 10 - s.indexType = IndexHNSW + s.indexType = integration.IndexHNSW s.metricType = distance.L2 s.pkType = schemapb.DataType_VarChar s.vecType = schemapb.DataType_FloatVector @@ -318,7 +319,7 @@ func (s *TestGetVectorSuite) TestGetVector_StringPK() { func (s *TestGetVectorSuite) TestGetVector_BinaryVector() { s.nq = 10 s.topK = 10 - s.indexType = IndexFaissBinIvfFlat + s.indexType = integration.IndexFaissBinIvfFlat s.metricType = distance.JACCARD s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_BinaryVector @@ -329,7 +330,7 @@ func (s *TestGetVectorSuite) TestGetVector_Big_NQ_TOPK() { s.T().Skip("skip big NQ Top due to timeout") s.nq = 10000 s.topK = 200 - s.indexType = IndexHNSW + s.indexType = integration.IndexHNSW s.metricType = distance.L2 s.pkType = schemapb.DataType_Int64 s.vecType = schemapb.DataType_FloatVector @@ -339,7 +340,7 @@ func (s *TestGetVectorSuite) TestGetVector_Big_NQ_TOPK() { //func (s *TestGetVectorSuite) TestGetVector_DISKANN() { // s.nq = 10 // s.topK = 10 -// s.indexType = IndexDISKANN +// s.indexType = integration.IndexDISKANN // s.metricType = distance.L2 // s.pkType = schemapb.DataType_Int64 // s.vecType = schemapb.DataType_FloatVector diff --git a/tests/integration/hello_milvus_test.go b/tests/integration/hellomilvus/hello_milvus_test.go similarity index 73% rename from tests/integration/hello_milvus_test.go rename to tests/integration/hellomilvus/hello_milvus_test.go index 069332ce64..a4caea6ea4 100644 --- a/tests/integration/hello_milvus_test.go +++ b/tests/integration/hellomilvus/hello_milvus_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package hellomilvus import ( "context" @@ -32,14 +32,15 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/tests/integration" ) type HelloMilvusSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *HelloMilvusSuite) TestHelloMilvus() { - ctx, cancel := context.WithCancel(s.Cluster.ctx) + ctx, cancel := context.WithCancel(s.Cluster.GetContext()) defer cancel() c := s.Cluster @@ -51,11 +52,11 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { collectionName := "TestHelloMilvus" + funcutil.GenRandomStr() - schema := constructSchema(collectionName, dim, true) + schema := integration.ConstructSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -68,14 +69,14 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -86,7 +87,7 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -96,20 +97,20 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { s.NotEmpty(segmentIDs) s.True(has) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } - waitingForFlush(ctx, c, ids) + s.WaitForFlush(ctx, ids) // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2), }) if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) @@ -117,10 +118,10 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { s.NoError(err) s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -129,19 +130,19 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason())) } s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) - waitingForLoad(ctx, c, collectionName) + s.WaitForLoad(ctx, collectionName) // search - expr := fmt.Sprintf("%s > 0", int64Field) + expr := fmt.Sprintf("%s > 0", integration.Int64Field) nq := 10 topk := 10 roundDecimal := -1 - params := getSearchParams(IndexFaissIvfFlat, distance.L2) - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, err := c.proxy.Search(ctx, searchReq) + searchResult, err := c.Proxy.Search(ctx, searchReq) if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason())) diff --git a/tests/integration/get_index_statistics_test.go b/tests/integration/indexstat/get_index_statistics_test.go similarity index 79% rename from tests/integration/get_index_statistics_test.go rename to tests/integration/indexstat/get_index_statistics_test.go index 5085788b65..7c0d871a89 100644 --- a/tests/integration/get_index_statistics_test.go +++ b/tests/integration/indexstat/get_index_statistics_test.go @@ -1,9 +1,11 @@ -package integration +package indexstat import ( "context" + "testing" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/commonpb" @@ -12,10 +14,11 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/tests/integration" ) type GetIndexStatisticsSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { @@ -29,11 +32,11 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { dim := 128 rowNum := 3000 - schema := constructSchema(collectionName, dim, true) + schema := integration.ConstructSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -45,9 +48,9 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { } s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -58,7 +61,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -67,15 +70,15 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) s.Equal(true, has) - waitingForFlush(ctx, c, ids) + s.WaitForFlush(ctx, ids) // create index indexName := "_default" - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2), }) if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) @@ -83,9 +86,9 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { s.NoError(err) s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) - getIndexStatisticsResponse, err := c.proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{ + getIndexStatisticsResponse, err := c.Proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{ CollectionName: collectionName, IndexName: indexName, }) @@ -132,7 +135,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { s.NoError(err) - waitingForIndexBuilt(ctx, c, t, collectionName, floatVecField) + waitingForIndexBuilt(ctx, collectionName, integration.FloatVecField) getIndexStatisticsResponse2, err := c.proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{ CollectionName: collectionName, @@ -147,3 +150,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() { log.Info("TestGetIndexStatistics succeed") } + +func TestGetIndexStat(t *testing.T) { + suite.Run(t, new(GetIndexStatisticsSuite)) +} diff --git a/tests/integration/json_expr_test.go b/tests/integration/jsonexpr/json_expr_test.go similarity index 76% rename from tests/integration/json_expr_test.go rename to tests/integration/jsonexpr/json_expr_test.go index d63fb0dede..fb312d7750 100644 --- a/tests/integration/json_expr_test.go +++ b/tests/integration/jsonexpr/json_expr_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package jsonexpr import ( "context" @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/distance" + "github.com/milvus-io/milvus/tests/integration" "github.com/stretchr/testify/suite" "github.com/golang/protobuf/proto" @@ -39,7 +40,7 @@ import ( ) type JSONExprSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *JSONExprSuite) TestJsonEnableDynamicSchema() { @@ -55,7 +56,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() { constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, - Name: int64Field, + Name: integration.Int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, @@ -65,7 +66,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() { } fVec := &schemapb.FieldSchema{ FieldID: 0, - Name: floatVecField, + Name: integration.FloatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, @@ -93,7 +94,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -106,22 +107,22 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) + describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) s.NoError(err) s.True(describeCollectionResp.Schema.EnableDynamicField) s.Equal(2, len(describeCollectionResp.GetSchema().GetFields())) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) jsonData := newJSONData(common.MetaFieldName, rowNum) jsonData.IsDynamic = true - s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData}) + s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData}) - s.checkSearch(c, collectionName, common.MetaFieldName, dim) + s.checkSearch(collectionName, common.MetaFieldName, dim) } func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { @@ -138,7 +139,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, - Name: int64Field, + Name: integration.Int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, @@ -148,7 +149,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { } fVec := &schemapb.FieldSchema{ FieldID: 0, - Name: floatVecField, + Name: integration.FloatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, @@ -176,7 +177,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -189,18 +190,18 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) + describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) s.NoError(err) s.True(describeCollectionResp.Schema.EnableDynamicField) s.Equal(2, len(describeCollectionResp.GetSchema().GetFields())) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn}) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn}) expr := "" // search @@ -208,7 +209,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { checkFunc := func(result *milvuspb.SearchResults) { s.Equal(0, len(result.Results.FieldsData)) } - s.doSearch(c, collectionName, []string{common.MetaFieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{common.MetaFieldName}, expr, dim, checkFunc) log.Info("GT expression run successfully") } @@ -226,7 +227,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() { constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, - Name: int64Field, + Name: integration.Int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, @@ -236,7 +237,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() { } fVec := &schemapb.FieldSchema{ FieldID: 0, - Name: floatVecField, + Name: integration.FloatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, @@ -250,7 +251,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() { AutoID: false, } j := &schemapb.FieldSchema{ - Name: jsonField, + Name: integration.JSONField, Description: "json field", DataType: schemapb.DataType_JSON, } @@ -270,7 +271,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -283,91 +284,91 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) + describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName}) s.NoError(err) s.True(describeCollectionResp.Schema.EnableDynamicField) s.Equal(3, len(describeCollectionResp.GetSchema().GetFields())) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - jsonData := newJSONData(jsonField, rowNum) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + jsonData := newJSONData(integration.JSONField, rowNum) dynamicData := newJSONData(common.MetaFieldName, rowNum) dynamicData.IsDynamic = true - s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData, dynamicData}) + s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData, dynamicData}) - s.checkSearch(c, collectionName, common.MetaFieldName, dim) + s.checkSearch(collectionName, common.MetaFieldName, dim) expr := "" // search expr = `jsonField["A"] < 10` checkFunc := func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("LT expression run successfully") expr = `jsonField["A"] <= 5` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("LE expression run successfully") expr = `jsonField["A"] == 5` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("EQ expression run successfully") expr = `jsonField["C"][0] in [90, 91, 95, 97]` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("IN expression run successfully") expr = `jsonField["C"][0] not in [90, 91, 95, 97]` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("NIN expression run successfully") expr = `jsonField["E"]["G"] > 100` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(1, len(result.Results.FieldsData)) - s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName()) + s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName()) s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(9, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc) log.Info("nested path expression run successfully") expr = `jsonField == ""` - s.doSearchWithInvalidExpr(c, collectionName, []string{jsonField}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{integration.JSONField}, expr, dim) } -func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName string, dim int) { +func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr := "" // search expr = `$meta["A"] > 90` @@ -377,7 +378,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{"A"}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{"A"}, expr, dim, checkFunc) log.Info("GT expression run successfully") expr = `$meta["A"] < 10` @@ -387,7 +388,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{"B"}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{"B"}, expr, dim, checkFunc) log.Info("LT expression run successfully") expr = `$meta["A"] <= 5` @@ -397,7 +398,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{"C"}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{"C"}, expr, dim, checkFunc) log.Info("LE expression run successfully") expr = `A >= 95` @@ -407,7 +408,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("GE expression run successfully") expr = `$meta["A"] == 5` @@ -417,7 +418,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("EQ expression run successfully") expr = `A != 95` @@ -427,7 +428,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NE expression run successfully") expr = `not (A != 95)` @@ -437,7 +438,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NOT NE expression run successfully") expr = `A > 90 && B < 5` @@ -447,7 +448,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(2, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NE expression run successfully") expr = `A > 95 || 5 > B` @@ -457,7 +458,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NE expression run successfully") expr = `not (A == 95)` @@ -467,7 +468,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NOT expression run successfully") expr = `A in [90, 91, 95, 97]` @@ -477,7 +478,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("IN expression run successfully") expr = `A not in [90, 91, 95, 97]` @@ -487,7 +488,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NIN expression run successfully") expr = `C[0] in [90, 91, 95, 97]` @@ -497,7 +498,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("IN expression run successfully") expr = `C[0] not in [90, 91, 95, 97]` @@ -507,7 +508,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NIN expression run successfully") expr = `0 <= A < 5` @@ -517,7 +518,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(2, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("BinaryRange expression run successfully") expr = `100 > A >= 90` @@ -527,7 +528,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("BinaryRange expression run successfully") expr = `1+5 <= A < 5+10` @@ -537,7 +538,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("BinaryRange expression run successfully") expr = `A + 5 == 10` @@ -547,7 +548,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("Arithmetic expression run successfully") expr = `exists A` @@ -557,14 +558,14 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("EXISTS expression run successfully") expr = `exists AAA` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(0, len(result.Results.FieldsData)) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("EXISTS expression run successfully") expr = `not exists A` @@ -574,7 +575,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("NOT EXISTS expression run successfully") expr = `E["G"] > 100` @@ -584,7 +585,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(9, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("nested path expression run successfully") expr = `D like "name-%"` @@ -594,7 +595,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("like expression run successfully") expr = `D like "name-11"` @@ -604,21 +605,21 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("like expression run successfully") expr = `A like "10"` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(0, len(result.Results.FieldsData)) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("like expression run successfully") expr = `A in []` checkFunc = func(result *milvuspb.SearchResults) { s.Equal(0, len(result.Results.FieldsData)) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("term empty expression run successfully") expr = `A not in []` @@ -628,47 +629,47 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType()) s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData())) } - s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc) + s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) log.Info("term empty expression run successfully") // invalid expr expr = `E[F] > 100` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `A >> 10` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `not A > 5` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `not A == 5` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `A > B` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `A > Int64Field` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `A like abc` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `D like "%name-%"` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `D like "na%me"` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `1+5 <= A+1 < 5+10` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) expr = `$meta == ""` - s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim) + s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim) } -func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster, dbName, collectionName string, rowNum int, dim int, fieldData []*schemapb.FieldData) { - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ +func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, dbName, collectionName string, rowNum int, dim int, fieldData []*schemapb.FieldData) { + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: fieldData, @@ -679,7 +680,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -688,7 +689,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) - segments, err := c.metaWatcher.ShowSegments() + segments, err := s.Cluster.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { @@ -697,7 +698,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster if has && len(ids) > 0 { flushed := func() bool { - resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ + resp, err := s.Cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ SegmentIDs: ids, }) if err != nil { @@ -718,9 +719,9 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster } // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", ExtraParams: []*commonpb.KeyValuePair{ { @@ -746,10 +747,10 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster } s.NoError(err) s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -759,7 +760,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster } s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) for { - loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ + loadProgress, err := s.Cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ CollectionName: collectionName, }) if err != nil { @@ -772,16 +773,16 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster } } -func (s *JSONExprSuite) doSearch(cluster *MiniCluster, collectionName string, outputField []string, expr string, dim int, checkFunc func(results *milvuspb.SearchResults)) { +func (s *JSONExprSuite) doSearch(collectionName string, outputField []string, expr string, dim int, checkFunc func(results *milvuspb.SearchResults)) { nq := 1 topk := 10 roundDecimal := -1 - params := getSearchParams(IndexFaissIvfFlat, distance.L2) - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, err := cluster.proxy.Search(context.Background(), searchReq) + searchResult, err := s.Cluster.Proxy.Search(context.Background(), searchReq) if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason())) @@ -845,16 +846,16 @@ func newJSONData(fieldName string, rowNum int) *schemapb.FieldData { } } -func (s *JSONExprSuite) doSearchWithInvalidExpr(cluster *MiniCluster, collectionName string, outputField []string, expr string, dim int) { +func (s *JSONExprSuite) doSearchWithInvalidExpr(collectionName string, outputField []string, expr string, dim int) { nq := 1 topk := 10 roundDecimal := -1 - params := getSearchParams(IndexFaissIvfFlat, distance.L2) - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, err := cluster.proxy.Search(context.Background(), searchReq) + searchResult, err := s.Cluster.Proxy.Search(context.Background(), searchReq) if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason())) diff --git a/tests/integration/meta_watcher_test.go b/tests/integration/meta_watcher_test.go index 44e62e7842..067bf5a310 100644 --- a/tests/integration/meta_watcher_test.go +++ b/tests/integration/meta_watcher_test.go @@ -41,7 +41,7 @@ type MetaWatcherSuite struct { } func (s *MetaWatcherSuite) TestShowSessions() { - sessions, err := s.Cluster.metaWatcher.ShowSessions() + sessions, err := s.Cluster.MetaWatcher.ShowSessions() s.NoError(err) s.NotEmpty(sessions) for _, session := range sessions { @@ -102,7 +102,7 @@ func (s *MetaWatcherSuite) TestShowSegments() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -112,14 +112,14 @@ func (s *MetaWatcherSuite) TestShowSegments() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim) + hashKeys := GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -129,7 +129,7 @@ func (s *MetaWatcherSuite) TestShowSegments() { s.NoError(err) s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { @@ -190,7 +190,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -203,14 +203,14 @@ func (s *MetaWatcherSuite) TestShowReplicas() { s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim) + hashKeys := GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -221,7 +221,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -230,7 +230,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { @@ -239,7 +239,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { if has && len(ids) > 0 { flushed := func() bool { - resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ + resp, err := c.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ SegmentIDs: ids, }) if err != nil { @@ -260,7 +260,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { } // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, FieldName: floatVecField, IndexName: "_default", @@ -292,7 +292,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -302,7 +302,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { } s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) for { - loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ + loadProgress, err := c.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ CollectionName: collectionName, }) if err != nil { @@ -314,7 +314,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() { time.Sleep(500 * time.Millisecond) } - replicas, err := c.metaWatcher.ShowReplicas() + replicas, err := c.MetaWatcher.ShowReplicas() s.NoError(err) s.NotEmpty(replicas) for _, replica := range replicas { diff --git a/tests/integration/minicluster.go b/tests/integration/minicluster.go index 9059add2f8..69819f9947 100644 --- a/tests/integration/minicluster.go +++ b/tests/integration/minicluster.go @@ -101,20 +101,20 @@ type MiniCluster struct { clusterConfig ClusterConfig factory dependency.Factory - chunkManager storage.ChunkManager + ChunkManager storage.ChunkManager - etcdCli *clientv3.Client + EtcdCli *clientv3.Client - proxy types.ProxyComponent - dataCoord types.DataCoordComponent - rootCoord types.RootCoordComponent + Proxy types.ProxyComponent + DataCoord types.DataCoordComponent + RootCoord types.RootCoordComponent + QueryCoord types.QueryCoordComponent - queryCoord types.QueryCoordComponent - queryNodes []types.QueryNodeComponent - dataNodes []types.DataNodeComponent - indexNodes []types.IndexNodeComponent + QueryNodes []types.QueryNodeComponent + DataNodes []types.DataNodeComponent + IndexNodes []types.IndexNodeComponent - metaWatcher MetaWatcher + MetaWatcher MetaWatcher } var params *paramtable.ComponentParam = paramtable.Get() @@ -145,10 +145,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster if err != nil { return nil, err } - cluster.chunkManager = chunkManager + cluster.ChunkManager = chunkManager } - if cluster.etcdCli == nil { + if cluster.EtcdCli == nil { var etcdCli *clientv3.Client etcdCli, err = etcd.GetEtcdClient( params.EtcdCfg.UseEmbedEtcd.GetAsBool(), @@ -161,39 +161,39 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster if err != nil { return nil, err } - cluster.etcdCli = etcdCli + cluster.EtcdCli = etcdCli } - cluster.metaWatcher = &EtcdMetaWatcher{ + cluster.MetaWatcher = &EtcdMetaWatcher{ rootPath: cluster.params[EtcdRootPath], - etcdCli: cluster.etcdCli, + etcdCli: cluster.EtcdCli, } - if cluster.rootCoord == nil { + if cluster.RootCoord == nil { var rootCoord types.RootCoordComponent rootCoord, err = cluster.CreateDefaultRootCoord() if err != nil { return nil, err } - cluster.rootCoord = rootCoord + cluster.RootCoord = rootCoord } - if cluster.dataCoord == nil { + if cluster.DataCoord == nil { var dataCoord types.DataCoordComponent dataCoord, err = cluster.CreateDefaultDataCoord() if err != nil { return nil, err } - cluster.dataCoord = dataCoord + cluster.DataCoord = dataCoord } - if cluster.queryCoord == nil { + if cluster.QueryCoord == nil { var queryCoord types.QueryCoordComponent queryCoord, err = cluster.CreateDefaultQueryCoord() if err != nil { return nil, err } - cluster.queryCoord = queryCoord + cluster.QueryCoord = queryCoord } //if cluster.indexCoord == nil { @@ -205,7 +205,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster // cluster.indexCoord = indexCoord //} - if cluster.dataNodes == nil { + if cluster.DataNodes == nil { dataNodes := make([]types.DataNodeComponent, 0) for i := 0; i < cluster.clusterConfig.DataNodeNum; i++ { var dataNode types.DataNodeComponent @@ -215,10 +215,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster } dataNodes = append(dataNodes, dataNode) } - cluster.dataNodes = dataNodes + cluster.DataNodes = dataNodes } - if cluster.queryNodes == nil { + if cluster.QueryNodes == nil { queryNodes := make([]types.QueryNodeComponent, 0) for i := 0; i < cluster.clusterConfig.QueryNodeNum; i++ { var queryNode types.QueryNodeComponent @@ -228,10 +228,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster } queryNodes = append(queryNodes, queryNode) } - cluster.queryNodes = queryNodes + cluster.QueryNodes = queryNodes } - if cluster.indexNodes == nil { + if cluster.IndexNodes == nil { indexNodes := make([]types.IndexNodeComponent, 0) for i := 0; i < cluster.clusterConfig.IndexNodeNum; i++ { var indexNode types.IndexNodeComponent @@ -241,22 +241,22 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster } indexNodes = append(indexNodes, indexNode) } - cluster.indexNodes = indexNodes + cluster.IndexNodes = indexNodes } - if cluster.proxy == nil { + if cluster.Proxy == nil { var proxy types.ProxyComponent proxy, err = cluster.CreateDefaultProxy() if err != nil { return } - cluster.proxy = proxy + cluster.Proxy = proxy } //cluster.dataCoord.SetIndexCoord(cluster.indexCoord) - cluster.dataCoord.SetRootCoord(cluster.rootCoord) + cluster.DataCoord.SetRootCoord(cluster.RootCoord) - err = cluster.rootCoord.SetDataCoord(cluster.dataCoord) + err = cluster.RootCoord.SetDataCoord(cluster.DataCoord) if err != nil { return } @@ -264,7 +264,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster //if err != nil { // return //} - err = cluster.rootCoord.SetQueryCoord(cluster.queryCoord) + err = cluster.RootCoord.SetQueryCoord(cluster.QueryCoord) if err != nil { return } @@ -273,11 +273,11 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster if err != nil { return } - err = cluster.queryCoord.SetDataCoord(cluster.dataCoord) + err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord) if err != nil { return } - err = cluster.queryCoord.SetRootCoord(cluster.rootCoord) + err = cluster.QueryCoord.SetRootCoord(cluster.RootCoord) if err != nil { return } @@ -291,21 +291,21 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster // return //} - for _, dataNode := range cluster.dataNodes { - err = dataNode.SetDataCoord(cluster.dataCoord) + for _, dataNode := range cluster.DataNodes { + err = dataNode.SetDataCoord(cluster.DataCoord) if err != nil { return } - err = dataNode.SetRootCoord(cluster.rootCoord) + err = dataNode.SetRootCoord(cluster.RootCoord) if err != nil { return } } - cluster.proxy.SetDataCoordClient(cluster.dataCoord) + cluster.Proxy.SetDataCoordClient(cluster.DataCoord) //cluster.proxy.SetIndexCoordClient(cluster.indexCoord) - cluster.proxy.SetQueryCoordClient(cluster.queryCoord) - cluster.proxy.SetRootCoordClient(cluster.rootCoord) + cluster.Proxy.SetQueryCoordClient(cluster.QueryCoord) + cluster.Proxy.SetRootCoordClient(cluster.RootCoord) return cluster, nil } @@ -316,41 +316,41 @@ func (cluster *MiniCluster) GetContext() context.Context { func (cluster *MiniCluster) Start() error { log.Info("mini cluster start") - err := cluster.rootCoord.Init() + err := cluster.RootCoord.Init() if err != nil { return err } - err = cluster.rootCoord.Start() + err = cluster.RootCoord.Start() if err != nil { return err } - err = cluster.rootCoord.Register() + err = cluster.RootCoord.Register() if err != nil { return err } - err = cluster.dataCoord.Init() + err = cluster.DataCoord.Init() if err != nil { return err } - err = cluster.dataCoord.Start() + err = cluster.DataCoord.Start() if err != nil { return err } - err = cluster.dataCoord.Register() + err = cluster.DataCoord.Register() if err != nil { return err } - err = cluster.queryCoord.Init() + err = cluster.QueryCoord.Init() if err != nil { return err } - err = cluster.queryCoord.Start() + err = cluster.QueryCoord.Start() if err != nil { return err } - err = cluster.queryCoord.Register() + err = cluster.QueryCoord.Register() if err != nil { return err } @@ -368,7 +368,7 @@ func (cluster *MiniCluster) Start() error { // return err //} - for _, dataNode := range cluster.dataNodes { + for _, dataNode := range cluster.DataNodes { err = dataNode.Init() if err != nil { return err @@ -383,7 +383,7 @@ func (cluster *MiniCluster) Start() error { } } - for _, queryNode := range cluster.queryNodes { + for _, queryNode := range cluster.QueryNodes { err = queryNode.Init() if err != nil { return err @@ -398,7 +398,7 @@ func (cluster *MiniCluster) Start() error { } } - for _, indexNode := range cluster.indexNodes { + for _, indexNode := range cluster.IndexNodes { err = indexNode.Init() if err != nil { return err @@ -413,15 +413,15 @@ func (cluster *MiniCluster) Start() error { } } - err = cluster.proxy.Init() + err = cluster.Proxy.Init() if err != nil { return err } - err = cluster.proxy.Start() + err = cluster.Proxy.Start() if err != nil { return err } - err = cluster.proxy.Register() + err = cluster.Proxy.Register() if err != nil { return err } @@ -431,43 +431,43 @@ func (cluster *MiniCluster) Start() error { func (cluster *MiniCluster) Stop() error { log.Info("mini cluster stop") - cluster.rootCoord.Stop() + cluster.RootCoord.Stop() log.Info("mini cluster rootCoord stopped") - cluster.dataCoord.Stop() + cluster.DataCoord.Stop() log.Info("mini cluster dataCoord stopped") //cluster.indexCoord.Stop() - cluster.queryCoord.Stop() + cluster.QueryCoord.Stop() log.Info("mini cluster queryCoord stopped") - cluster.proxy.Stop() + cluster.Proxy.Stop() log.Info("mini cluster proxy stopped") - for _, dataNode := range cluster.dataNodes { + for _, dataNode := range cluster.DataNodes { dataNode.Stop() } log.Info("mini cluster datanodes stopped") - for _, queryNode := range cluster.queryNodes { + for _, queryNode := range cluster.QueryNodes { queryNode.Stop() } log.Info("mini cluster querynodes stopped") - for _, indexNode := range cluster.indexNodes { + for _, indexNode := range cluster.IndexNodes { indexNode.Stop() } log.Info("mini cluster indexnodes stopped") - cluster.etcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) - defer cluster.etcdCli.Close() + cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix()) + defer cluster.EtcdCli.Close() - if cluster.chunkManager == nil { + if cluster.ChunkManager == nil { chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx) if err != nil { log.Warn("fail to create chunk manager to clean test data", zap.Error(err)) } else { - cluster.chunkManager = chunkManager + cluster.ChunkManager = chunkManager } } - cluster.chunkManager.RemoveWithPrefix(cluster.ctx, cluster.chunkManager.RootPath()) + cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath()) return nil } @@ -506,7 +506,7 @@ func WithClusterSize(clusterConfig ClusterConfig) Option { func WithEtcdClient(etcdCli *clientv3.Client) Option { return func(cluster *MiniCluster) { - cluster.etcdCli = etcdCli + cluster.EtcdCli = etcdCli } } @@ -518,19 +518,19 @@ func WithFactory(factory dependency.Factory) Option { func WithRootCoord(rootCoord types.RootCoordComponent) Option { return func(cluster *MiniCluster) { - cluster.rootCoord = rootCoord + cluster.RootCoord = rootCoord } } func WithDataCoord(dataCoord types.DataCoordComponent) Option { return func(cluster *MiniCluster) { - cluster.dataCoord = dataCoord + cluster.DataCoord = dataCoord } } func WithQueryCoord(queryCoord types.QueryCoordComponent) Option { return func(cluster *MiniCluster) { - cluster.queryCoord = queryCoord + cluster.QueryCoord = queryCoord } } @@ -542,25 +542,25 @@ func WithQueryCoord(queryCoord types.QueryCoordComponent) Option { func WithDataNodes(datanodes []types.DataNodeComponent) Option { return func(cluster *MiniCluster) { - cluster.dataNodes = datanodes + cluster.DataNodes = datanodes } } func WithQueryNodes(queryNodes []types.QueryNodeComponent) Option { return func(cluster *MiniCluster) { - cluster.queryNodes = queryNodes + cluster.QueryNodes = queryNodes } } func WithIndexNodes(indexNodes []types.IndexNodeComponent) Option { return func(cluster *MiniCluster) { - cluster.indexNodes = indexNodes + cluster.IndexNodes = indexNodes } } func WithProxy(proxy types.ProxyComponent) Option { return func(cluster *MiniCluster) { - cluster.proxy = proxy + cluster.Proxy = proxy } } @@ -572,7 +572,7 @@ func (cluster *MiniCluster) CreateDefaultRootCoord() (types.RootCoordComponent, port := funcutil.GetAvailablePort() rootCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) rootCoord.SetProxyCreator(cluster.GetProxy) - rootCoord.SetEtcdClient(cluster.etcdCli) + rootCoord.SetEtcdClient(cluster.EtcdCli) return rootCoord, nil } @@ -582,7 +582,7 @@ func (cluster *MiniCluster) CreateDefaultDataCoord() (types.DataCoordComponent, dataCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) dataCoord.SetDataNodeCreator(cluster.GetDataNode) dataCoord.SetIndexNodeCreator(cluster.GetIndexNode) - dataCoord.SetEtcdClient(cluster.etcdCli) + dataCoord.SetEtcdClient(cluster.EtcdCli) return dataCoord, nil } @@ -594,7 +594,7 @@ func (cluster *MiniCluster) CreateDefaultQueryCoord() (types.QueryCoordComponent port := funcutil.GetAvailablePort() queryCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) queryCoord.SetQueryNodeCreator(cluster.GetQueryNode) - queryCoord.SetEtcdClient(cluster.etcdCli) + queryCoord.SetEtcdClient(cluster.EtcdCli) return queryCoord, nil } @@ -613,7 +613,7 @@ func (cluster *MiniCluster) CreateDefaultQueryCoord() (types.QueryCoordComponent func (cluster *MiniCluster) CreateDefaultDataNode() (types.DataNodeComponent, error) { log.Debug("mini cluster CreateDefaultDataNode") dataNode := datanode.NewDataNode(cluster.ctx, cluster.factory) - dataNode.SetEtcdClient(cluster.etcdCli) + dataNode.SetEtcdClient(cluster.EtcdCli) port := funcutil.GetAvailablePort() dataNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) return dataNode, nil @@ -622,7 +622,7 @@ func (cluster *MiniCluster) CreateDefaultDataNode() (types.DataNodeComponent, er func (cluster *MiniCluster) CreateDefaultQueryNode() (types.QueryNodeComponent, error) { log.Debug("mini cluster CreateDefaultQueryNode") queryNode := querynodev2.NewQueryNode(cluster.ctx, cluster.factory) - queryNode.SetEtcdClient(cluster.etcdCli) + queryNode.SetEtcdClient(cluster.EtcdCli) port := funcutil.GetAvailablePort() queryNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) return queryNode, nil @@ -631,7 +631,7 @@ func (cluster *MiniCluster) CreateDefaultQueryNode() (types.QueryNodeComponent, func (cluster *MiniCluster) CreateDefaultIndexNode() (types.IndexNodeComponent, error) { log.Debug("mini cluster CreateDefaultIndexNode") indexNode := indexnode.NewIndexNode(cluster.ctx, cluster.factory) - indexNode.SetEtcdClient(cluster.etcdCli) + indexNode.SetEtcdClient(cluster.EtcdCli) port := funcutil.GetAvailablePort() indexNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port)) return indexNode, nil @@ -640,7 +640,7 @@ func (cluster *MiniCluster) CreateDefaultIndexNode() (types.IndexNodeComponent, func (cluster *MiniCluster) CreateDefaultProxy() (types.ProxyComponent, error) { log.Debug("mini cluster CreateDefaultProxy") proxy, err := proxy2.NewProxy(cluster.ctx, cluster.factory) - proxy.SetEtcdClient(cluster.etcdCli) + proxy.SetEtcdClient(cluster.EtcdCli) if err != nil { return nil, err } @@ -657,7 +657,7 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err cluster.mu.Lock() defer cluster.mu.Unlock() var err error - if cluster.rootCoord != nil { + if cluster.RootCoord != nil { return errors.New("rootCoord already exist, maybe you need to remove it first") } if rootCoord == nil { @@ -668,14 +668,14 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err } // link - rootCoord.SetDataCoord(cluster.dataCoord) - rootCoord.SetQueryCoord(cluster.queryCoord) + rootCoord.SetDataCoord(cluster.DataCoord) + rootCoord.SetQueryCoord(cluster.QueryCoord) //rootCoord.SetIndexCoord(cluster.indexCoord) - cluster.dataCoord.SetRootCoord(rootCoord) - cluster.queryCoord.SetRootCoord(rootCoord) + cluster.DataCoord.SetRootCoord(rootCoord) + cluster.QueryCoord.SetRootCoord(rootCoord) //cluster.indexCoord.SetRootCoord(rootCoord) - cluster.proxy.SetRootCoordClient(rootCoord) - for _, dataNode := range cluster.dataNodes { + cluster.Proxy.SetRootCoordClient(rootCoord) + for _, dataNode := range cluster.DataNodes { err = dataNode.SetRootCoord(rootCoord) if err != nil { return err @@ -696,7 +696,7 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err return err } - cluster.rootCoord = rootCoord + cluster.RootCoord = rootCoord log.Debug("mini cluster AddRootCoord succeed") return nil } @@ -707,13 +707,13 @@ func (cluster *MiniCluster) RemoveRootCoord(rootCoord types.RootCoordComponent) cluster.mu.Lock() defer cluster.mu.Unlock() - if cluster.rootCoord == nil { + if cluster.RootCoord == nil { log.Info("mini cluster has no rootCoord, no need to remove") return nil } - cluster.rootCoord.Stop() - cluster.rootCoord = nil + cluster.RootCoord.Stop() + cluster.RootCoord = nil log.Debug("mini cluster RemoveRootCoord succeed") return nil } @@ -725,7 +725,7 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err cluster.mu.Lock() defer cluster.mu.Unlock() var err error - if cluster.dataCoord != nil { + if cluster.DataCoord != nil { return errors.New("dataCoord already exist, maybe you need to remove it first") } if dataCoord == nil { @@ -737,12 +737,12 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err // link //dataCoord.SetIndexCoord(cluster.indexCoord) - dataCoord.SetRootCoord(cluster.rootCoord) - err = cluster.rootCoord.SetDataCoord(cluster.dataCoord) + dataCoord.SetRootCoord(cluster.RootCoord) + err = cluster.RootCoord.SetDataCoord(cluster.DataCoord) if err != nil { return err } - err = cluster.queryCoord.SetDataCoord(cluster.dataCoord) + err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord) if err != nil { return err } @@ -750,8 +750,8 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err //if err != nil { // return err //} - cluster.proxy.SetDataCoordClient(dataCoord) - for _, dataNode := range cluster.dataNodes { + cluster.Proxy.SetDataCoordClient(dataCoord) + for _, dataNode := range cluster.DataNodes { err = dataNode.SetDataCoord(dataCoord) if err != nil { return err @@ -772,7 +772,7 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err return err } - cluster.dataCoord = dataCoord + cluster.DataCoord = dataCoord log.Debug("mini cluster AddDataCoord succeed") return nil } @@ -783,13 +783,13 @@ func (cluster *MiniCluster) RemoveDataCoord(dataCoord types.DataCoordComponent) cluster.mu.Lock() defer cluster.mu.Unlock() - if cluster.dataCoord == nil { + if cluster.DataCoord == nil { log.Info("mini cluster has no dataCoord, no need to remove") return nil } - cluster.dataCoord.Stop() - cluster.dataCoord = nil + cluster.DataCoord.Stop() + cluster.DataCoord = nil log.Debug("mini cluster RemoveDataCoord succeed") return nil } @@ -801,7 +801,7 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent) cluster.mu.Lock() defer cluster.mu.Unlock() var err error - if cluster.queryCoord != nil { + if cluster.QueryCoord != nil { return errors.New("queryCoord already exist, maybe you need to remove it first") } if queryCoord == nil { @@ -812,11 +812,11 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent) } // link - queryCoord.SetRootCoord(cluster.rootCoord) - queryCoord.SetDataCoord(cluster.dataCoord) + queryCoord.SetRootCoord(cluster.RootCoord) + queryCoord.SetDataCoord(cluster.DataCoord) //queryCoord.SetIndexCoord(cluster.indexCoord) - cluster.rootCoord.SetQueryCoord(queryCoord) - cluster.proxy.SetQueryCoordClient(queryCoord) + cluster.RootCoord.SetQueryCoord(queryCoord) + cluster.Proxy.SetQueryCoordClient(queryCoord) // start err = queryCoord.Init() @@ -832,7 +832,7 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent) return err } - cluster.queryCoord = queryCoord + cluster.QueryCoord = queryCoord log.Debug("mini cluster AddQueryCoord succeed") return nil } @@ -843,13 +843,13 @@ func (cluster *MiniCluster) RemoveQueryCoord(queryCoord types.QueryCoordComponen cluster.mu.Lock() defer cluster.mu.Unlock() - if cluster.queryCoord == nil { + if cluster.QueryCoord == nil { log.Info("mini cluster has no queryCoord, no need to remove") return nil } - cluster.queryCoord.Stop() - cluster.queryCoord = nil + cluster.QueryCoord.Stop() + cluster.QueryCoord = nil log.Debug("mini cluster RemoveQueryCoord succeed") return nil } @@ -928,11 +928,11 @@ func (cluster *MiniCluster) AddDataNode(dataNode types.DataNodeComponent) error return err } } - err = dataNode.SetDataCoord(cluster.dataCoord) + err = dataNode.SetDataCoord(cluster.DataCoord) if err != nil { return err } - err = dataNode.SetRootCoord(cluster.rootCoord) + err = dataNode.SetRootCoord(cluster.RootCoord) if err != nil { return err } @@ -948,7 +948,7 @@ func (cluster *MiniCluster) AddDataNode(dataNode types.DataNodeComponent) error if err != nil { return err } - cluster.dataNodes = append(cluster.dataNodes, dataNode) + cluster.DataNodes = append(cluster.DataNodes, dataNode) cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum + 1 log.Debug("mini cluster AddDataNode succeed") return nil @@ -962,9 +962,9 @@ func (cluster *MiniCluster) RemoveDataNode(dataNode types.DataNodeComponent) err if dataNode == nil { // choose a node randomly - if len(cluster.dataNodes) > 0 { - randIndex := rand.Intn(len(cluster.dataNodes)) - dataNode = cluster.dataNodes[randIndex] + if len(cluster.DataNodes) > 0 { + randIndex := rand.Intn(len(cluster.DataNodes)) + dataNode = cluster.DataNodes[randIndex] } else { log.Debug("mini cluster has no dataNodes") return nil @@ -977,13 +977,13 @@ func (cluster *MiniCluster) RemoveDataNode(dataNode types.DataNodeComponent) err } newDataNodes := make([]types.DataNodeComponent, 0) - for _, dn := range cluster.dataNodes { + for _, dn := range cluster.DataNodes { if dn == dataNode { continue } newDataNodes = append(newDataNodes, dn) } - cluster.dataNodes = newDataNodes + cluster.DataNodes = newDataNodes cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum - 1 log.Debug("mini cluster RemoveDataNode succeed") return nil @@ -1014,7 +1014,7 @@ func (cluster *MiniCluster) AddQueryNode(queryNode types.QueryNodeComponent) err if err != nil { return err } - cluster.queryNodes = append(cluster.queryNodes, queryNode) + cluster.QueryNodes = append(cluster.QueryNodes, queryNode) cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum + 1 log.Debug("mini cluster AddQueryNode succeed") return nil @@ -1028,9 +1028,9 @@ func (cluster *MiniCluster) RemoveQueryNode(queryNode types.QueryNodeComponent) if queryNode == nil { // choose a node randomly - if len(cluster.queryNodes) > 0 { - randIndex := rand.Intn(len(cluster.queryNodes)) - queryNode = cluster.queryNodes[randIndex] + if len(cluster.QueryNodes) > 0 { + randIndex := rand.Intn(len(cluster.QueryNodes)) + queryNode = cluster.QueryNodes[randIndex] } else { log.Debug("mini cluster has no queryNodes") return nil @@ -1043,13 +1043,13 @@ func (cluster *MiniCluster) RemoveQueryNode(queryNode types.QueryNodeComponent) } newQueryNodes := make([]types.QueryNodeComponent, 0) - for _, qn := range cluster.queryNodes { + for _, qn := range cluster.QueryNodes { if qn == queryNode { continue } newQueryNodes = append(newQueryNodes, qn) } - cluster.queryNodes = newQueryNodes + cluster.QueryNodes = newQueryNodes cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum - 1 log.Debug("mini cluster RemoveQueryNode succeed") return nil @@ -1080,7 +1080,7 @@ func (cluster *MiniCluster) AddIndexNode(indexNode types.IndexNodeComponent) err if err != nil { return err } - cluster.indexNodes = append(cluster.indexNodes, indexNode) + cluster.IndexNodes = append(cluster.IndexNodes, indexNode) cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum + 1 log.Debug("mini cluster AddIndexNode succeed") return nil @@ -1094,9 +1094,9 @@ func (cluster *MiniCluster) RemoveIndexNode(indexNode types.IndexNodeComponent) if indexNode == nil { // choose a node randomly - if len(cluster.indexNodes) > 0 { - randIndex := rand.Intn(len(cluster.indexNodes)) - indexNode = cluster.indexNodes[randIndex] + if len(cluster.IndexNodes) > 0 { + randIndex := rand.Intn(len(cluster.IndexNodes)) + indexNode = cluster.IndexNodes[randIndex] } else { log.Debug("mini cluster has no queryNodes") return nil @@ -1109,13 +1109,13 @@ func (cluster *MiniCluster) RemoveIndexNode(indexNode types.IndexNodeComponent) } newIndexNodes := make([]types.IndexNodeComponent, 0) - for _, in := range cluster.indexNodes { + for _, in := range cluster.IndexNodes { if in == indexNode { continue } newIndexNodes = append(newIndexNodes, in) } - cluster.indexNodes = newIndexNodes + cluster.IndexNodes = newIndexNodes cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum - 1 log.Debug("mini cluster RemoveIndexNode succeed") return nil @@ -1129,46 +1129,46 @@ func (cluster *MiniCluster) UpdateClusterSize(clusterConfig ClusterConfig) error // todo concurrent concerns //cluster.mu.Lock() //defer cluster.mu.Unlock() - if clusterConfig.DataNodeNum > len(cluster.dataNodes) { - needAdd := clusterConfig.DataNodeNum - len(cluster.dataNodes) + if clusterConfig.DataNodeNum > len(cluster.DataNodes) { + needAdd := clusterConfig.DataNodeNum - len(cluster.DataNodes) for i := 0; i < needAdd; i++ { cluster.AddDataNode(nil) } - } else if clusterConfig.DataNodeNum < len(cluster.dataNodes) { - needRemove := len(cluster.dataNodes) - clusterConfig.DataNodeNum + } else if clusterConfig.DataNodeNum < len(cluster.DataNodes) { + needRemove := len(cluster.DataNodes) - clusterConfig.DataNodeNum for i := 0; i < needRemove; i++ { cluster.RemoveDataNode(nil) } } - if clusterConfig.QueryNodeNum > len(cluster.queryNodes) { - needAdd := clusterConfig.QueryNodeNum - len(cluster.queryNodes) + if clusterConfig.QueryNodeNum > len(cluster.QueryNodes) { + needAdd := clusterConfig.QueryNodeNum - len(cluster.QueryNodes) for i := 0; i < needAdd; i++ { cluster.AddQueryNode(nil) } - } else if clusterConfig.QueryNodeNum < len(cluster.queryNodes) { - needRemove := len(cluster.queryNodes) - clusterConfig.QueryNodeNum + } else if clusterConfig.QueryNodeNum < len(cluster.QueryNodes) { + needRemove := len(cluster.QueryNodes) - clusterConfig.QueryNodeNum for i := 0; i < needRemove; i++ { cluster.RemoveQueryNode(nil) } } - if clusterConfig.IndexNodeNum > len(cluster.indexNodes) { - needAdd := clusterConfig.IndexNodeNum - len(cluster.indexNodes) + if clusterConfig.IndexNodeNum > len(cluster.IndexNodes) { + needAdd := clusterConfig.IndexNodeNum - len(cluster.IndexNodes) for i := 0; i < needAdd; i++ { cluster.AddIndexNode(nil) } - } else if clusterConfig.IndexNodeNum < len(cluster.indexNodes) { - needRemove := len(cluster.indexNodes) - clusterConfig.IndexNodeNum + } else if clusterConfig.IndexNodeNum < len(cluster.IndexNodes) { + needRemove := len(cluster.IndexNodes) - clusterConfig.IndexNodeNum for i := 0; i < needRemove; i++ { cluster.RemoveIndexNode(nil) } } // validate - if clusterConfig.DataNodeNum != len(cluster.dataNodes) || - clusterConfig.QueryNodeNum != len(cluster.queryNodes) || - clusterConfig.IndexNodeNum != len(cluster.indexNodes) { + if clusterConfig.DataNodeNum != len(cluster.DataNodes) || + clusterConfig.QueryNodeNum != len(cluster.QueryNodes) || + clusterConfig.IndexNodeNum != len(cluster.IndexNodes) { return errors.New("Fail to update cluster size to target size") } @@ -1177,14 +1177,14 @@ func (cluster *MiniCluster) UpdateClusterSize(clusterConfig ClusterConfig) error } func (cluster *MiniCluster) GetProxy(ctx context.Context, addr string) (types.Proxy, error) { - if cluster.proxy.GetAddress() == addr { - return cluster.proxy, nil + if cluster.Proxy.GetAddress() == addr { + return cluster.Proxy, nil } return nil, nil } func (cluster *MiniCluster) GetQueryNode(ctx context.Context, addr string) (types.QueryNode, error) { - for _, queryNode := range cluster.queryNodes { + for _, queryNode := range cluster.QueryNodes { if queryNode.GetAddress() == addr { return queryNode, nil } @@ -1193,7 +1193,7 @@ func (cluster *MiniCluster) GetQueryNode(ctx context.Context, addr string) (type } func (cluster *MiniCluster) GetDataNode(ctx context.Context, addr string) (types.DataNode, error) { - for _, dataNode := range cluster.dataNodes { + for _, dataNode := range cluster.DataNodes { if dataNode.GetAddress() == addr { return dataNode, nil } @@ -1202,7 +1202,7 @@ func (cluster *MiniCluster) GetDataNode(ctx context.Context, addr string) (types } func (cluster *MiniCluster) GetIndexNode(ctx context.Context, addr string) (types.IndexNode, error) { - for _, indexNode := range cluster.indexNodes { + for _, indexNode := range cluster.IndexNodes { if indexNode.GetAddress() == addr { return indexNode, nil } @@ -1211,5 +1211,5 @@ func (cluster *MiniCluster) GetIndexNode(ctx context.Context, addr string) (type } func (cluster *MiniCluster) GetMetaWatcher() MetaWatcher { - return cluster.metaWatcher + return cluster.MetaWatcher } diff --git a/tests/integration/minicluster_test.go b/tests/integration/minicluster_test.go index 3ec04850ca..3cd31ecfd6 100644 --- a/tests/integration/minicluster_test.go +++ b/tests/integration/minicluster_test.go @@ -41,33 +41,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveDataNode() { defer cancel() datanode := datanode.NewDataNode(ctx, c.factory) - datanode.SetEtcdClient(c.etcdCli) + datanode.SetEtcdClient(c.EtcdCli) //datanode := c.CreateDefaultDataNode() err := c.AddDataNode(datanode) s.NoError(err) s.Equal(2, c.clusterConfig.DataNodeNum) - s.Equal(2, len(c.dataNodes)) + s.Equal(2, len(c.DataNodes)) err = c.RemoveDataNode(datanode) s.NoError(err) s.Equal(1, c.clusterConfig.DataNodeNum) - s.Equal(1, len(c.dataNodes)) + s.Equal(1, len(c.DataNodes)) // add default node and remove randomly err = c.AddDataNode(nil) s.NoError(err) s.Equal(2, c.clusterConfig.DataNodeNum) - s.Equal(2, len(c.dataNodes)) + s.Equal(2, len(c.DataNodes)) err = c.RemoveDataNode(nil) s.NoError(err) s.Equal(1, c.clusterConfig.DataNodeNum) - s.Equal(1, len(c.dataNodes)) + s.Equal(1, len(c.DataNodes)) } func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() { @@ -76,33 +76,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() { defer cancel() queryNode := querynodev2.NewQueryNode(ctx, c.factory) - queryNode.SetEtcdClient(c.etcdCli) + queryNode.SetEtcdClient(c.EtcdCli) //queryNode := c.CreateDefaultQueryNode() err := c.AddQueryNode(queryNode) s.NoError(err) s.Equal(2, c.clusterConfig.QueryNodeNum) - s.Equal(2, len(c.queryNodes)) + s.Equal(2, len(c.QueryNodes)) err = c.RemoveQueryNode(queryNode) s.NoError(err) s.Equal(1, c.clusterConfig.QueryNodeNum) - s.Equal(1, len(c.queryNodes)) + s.Equal(1, len(c.QueryNodes)) // add default node and remove randomly err = c.AddQueryNode(nil) s.NoError(err) s.Equal(2, c.clusterConfig.QueryNodeNum) - s.Equal(2, len(c.queryNodes)) + s.Equal(2, len(c.QueryNodes)) err = c.RemoveQueryNode(nil) s.NoError(err) s.Equal(1, c.clusterConfig.QueryNodeNum) - s.Equal(1, len(c.queryNodes)) + s.Equal(1, len(c.QueryNodes)) } @@ -112,33 +112,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveIndexNode() { defer cancel() indexNode := indexnode.NewIndexNode(ctx, c.factory) - indexNode.SetEtcdClient(c.etcdCli) + indexNode.SetEtcdClient(c.EtcdCli) //indexNode := c.CreateDefaultIndexNode() err := c.AddIndexNode(indexNode) s.NoError(err) s.Equal(2, c.clusterConfig.IndexNodeNum) - s.Equal(2, len(c.indexNodes)) + s.Equal(2, len(c.IndexNodes)) err = c.RemoveIndexNode(indexNode) s.NoError(err) s.Equal(1, c.clusterConfig.IndexNodeNum) - s.Equal(1, len(c.indexNodes)) + s.Equal(1, len(c.IndexNodes)) // add default node and remove randomly err = c.AddIndexNode(nil) s.NoError(err) s.Equal(2, c.clusterConfig.IndexNodeNum) - s.Equal(2, len(c.indexNodes)) + s.Equal(2, len(c.IndexNodes)) err = c.RemoveIndexNode(nil) s.NoError(err) s.Equal(1, c.clusterConfig.IndexNodeNum) - s.Equal(1, len(c.indexNodes)) + s.Equal(1, len(c.IndexNodes)) } @@ -164,9 +164,9 @@ func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() { s.Equal(2, c.clusterConfig.QueryNodeNum) s.Equal(2, c.clusterConfig.IndexNodeNum) - s.Equal(2, len(c.dataNodes)) - s.Equal(2, len(c.queryNodes)) - s.Equal(2, len(c.indexNodes)) + s.Equal(2, len(c.DataNodes)) + s.Equal(2, len(c.QueryNodes)) + s.Equal(2, len(c.IndexNodes)) err = c.UpdateClusterSize(ClusterConfig{ DataNodeNum: 3, @@ -179,9 +179,9 @@ func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() { s.Equal(2, c.clusterConfig.QueryNodeNum) s.Equal(1, c.clusterConfig.IndexNodeNum) - s.Equal(3, len(c.dataNodes)) - s.Equal(2, len(c.queryNodes)) - s.Equal(1, len(c.indexNodes)) + s.Equal(3, len(c.DataNodes)) + s.Equal(2, len(c.QueryNodes)) + s.Equal(1, len(c.IndexNodes)) } func TestMiniCluster(t *testing.T) { diff --git a/tests/integration/range_search_test.go b/tests/integration/rangesearch/range_search_test.go similarity index 66% rename from tests/integration/range_search_test.go rename to tests/integration/rangesearch/range_search_test.go index 13cc962f01..6d0b9e12dc 100644 --- a/tests/integration/range_search_test.go +++ b/tests/integration/rangesearch/range_search_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package rangesearch import ( "context" @@ -32,10 +32,11 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/tests/integration" ) type RangeSearchSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *RangeSearchSuite) TestRangeSearchIP() { @@ -49,11 +50,11 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { dim := 128 rowNum := 3000 - schema := constructSchema(collectionName, dim, true) + schema := integration.ConstructSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -67,14 +68,14 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { } log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.True(merr.Ok(showCollectionsResp.GetStatus())) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -85,7 +86,7 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { s.True(merr.Ok(insertResult.GetStatus())) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -95,30 +96,30 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } - waitingForFlush(ctx, c, ids) + s.WaitForFlush(ctx, ids) // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.IP), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.IP), }) s.NoError(err) err = merr.Error(createIndexStatus) if err != nil { log.Warn("createIndexStatus fail reason", zap.Error(err)) } - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -127,23 +128,23 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { if err != nil { log.Warn("LoadCollection fail reason", zap.Error(err)) } - waitingForLoad(ctx, c, collectionName) + s.WaitForLoad(ctx, collectionName) // search - expr := fmt.Sprintf("%s > 0", int64Field) + expr := fmt.Sprintf("%s > 0", integration.Int64Field) nq := 10 topk := 10 roundDecimal := -1 radius := 10 filter := 20 - params := getSearchParams(IndexFaissIvfFlat, distance.IP) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.IP) // only pass in radius when range search params["radius"] = radius - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) - searchResult, _ := c.proxy.Search(ctx, searchReq) + searchResult, _ := c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { @@ -153,10 +154,10 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { // pass in radius and range_filter when range search params["range_filter"] = filter - searchReq = constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) + searchReq = integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) - searchResult, _ = c.proxy.Search(ctx, searchReq) + searchResult, _ = c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { @@ -167,10 +168,10 @@ func (s *RangeSearchSuite) TestRangeSearchIP() { // pass in illegal radius and range_filter when range search params["radius"] = filter params["range_filter"] = radius - searchReq = constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) + searchReq = integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) - searchResult, _ = c.proxy.Search(ctx, searchReq) + searchResult, _ = c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { @@ -197,11 +198,11 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { dim := 128 rowNum := 3000 - schema := constructSchema(collectionName, dim, true) + schema := integration.ConstructSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -215,14 +216,14 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { } log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.True(merr.Ok(showCollectionsResp.GetStatus())) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -233,7 +234,7 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { s.True(merr.Ok(insertResult.GetStatus())) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -243,30 +244,30 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } - waitingForFlush(ctx, c, ids) + s.WaitForFlush(ctx, ids) // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2), }) s.NoError(err) err = merr.Error(createIndexStatus) if err != nil { log.Warn("createIndexStatus fail reason", zap.Error(err)) } - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -275,22 +276,22 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { if err != nil { log.Warn("LoadCollection fail reason", zap.Error(err)) } - waitingForLoad(ctx, c, collectionName) + s.WaitForLoad(ctx, collectionName) // search - expr := fmt.Sprintf("%s > 0", int64Field) + expr := fmt.Sprintf("%s > 0", integration.Int64Field) nq := 10 topk := 10 roundDecimal := -1 radius := 20 filter := 10 - params := getSearchParams(IndexFaissIvfFlat, distance.L2) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2) // only pass in radius when range search params["radius"] = radius - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, _ := c.proxy.Search(ctx, searchReq) + searchResult, _ := c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { @@ -300,10 +301,10 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { // pass in radius and range_filter when range search params["range_filter"] = filter - searchReq = constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) + searchReq = integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, _ = c.proxy.Search(ctx, searchReq) + searchResult, _ = c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { @@ -314,10 +315,10 @@ func (s *RangeSearchSuite) TestRangeSearchL2() { // pass in illegal radius and range_filter when range search params["radius"] = filter params["range_filter"] = radius - searchReq = constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) + searchReq = integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal) - searchResult, _ = c.proxy.Search(ctx, searchReq) + searchResult, _ = c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { diff --git a/tests/integration/refresh_config_test.go b/tests/integration/refreshconfig/refresh_config_test.go similarity index 74% rename from tests/integration/refresh_config_test.go rename to tests/integration/refreshconfig/refresh_config_test.go index 03d6135caf..8368be6c44 100644 --- a/tests/integration/refresh_config_test.go +++ b/tests/integration/refreshconfig/refresh_config_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package refreshconfig import ( "context" @@ -29,12 +29,13 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" "github.com/stretchr/testify/suite" "go.uber.org/zap" ) type RefreshConfigSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *RefreshConfigSuite) TestRefreshPasswordLength() { @@ -42,7 +43,7 @@ func (s *RefreshConfigSuite) TestRefreshPasswordLength() { ctx, cancel := context.WithCancel(c.GetContext()) defer cancel() - resp, err := c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ + resp, err := c.Proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ Username: "test", Password: "1234", }) @@ -52,10 +53,10 @@ func (s *RefreshConfigSuite) TestRefreshPasswordLength() { params := paramtable.Get() key := fmt.Sprintf("%s/config/proxy/minpasswordlength", params.EtcdCfg.RootPath.GetValue()) - c.etcdCli.KV.Put(ctx, key, "3") + c.EtcdCli.KV.Put(ctx, key, "3") s.Eventually(func() bool { - resp, err = c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ + resp, err = c.Proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{ Username: "test", Password: "1234", }) @@ -70,7 +71,7 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() { ctx, cancel := context.WithCancel(c.GetContext()) defer cancel() params := paramtable.Get() - c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index") + c.EtcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index") s.Eventually(func() bool { return params.CommonCfg.DefaultIndexName.GetValue() == "a_index" @@ -81,11 +82,11 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() { collectionName := "test" rowNum := 100 - schema := constructSchema("test", 128, true) + schema := integration.ConstructSchema("test", 128, true) marshaledSchema, err := proto.Marshal(schema) s.Require().NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: "default", CollectionName: "test", Schema: marshaledSchema, @@ -97,9 +98,9 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() { } s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - _, err = c.proxy.Insert(ctx, &milvuspb.InsertRequest{ + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + _, err = c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, @@ -108,14 +109,27 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() { }) s.NoError(err) - _, err = c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + // flush + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + s.True(has) + ids := segmentIDs.GetData() + s.NotEmpty(segmentIDs) + + s.WaitForFlush(ctx, ids) + + _, err = c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2), + FieldName: integration.FloatVecField, + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2), }) s.NoError(err) - resp, err := c.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + resp, err := c.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ DbName: dbName, CollectionName: collectionName, }) diff --git a/tests/integration/suite_test.go b/tests/integration/suite.go similarity index 98% rename from tests/integration/suite_test.go rename to tests/integration/suite.go index 8415b4a85d..d86662192f 100644 --- a/tests/integration/suite_test.go +++ b/tests/integration/suite.go @@ -18,6 +18,7 @@ package integration import ( "context" + "math/rand" "os" "strings" "time" @@ -64,6 +65,7 @@ type MiniClusterSuite struct { } func (s *MiniClusterSuite) SetupSuite() { + rand.Seed(time.Now().UnixNano()) s.Require().NoError(s.SetupEmbedEtcd()) } diff --git a/tests/integration/upsert_test.go b/tests/integration/upsert/upsert_test.go similarity index 71% rename from tests/integration/upsert_test.go rename to tests/integration/upsert/upsert_test.go index ef13821829..f3c3be53fe 100644 --- a/tests/integration/upsert_test.go +++ b/tests/integration/upsert/upsert_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package integration +package upsert import ( "context" @@ -29,12 +29,13 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/tests/integration" "github.com/stretchr/testify/suite" "go.uber.org/zap" ) type UpsertSuite struct { - MiniClusterSuite + integration.MiniClusterSuite } func (s *UpsertSuite) TestUpsert() { @@ -48,11 +49,11 @@ func (s *UpsertSuite) TestUpsert() { dim := 128 rowNum := 3000 - schema := constructSchema(collectionName, dim, false) + schema := integration.ConstructSchema(collectionName, dim, false) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) - createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, @@ -66,15 +67,15 @@ func (s *UpsertSuite) TestUpsert() { } log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.True(merr.Ok(showCollectionsResp.GetStatus())) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) - pkFieldData := newInt64FieldData(int64Field, rowNum) - fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) - hashKeys := generateHashKeys(rowNum) - upsertResult, err := c.proxy.Upsert(ctx, &milvuspb.UpsertRequest{ + pkFieldData := integration.NewInt64FieldData(integration.Int64Field, rowNum) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + upsertResult, err := c.Proxy.Upsert(ctx, &milvuspb.UpsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn}, @@ -85,7 +86,7 @@ func (s *UpsertSuite) TestUpsert() { s.True(merr.Ok(upsertResult.GetStatus())) // flush - flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) @@ -95,20 +96,20 @@ func (s *UpsertSuite) TestUpsert() { ids := segmentIDs.GetData() s.NotEmpty(segmentIDs) - segments, err := c.metaWatcher.ShowSegments() + segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } - waitingForFlush(ctx, c, ids) + s.WaitForFlush(ctx, ids) // create index - createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, - FieldName: floatVecField, + FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.IP), + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.IP), }) s.NoError(err) err = merr.Error(createIndexStatus) @@ -116,10 +117,10 @@ func (s *UpsertSuite) TestUpsert() { log.Warn("createIndexStatus fail reason", zap.Error(err)) } - waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) // load - loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) @@ -128,18 +129,18 @@ func (s *UpsertSuite) TestUpsert() { if err != nil { log.Warn("LoadCollection fail reason", zap.Error(err)) } - waitingForLoad(ctx, c, collectionName) + s.WaitForLoad(ctx, collectionName) // search - expr := fmt.Sprintf("%s > 0", int64Field) + expr := fmt.Sprintf("%s > 0", integration.Int64Field) nq := 10 topk := 10 roundDecimal := -1 - params := getSearchParams(IndexFaissIvfFlat, "") - searchReq := constructSearchRequest("", collectionName, expr, - floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) + params := integration.GetSearchParams(integration.IndexFaissIvfFlat, "") + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal) - searchResult, _ := c.proxy.Search(ctx, searchReq) + searchResult, _ := c.Proxy.Search(ctx, searchReq) err = merr.Error(searchResult.GetStatus()) if err != nil { diff --git a/tests/integration/util_index.go b/tests/integration/util_index.go index b0f3609811..2dd0087743 100644 --- a/tests/integration/util_index.go +++ b/tests/integration/util_index.go @@ -42,9 +42,41 @@ const ( IndexDISKANN = indexparamcheck.IndexDISKANN ) +func (s *MiniClusterSuite) WaitForIndexBuilt(ctx context.Context, collection, field string) { + getIndexBuilt := func() bool { + resp, err := s.Cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + CollectionName: collection, + FieldName: field, + }) + if err != nil { + s.FailNow("failed to describe index") + return true + } + for _, desc := range resp.GetIndexDescriptions() { + if desc.GetFieldName() == field { + switch desc.GetState() { + case commonpb.IndexState_Finished: + return true + case commonpb.IndexState_Failed: + return false + } + } + } + return false + } + for !getIndexBuilt() { + select { + case <-ctx.Done(): + s.FailNow("failed to wait index built until ctx done") + return + case <-time.After(500 * time.Millisecond): + } + } +} + func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing.T, collection, field string) { getIndexBuilt := func() bool { - resp, err := cluster.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ + resp, err := cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ CollectionName: collection, FieldName: field, }) @@ -74,7 +106,7 @@ func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing. } } -func constructIndexParam(dim int, indexType string, metricType string) []*commonpb.KeyValuePair { +func ConstructIndexParam(dim int, indexType string, metricType string) []*commonpb.KeyValuePair { params := []*commonpb.KeyValuePair{ { Key: common.DimKey, @@ -126,7 +158,7 @@ func constructIndexParam(dim int, indexType string, metricType string) []*common return params } -func getSearchParams(indexType string, metricType string) map[string]any { +func GetSearchParams(indexType string, metricType string) map[string]any { params := make(map[string]any) switch indexType { case IndexFaissIDMap, IndexFaissBinIDMap: diff --git a/tests/integration/util_insert.go b/tests/integration/util_insert.go index 58508a4e2b..dcbc416a0b 100644 --- a/tests/integration/util_insert.go +++ b/tests/integration/util_insert.go @@ -26,9 +26,30 @@ import ( "github.com/milvus-io/milvus-proto/go-api/schemapb" ) +func (s *MiniClusterSuite) WaitForFlush(ctx context.Context, segIDs []int64) { + flushed := func() bool { + resp, err := s.Cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ + SegmentIDs: segIDs, + }) + if err != nil { + return false + } + return resp.GetFlushed() + } + for !flushed() { + select { + case <-ctx.Done(): + s.FailNow("failed to wait for flush until ctx done") + return + default: + time.Sleep(500 * time.Millisecond) + } + } +} + func waitingForFlush(ctx context.Context, cluster *MiniCluster, segIDs []int64) { flushed := func() bool { - resp, err := cluster.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ + resp, err := cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ SegmentIDs: segIDs, }) if err != nil { @@ -46,7 +67,7 @@ func waitingForFlush(ctx context.Context, cluster *MiniCluster, segIDs []int64) } } -func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData { +func NewInt64FieldData(fieldName string, numRows int) *schemapb.FieldData { return &schemapb.FieldData{ Type: schemapb.DataType_Int64, FieldName: fieldName, @@ -54,7 +75,7 @@ func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData { Scalars: &schemapb.ScalarField{ Data: &schemapb.ScalarField_LongData{ LongData: &schemapb.LongArray{ - Data: generateInt64Array(numRows), + Data: GenerateInt64Array(numRows), }, }, }, @@ -62,7 +83,7 @@ func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData { } } -func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData { +func NewStringFieldData(fieldName string, numRows int) *schemapb.FieldData { return &schemapb.FieldData{ Type: schemapb.DataType_Int64, FieldName: fieldName, @@ -70,7 +91,7 @@ func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData { Scalars: &schemapb.ScalarField{ Data: &schemapb.ScalarField_StringData{ StringData: &schemapb.StringArray{ - Data: generateStringArray(numRows), + Data: GenerateStringArray(numRows), }, }, }, @@ -78,7 +99,7 @@ func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData { } } -func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData { +func NewFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData { return &schemapb.FieldData{ Type: schemapb.DataType_FloatVector, FieldName: fieldName, @@ -87,7 +108,7 @@ func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.Field Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{ FloatVector: &schemapb.FloatArray{ - Data: generateFloatVectors(numRows, dim), + Data: GenerateFloatVectors(numRows, dim), }, }, }, @@ -95,7 +116,7 @@ func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.Field } } -func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData { +func NewBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData { return &schemapb.FieldData{ Type: schemapb.DataType_BinaryVector, FieldName: fieldName, @@ -103,14 +124,14 @@ func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.Fiel Vectors: &schemapb.VectorField{ Dim: int64(dim), Data: &schemapb.VectorField_BinaryVector{ - BinaryVector: generateBinaryVectors(numRows, dim), + BinaryVector: GenerateBinaryVectors(numRows, dim), }, }, }, } } -func generateInt64Array(numRows int) []int64 { +func GenerateInt64Array(numRows int) []int64 { ret := make([]int64, numRows) for i := 0; i < numRows; i++ { ret[i] = int64(i) @@ -118,7 +139,7 @@ func generateInt64Array(numRows int) []int64 { return ret } -func generateStringArray(numRows int) []string { +func GenerateStringArray(numRows int) []string { ret := make([]string, numRows) for i := 0; i < numRows; i++ { ret[i] = fmt.Sprintf("%d", i) @@ -126,7 +147,7 @@ func generateStringArray(numRows int) []string { return ret } -func generateFloatVectors(numRows, dim int) []float32 { +func GenerateFloatVectors(numRows, dim int) []float32 { total := numRows * dim ret := make([]float32, 0, total) for i := 0; i < total; i++ { @@ -135,7 +156,7 @@ func generateFloatVectors(numRows, dim int) []float32 { return ret } -func generateBinaryVectors(numRows, dim int) []byte { +func GenerateBinaryVectors(numRows, dim int) []byte { total := (numRows * dim) / 8 ret := make([]byte, total) _, err := rand.Read(ret) @@ -145,7 +166,7 @@ func generateBinaryVectors(numRows, dim int) []byte { return ret } -func generateHashKeys(numRows int) []uint32 { +func GenerateHashKeys(numRows int) []uint32 { ret := make([]uint32, 0, numRows) for i := 0; i < numRows; i++ { ret = append(ret, rand.Uint32()) diff --git a/tests/integration/util_query.go b/tests/integration/util_query.go index a5af981ae8..9e3aca338a 100644 --- a/tests/integration/util_query.go +++ b/tests/integration/util_query.go @@ -44,9 +44,31 @@ const ( LimitKey = "limit" ) +func (s *MiniClusterSuite) WaitForLoad(ctx context.Context, collection string) { + cluster := s.Cluster + getLoadingProgress := func() *milvuspb.GetLoadingProgressResponse { + loadProgress, err := cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ + CollectionName: collection, + }) + if err != nil { + panic("GetLoadingProgress fail") + } + return loadProgress + } + for getLoadingProgress().GetProgress() != 100 { + select { + case <-ctx.Done(): + s.FailNow("failed to wait for load") + return + default: + time.Sleep(500 * time.Millisecond) + } + } +} + func waitingForLoad(ctx context.Context, cluster *MiniCluster, collection string) { getLoadingProgress := func() *milvuspb.GetLoadingProgressResponse { - loadProgress, err := cluster.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ + loadProgress, err := cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ CollectionName: collection, }) if err != nil { @@ -64,7 +86,7 @@ func waitingForLoad(ctx context.Context, cluster *MiniCluster, collection string } } -func constructSearchRequest( +func ConstructSearchRequest( dbName, collectionName string, expr string, vecField string, diff --git a/tests/integration/util_schema.go b/tests/integration/util_schema.go index 45983c9638..b0d9603900 100644 --- a/tests/integration/util_schema.go +++ b/tests/integration/util_schema.go @@ -25,20 +25,20 @@ import ( ) const ( - boolField = "boolField" - int8Field = "int8Field" - int16Field = "int16Field" - int32Field = "int32Field" - int64Field = "int64Field" - floatField = "floatField" - doubleField = "doubleField" - varCharField = "varCharField" - jsonField = "jsonField" - floatVecField = "floatVecField" - binVecField = "binVecField" + BoolField = "boolField" + Int8Field = "int8Field" + Int16Field = "int16Field" + Int32Field = "int32Field" + Int64Field = "int64Field" + FloatField = "floatField" + DoubleField = "doubleField" + VarCharField = "varCharField" + JSONField = "jsonField" + FloatVecField = "floatVecField" + BinVecField = "binVecField" ) -func constructSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema { +func ConstructSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema { // if fields are specified, construct it if len(fields) > 0 { return &schemapb.CollectionSchema{ @@ -51,7 +51,7 @@ func constructSchema(collection string, dim int, autoID bool, fields ...*schemap // if no field is specified, use default pk := &schemapb.FieldSchema{ FieldID: 100, - Name: int64Field, + Name: Int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, @@ -61,7 +61,7 @@ func constructSchema(collection string, dim int, autoID bool, fields ...*schemap } fVec := &schemapb.FieldSchema{ FieldID: 101, - Name: floatVecField, + Name: FloatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector,