From c339df26fc04b3b14c9043fb8aad421f5b1560f5 Mon Sep 17 00:00:00 2001 From: wayblink Date: Mon, 22 Jul 2024 11:27:51 +0800 Subject: [PATCH] enhance: refine clustering compaction basic it (#34793) #34792 Signed-off-by: wayblink --- .../compaction/clustering_compaction_test.go | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go index 64680522ba..9b6731f5c0 100644 --- a/tests/integration/compaction/clustering_compaction_test.go +++ b/tests/integration/compaction/clustering_compaction_test.go @@ -29,9 +29,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/tests/integration" ) @@ -107,6 +110,37 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { } s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + indexType := integration.IndexFaissIvfFlat + metricType := metric.L2 + vecType := schemapb.DataType_FloatVector + + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: fVecColumn.FieldName, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) + } + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) + + s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName) + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason())) + } + s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) + s.WaitForLoad(ctx, collectionName) + compactReq := &milvuspb.ManualCompactionRequest{ CollectionID: showCollectionsResp.CollectionIds[0], MajorCompaction: true, @@ -125,10 +159,54 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { return resp.GetState() == commonpb.CompactionState_Completed } for !compacted() { - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) } log.Info("compact done") + // search + expr := fmt.Sprintf("%s > 0", integration.Int64Field) + nq := 10 + topk := 10 + roundDecimal := -1 + + params := integration.GetSearchParams(indexType, metricType) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + fVecColumn.FieldName, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) + + searchCheckReport := func() { + timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + defer cancelFunc() + + for { + select { + case <-timeoutCtx.Done(): + s.Fail("search check timeout") + case report := <-c.Extension.GetReportChan(): + reportInfo := report.(map[string]any) + log.Info("search report info", zap.Any("reportInfo", reportInfo)) + s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) + s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) + return + } + } + } + go searchCheckReport() + searchResult, err := c.Proxy.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + s.NoError(err) + + querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + var queryRows int64 = 0 + for _, seg := range querySegmentInfo.Infos { + queryRows += seg.NumRows + } + s.Equal(int64(3000), queryRows) + log.Info("TestClusteringCompaction succeed") }