From 4c45bc412f3bcf316776e15e32a165cbd0d5ccbe Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 23 Jul 2024 10:13:43 +0800 Subject: [PATCH] enhance: Add integration test for clustering compaction (#34881) issue: #34792 --------- Signed-off-by: cai.zhang Signed-off-by: Cai Zhang --- internal/datacoord/compaction_trigger_v2.go | 2 + .../compaction/clustering_compactor.go | 9 +- .../compaction/clustering_compaction_test.go | 130 ++++++++++++++---- 3 files changed, 105 insertions(+), 36 deletions(-) diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 4949311103..9fd9b33e31 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -312,6 +312,8 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C zap.Int64("taskID", taskID), zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), + zap.Int64("MaxSegmentRows", task.MaxSegmentRows), + zap.Int64("PreferSegmentRows", task.PreferSegmentRows), ) } diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index aa8d82755c..64c1b6ae62 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -596,10 +596,6 @@ func (t *clusteringCompactionTask) mappingSegment( if (remained+1)%100 == 0 { currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize() - currentBufferWrittenMemorySize := t.getCurrentBufferWrittenMemorySize() - log.Debug("current buffer size", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize), - zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize)) - // trigger flushBinlog currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load() if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || @@ -622,7 +618,8 @@ func (t *clusteringCompactionTask) mappingSegment( } } 
else if currentBufferTotalMemorySize > t.getMemoryBufferBlockFlushThreshold() && !t.hasSignal.Load() { // reach flushBinlog trigger threshold - log.Debug("largest buffer need to flush", zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) + log.Debug("largest buffer need to flush", + zap.Int64("currentBufferTotalMemorySize", currentBufferTotalMemorySize)) t.flushChan <- FlushSignal{} t.hasSignal.Store(true) } @@ -1168,7 +1165,7 @@ func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBu segmentID = buffer.writer.GetSegmentID() buffer.bufferMemorySize.Add(int64(buffer.writer.WrittenMemorySize())) } - if buffer.writer == nil || buffer.currentSegmentRowNum.Load()+buffer.writer.GetRowNum() > t.plan.GetMaxSegmentRows() { + if buffer.writer == nil || buffer.currentSegmentRowNum.Load() > t.plan.GetMaxSegmentRows() { pack = true segmentID, err = t.segIDAlloc.AllocOne() if err != nil { diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go index 9b6731f5c0..49f0905d3b 100644 --- a/tests/integration/compaction/clustering_compaction_test.go +++ b/tests/integration/compaction/clustering_compaction_test.go @@ -19,6 +19,8 @@ package compaction import ( "context" "fmt" + "strconv" + "sync" "testing" "time" @@ -29,12 +31,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/util/hookutil" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/tests/integration" ) @@ -50,11 +53,30 @@ func (s 
*ClusteringCompactionSuite) TestClusteringCompaction() { const ( dim = 128 dbName = "" - rowNum = 3000 + rowNum = 30000 ) collectionName := "TestClusteringCompaction" + funcutil.GenRandomStr() + // 2000 rows for each segment, about 1MB. + paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(1)) + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key) + + paramtable.Get().Save(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.Key, strconv.Itoa(8)) + defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.Key) + + paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, strconv.Itoa(102400)) + defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key) + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key) + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key, "1m") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSize.Key) + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key, "1m") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSize.Key) + schema := ConstructScalarClusteringSchema(collectionName, dim, true) marshaledSchema, err := proto.Marshal(schema) s.NoError(err) @@ -78,11 +100,12 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + clusteringColumn := integration.NewInt64SameFieldData("clustering", rowNum, 100) hashKeys := integration.GenerateHashKeys(rowNum) insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, 
CollectionName: collectionName, - FieldsData: []*schemapb.FieldData{fVecColumn}, + FieldsData: []*schemapb.FieldData{clusteringColumn, fVecColumn}, HashKeys: hashKeys, NumRows: uint32(rowNum), }) @@ -161,6 +184,37 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { for !compacted() { time.Sleep(3 * time.Second) } + desCollResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + CollectionName: collectionName, + CollectionID: 0, + TimeStamp: 0, + }) + s.NoError(err) + s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + + flushedSegmentsResp, err := c.DataCoord.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{ + CollectionID: desCollResp.GetCollectionID(), + PartitionID: -1, + }) + s.NoError(err) + s.Equal(flushedSegmentsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + + // 30000*(128*4+8+8) = 15.1MB/1MB = 15+1 + // The check is done every 100 rows written, so the size of each segment may be up to 99 rows larger.
+ s.Contains([]int{15, 16}, len(flushedSegmentsResp.GetSegments())) + log.Info("get flushed segments done", zap.Int64s("segments", flushedSegmentsResp.GetSegments())) + totalRows := int64(0) + segsInfoResp, err := c.DataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ + SegmentIDs: flushedSegmentsResp.GetSegments(), + }) + s.NoError(err) + s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + for _, segInfo := range segsInfoResp.GetInfos() { + totalRows += segInfo.GetNumOfRows() + } + + s.Equal(int64(rowNum), totalRows) + log.Info("compact done") // search @@ -173,40 +227,48 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() { searchReq := integration.ConstructSearchRequest("", collectionName, expr, fVecColumn.FieldName, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) - searchCheckReport := func() { - timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + searchResult, err := c.Proxy.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + s.NoError(err) + + checkWaitGroup := sync.WaitGroup{} + + checkQuerySegmentInfo := func() bool { + querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + + var queryRows int64 = 0 + for _, seg := range querySegmentInfo.Infos { + queryRows += seg.NumRows + } + + return queryRows == rowNum + } + + checkWaitGroup.Add(1) + go func() { + defer checkWaitGroup.Done() + timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Minute*2) defer cancelFunc() for { select { case <-timeoutCtx.Done(): - s.Fail("search check timeout") - case report := <-c.Extension.GetReportChan(): - reportInfo := report.(map[string]any) - log.Info("search report info", zap.Any("reportInfo", reportInfo)) - s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) - s.NotEqualValues(0, 
reportInfo[hookutil.RelatedDataSizeKey]) - s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) + s.Fail("check query segment info timeout") return + default: + if checkQuerySegmentInfo() { + return + } } + time.Sleep(time.Second * 3) } - } - go searchCheckReport() - searchResult, err := c.Proxy.Search(ctx, searchReq) - err = merr.CheckRPCCall(searchResult, err) - s.NoError(err) - - querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{ - DbName: dbName, - CollectionName: collectionName, - }) - var queryRows int64 = 0 - for _, seg := range querySegmentInfo.Infos { - queryRows += seg.NumRows - } - s.Equal(int64(3000), queryRows) + }() + checkWaitGroup.Wait() log.Info("TestClusteringCompaction succeed") } @@ -230,10 +292,18 @@ func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fi TypeParams: nil, IndexParams: nil, AutoID: autoID, + IsClusteringKey: false, + } + clusteringField := &schemapb.FieldSchema{ + FieldID: 101, + Name: "clustering", + IsPrimaryKey: false, + Description: "clustering key", + DataType: schemapb.DataType_Int64, IsClusteringKey: true, } fVec := &schemapb.FieldSchema{ - FieldID: 101, + FieldID: 102, Name: integration.FloatVecField, IsPrimaryKey: false, Description: "", @@ -249,7 +319,7 @@ func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fi return &schemapb.CollectionSchema{ Name: collection, AutoID: autoID, - Fields: []*schemapb.FieldSchema{pk, fVec}, + Fields: []*schemapb.FieldSchema{pk, clusteringField, fVec}, } }