diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go
index 150e572df6..6274e04a03 100644
--- a/internal/datacoord/compaction_trigger_v2.go
+++ b/internal/datacoord/compaction_trigger_v2.go
@@ -315,7 +315,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
 		return
 	}
 
-	resultSegmentNum := totalRows / preferSegmentRows * 2
+	resultSegmentNum := (totalRows/preferSegmentRows + 1) * 2
 	start, end, err := m.allocator.AllocN(resultSegmentNum)
 	if err != nil {
 		log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err))
diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go
index 7391f4ac4b..86772f68ea 100644
--- a/internal/datanode/compaction/clustering_compactor.go
+++ b/internal/datanode/compaction/clustering_compactor.go
@@ -189,6 +189,7 @@ func (t *clusteringCompactionTask) init() error {
 
 	logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
 	segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
+	log.Info("segment ID range", zap.Int64("begin", t.plan.GetPreAllocatedSegmentIDs().GetBegin()), zap.Int64("end", t.plan.GetPreAllocatedSegmentIDs().GetEnd()))
 	t.logIDAlloc = logIDAlloc
 	t.segIDAlloc = segIDAlloc
 
@@ -197,11 +198,6 @@ func (t *clusteringCompactionTask) init() error {
 		return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
 	}
 	for _, field := range t.plan.Schema.Fields {
-		// todo(wayblink): supprot null in clustring compact
-		if field.GetNullable() {
-			return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("clustering compaction can't be trigger in field(%s) which set nullable == true", field.GetName()))
-		}
-
 		if field.GetIsPrimaryKey() && field.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(field.GetDataType()) {
 			pkField = field
 		}
@@ -252,10 +248,12 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
 	// 2, get analyze result
 	if t.isVectorClusteringKey {
 		if err := t.getVectorAnalyzeResult(ctx); err != nil {
+			log.Error("failed in analyze vector", zap.Error(err))
 			return nil, err
 		}
 	} else {
 		if err := t.getScalarAnalyzeResult(ctx); err != nil {
+			log.Error("failed in analyze scalar", zap.Error(err))
 			return nil, err
 		}
 	}
@@ -264,6 +262,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
 	log.Info("Clustering compaction start mapping", zap.Int("bufferNum", len(t.clusterBuffers)))
 	uploadSegments, partitionStats, err := t.mapping(ctx, deltaPk2Ts)
 	if err != nil {
+		log.Error("failed in mapping", zap.Error(err))
 		return nil, err
 	}
 
@@ -297,9 +296,9 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 	if err != nil {
 		return err
 	}
-	plan := t.scalarPlan(analyzeDict)
+	buckets, containsNull := t.splitClusterByScalarValue(analyzeDict)
 	scalarToClusterBufferMap := make(map[interface{}]*ClusterBuffer, 0)
-	for id, bucket := range plan {
+	for id, bucket := range buckets {
 		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
 		if err != nil {
 			return err
 		}
@@ -323,7 +322,29 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 			scalarToClusterBufferMap[key] = buffer
 		}
 	}
+	var nullBuffer *ClusterBuffer
+	if containsNull {
+		fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
+		if err != nil {
+			return err
+		}
+		nullBuffer = &ClusterBuffer{
+			id:                      len(buckets),
+			flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
+			flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
+			uploadedSegments:        make([]*datapb.CompactionSegment, 0),
+			uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
+			clusteringKeyFieldStats: fieldStats, // null stats
+		}
+		if _, err = t.refreshBufferWriterWithPack(nullBuffer); err != nil {
+			return err
+		}
+		t.clusterBuffers = append(t.clusterBuffers, nullBuffer)
+	}
 	t.keyToBufferFunc = func(key interface{}) *ClusterBuffer {
+		if key == nil {
+			return nullBuffer
+		}
 		// todo: if keys are too many, the map will be quite large, we should mark the range of each buffer and select buffer by range
 		return scalarToClusterBufferMap[key]
 	}
@@ -1011,17 +1032,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter
 	var mutex sync.Mutex
 	analyzeDict := make(map[interface{}]int64, 0)
 	for _, segment := range inputSegments {
-		segmentClone := &datapb.CompactionSegmentBinlogs{
-			SegmentID:           segment.SegmentID,
-			FieldBinlogs:        segment.FieldBinlogs,
-			Field2StatslogPaths: segment.Field2StatslogPaths,
-			Deltalogs:           segment.Deltalogs,
-			InsertChannel:       segment.InsertChannel,
-			Level:               segment.Level,
-			CollectionID:        segment.CollectionID,
-			PartitionID:         segment.PartitionID,
-			IsSorted:            segment.IsSorted,
-		}
+		segmentClone := proto.Clone(segment).(*datapb.CompactionSegmentBinlogs)
 		future := t.mappingPool.Submit(func() (any, error) {
 			analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone)
 			mutex.Lock()
@@ -1086,39 +1097,39 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	for idx := 0; idx < binlogNum; idx++ {
 		var ps []string
 		for _, f := range segment.GetFieldBinlogs() {
-			// todo add a new reader only read one column
-			if f.FieldID == t.primaryKeyField.GetFieldID() || f.FieldID == t.clusteringKeyField.GetFieldID() || f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField {
-				ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
-			}
+			ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
 		}
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
 
-	for _, path := range fieldBinlogPaths {
-		bytesArr, err := t.binlogIO.Download(ctx, path)
-		blobs := make([]*storage.Blob, len(bytesArr))
-		for i := range bytesArr {
-			blobs[i] = &storage.Blob{Value: bytesArr[i]}
-		}
+	for _, paths := range fieldBinlogPaths {
+		allValues, err := t.binlogIO.Download(ctx, paths)
 		if err != nil {
-			log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
+			log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
+			return nil, err
+		}
+		blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
+			return &storage.Blob{Key: paths[i], Value: v}
+		})
+
+		pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
+		if err != nil {
+			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", paths), zap.Error(err))
 			return nil, err
 		}
-		pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType())
-		if err != nil {
-			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
-			return nil, err
-		}
-
-		// log.Info("pkIter.RowNum()", zap.Int("pkIter.RowNum()", pkIter.RowNum()), zap.Bool("hasNext", pkIter.HasNext()))
-		for pkIter.HasNext() {
-			vIter, _ := pkIter.Next()
-			v, ok := vIter.(*storage.Value)
-			if !ok {
-				log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
-				return nil, errors.New("unexpected error")
+		for {
+			err := pkIter.Next()
+			if err != nil {
+				if err == sio.EOF {
+					pkIter.Close()
+					break
+				} else {
+					log.Warn("compact wrong, failed to iter through data", zap.Error(err))
+					return nil, err
+				}
 			}
+			v := pkIter.Value()
 
 			// Filtering expired entity
 			ts := typeutil.Timestamp(v.Timestamp)
@@ -1137,7 +1148,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 			// rowValue := vIter.GetData().(*iterators.InsertRow).GetValue()
 			row, ok := v.Value.(map[typeutil.UniqueID]interface{})
 			if !ok {
-				log.Warn("transfer interface to map wrong", zap.Strings("path", path))
+				log.Warn("transfer interface to map wrong", zap.Strings("path", paths))
 				return nil, errors.New("unexpected error")
 			}
 			key := row[t.clusteringKeyField.GetFieldID()]
@@ -1158,12 +1169,16 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	return analyzeResult, nil
 }
 
-func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]interface{} {
+func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
 	keys := lo.MapToSlice(dict, func(k interface{}, _ int64) interface{} {
 		return k
 	})
-	sort.Slice(keys, func(i, j int) bool {
-		return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, keys[j]))
+
+	notNullKeys := lo.Filter(keys, func(i interface{}, j int) bool {
+		return i != nil
+	})
+	sort.Slice(notNullKeys, func(i, j int) bool {
+		return storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[i]).LE(storage.NewScalarFieldValue(t.clusteringKeyField.DataType, notNullKeys[j]))
 	})
 
 	buckets := make([][]interface{}, 0)
@@ -1171,8 +1186,8 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 	var currentBucketSize int64 = 0
 	maxRows := t.plan.MaxSegmentRows
 	preferRows := t.plan.PreferSegmentRows
-	for _, key := range keys {
-		// todo can optimize
+	containsNull := len(keys) > len(notNullKeys)
+	for _, key := range notNullKeys {
 		if dict[key] > preferRows {
 			if len(currentBucket) != 0 {
 				buckets = append(buckets, currentBucket)
@@ -1195,7 +1210,7 @@ func (t *clusteringCompactionTask) scalarPlan(dict map[interface{}]int64) [][]in
 		}
 	}
 	buckets = append(buckets, currentBucket)
-	return buckets
+	return buckets, containsNull
 }
 
 func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBuffer) (bool, error) {
diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go
index 8ed753410e..31973712c2 100644
--- a/internal/datanode/compaction/clustering_compactor_test.go
+++ b/internal/datanode/compaction/clustering_compactor_test.go
@@ -167,22 +167,6 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
 	s.Equal(8, s.task.getWorkerPoolSize())
 	s.Equal(8, s.task.mappingPool.Cap())
 	s.Equal(8, s.task.flushPool.Cap())
-
-	s.task.plan.Schema = genCollectionSchema()
-	s.task.plan.Schema.Fields = append(s.task.plan.Schema.Fields, &schemapb.FieldSchema{
-		FieldID:  104,
-		Name:     "nullableFid",
-		DataType: schemapb.DataType_Int64,
-		Nullable: true,
-	})
-	s.task.plan.ClusteringKeyField = 100
-	s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
-		{
-			SegmentID: 100,
-		},
-	}
-	err = s.task.init()
-	s.Require().Error(err)
 }
 
 func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
diff --git a/tests/integration/compaction/clustering_compaction_null_data_test.go b/tests/integration/compaction/clustering_compaction_null_data_test.go
new file mode 100644
index 0000000000..b286e996bc
--- /dev/null
+++ b/tests/integration/compaction/clustering_compaction_null_data_test.go
@@ -0,0 +1,315 @@
+package compaction
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/suite"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metric"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/tests/integration"
+)
+
+type ClusteringCompactionNullDataSuite struct {
+	integration.MiniClusterSuite
+}
+
+func (s *ClusteringCompactionNullDataSuite) SetupSuite() {
+	paramtable.Init()
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.TaskCheckInterval.Key, "1")
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.IndexTaskSchedulerInterval.Key, "100")
+
+	s.Require().NoError(s.SetupEmbedEtcd())
+}
+
+func (s *ClusteringCompactionNullDataSuite) TestClusteringCompactionNullData() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	c := s.Cluster
+
+	const (
+		dim    = 128
+		dbName = ""
+		rowNum = 30000
+	)
+
+	collectionName := "TestClusteringCompactionNullData" + funcutil.GenRandomStr()
+
+	// 2000 rows for each segment, about 1MB.
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, strconv.Itoa(1))
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().PulsarCfg.MaxMessageSize.Key, strconv.Itoa(500*1024))
+	defer paramtable.Get().Reset(paramtable.Get().PulsarCfg.MaxMessageSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.Key, strconv.Itoa(8))
+	defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, strconv.Itoa(102400))
+	defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key, "false")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key, "1")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.SegmentMaxSize.Key)
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key, "1.0")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionMaxSegmentSizeRatio.Key)
+
+	paramtable.Get().Save(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key, "1.0")
+	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.Key)
+
+	pk := &schemapb.FieldSchema{
+		FieldID:         100,
+		Name:            integration.Int64Field,
+		IsPrimaryKey:    true,
+		Description:     "",
+		DataType:        schemapb.DataType_Int64,
+		TypeParams:      nil,
+		IndexParams:     nil,
+		AutoID:          true,
+		IsClusteringKey: false,
+	}
+	clusteringField := &schemapb.FieldSchema{
+		FieldID:         101,
+		Name:            "clustering",
+		IsPrimaryKey:    false,
+		Description:     "clustering key",
+		DataType:        schemapb.DataType_Int64,
+		IsClusteringKey: true,
+		Nullable:        true,
+	}
+	fVec := &schemapb.FieldSchema{
+		FieldID:      102,
+		Name:         integration.FloatVecField,
+		IsPrimaryKey: false,
+		Description:  "",
+		DataType:     schemapb.DataType_FloatVector,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   common.DimKey,
+				Value: fmt.Sprintf("%d", dim),
+			},
+		},
+		IndexParams: nil,
+	}
+	schema := &schemapb.CollectionSchema{
+		Name:   collectionName,
+		AutoID: true,
+		Fields: []*schemapb.FieldSchema{pk, clusteringField, fVec},
+	}
+
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(err)
+	if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success {
+		log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason()))
+	}
+	s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
+
+	log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
+	showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
+	s.NoError(err)
+	s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
+
+	fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
+	clusteringColumn := integration.NewInt64FieldDataNullableWithStart("clustering", rowNum, 1000)
+	hashKeys := integration.GenerateHashKeys(rowNum)
+	insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		FieldsData:     []*schemapb.FieldData{clusteringColumn, fVecColumn},
+		HashKeys:       hashKeys,
+		NumRows:        uint32(rowNum),
+	})
+	s.NoError(err)
+	s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+
+	// flush
+	flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+		DbName:          dbName,
+		CollectionNames: []string{collectionName},
+	})
+	s.NoError(err)
+	segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+	ids := segmentIDs.GetData()
+	s.Require().NotEmpty(segmentIDs)
+	s.Require().True(has)
+	flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+	s.True(has)
+
+	segments, err := c.MetaWatcher.ShowSegments()
+	s.NoError(err)
+	s.NotEmpty(segments)
+	for _, segment := range segments {
+		log.Info("ShowSegments result", zap.String("segment", segment.String()))
+	}
+	s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+
+	indexType := integration.IndexFaissIvfFlat
+	metricType := metric.L2
+	vecType := schemapb.DataType_FloatVector
+
+	// create index
+	createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      fVecColumn.FieldName,
+		IndexName:      "_default",
+		ExtraParams:    integration.ConstructIndexParam(dim, indexType, metricType),
+	})
+	if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
+		log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
+	}
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
+
+	s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
+
+	// load
+	loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success {
+		log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
+	}
+	s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+	s.WaitForLoad(ctx, collectionName)
+
+	compactReq := &milvuspb.ManualCompactionRequest{
+		CollectionID:    showCollectionsResp.CollectionIds[0],
+		MajorCompaction: true,
+	}
+	compactResp, err := c.Proxy.ManualCompaction(ctx, compactReq)
+	s.NoError(err)
+	log.Info("compact", zap.Any("compactResp", compactResp))
+
+	compacted := func() bool {
+		resp, err := c.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
+			CompactionID: compactResp.GetCompactionID(),
+		})
+		if err != nil {
+			return false
+		}
+		return resp.GetState() == commonpb.CompactionState_Completed
+	}
+	for !compacted() {
+		time.Sleep(3 * time.Second)
+	}
+	desCollResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
+		CollectionName: collectionName,
+		CollectionID:   0,
+		TimeStamp:      0,
+	})
+	s.NoError(err)
+	s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+
+	flushedSegmentsResp, err := c.DataCoord.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
+		CollectionID: desCollResp.GetCollectionID(),
+		PartitionID:  -1,
+	})
+	s.NoError(err)
+	s.Equal(flushedSegmentsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+
+	// 30000*(128*4+8+8) = 15.1MB/1MB = 15+1
+	// The check is done every 100 lines written, so the size of each segment may be up to 99 lines larger.
+	s.Contains([]int{15, 16}, len(flushedSegmentsResp.GetSegments()))
+	log.Info("get flushed segments done", zap.Int64s("segments", flushedSegmentsResp.GetSegments()))
+	totalRows := int64(0)
+	segsInfoResp, err := c.DataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
+		SegmentIDs: flushedSegmentsResp.GetSegments(),
+	})
+	s.NoError(err)
+	s.Equal(segsInfoResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
+	for _, segInfo := range segsInfoResp.GetInfos() {
+		s.LessOrEqual(segInfo.GetNumOfRows(), int64(1024*1024/128))
+		totalRows += segInfo.GetNumOfRows()
+	}
+
+	s.Equal(int64(rowNum), totalRows)
+
+	log.Info("compact done")
+
+	// search
+	expr := "clustering > 0"
+	nq := 10
+	topk := 10
+	roundDecimal := -1
+
+	params := integration.GetSearchParams(indexType, metricType)
+	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+		fVecColumn.FieldName, vecType, []string{"clustering"}, metricType, params, nq, dim, topk, roundDecimal)
+
+	searchResult, err := c.Proxy.Search(ctx, searchReq)
+	err = merr.CheckRPCCall(searchResult, err)
+	s.NoError(err)
+
+	checkWaitGroup := sync.WaitGroup{}
+
+	checkQuerySegmentInfo := func() bool {
+		querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+
+		var queryRows int64 = 0
+		for _, seg := range querySegmentInfo.Infos {
+			queryRows += seg.NumRows
+		}
+
+		return queryRows == rowNum
+	}
+
+	checkWaitGroup.Add(1)
+	go func() {
+		defer checkWaitGroup.Done()
+		timeoutCtx, cancelFunc := context.WithTimeout(ctx, time.Minute*2)
+		defer cancelFunc()
+
+		for {
+			select {
+			case <-timeoutCtx.Done():
+				s.Fail("check query segment info timeout")
+				return
+			default:
+				if checkQuerySegmentInfo() {
+					return
+				}
+			}
+			time.Sleep(time.Second * 3)
+		}
+	}()
+
+	checkWaitGroup.Wait()
+	log.Info("TestClusteringCompactionNullData succeed")
+}
+
+func TestClusteringCompactionNullData(t *testing.T) {
+	suite.Run(t, new(ClusteringCompactionNullDataSuite))
+}
diff --git a/tests/integration/compaction/clustering_compaction_test.go b/tests/integration/compaction/clustering_compaction_test.go
index 43e3ffcfec..7928973cf3 100644
--- a/tests/integration/compaction/clustering_compaction_test.go
+++ b/tests/integration/compaction/clustering_compaction_test.go
@@ -233,7 +233,7 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
 	log.Info("compact done")
 
 	// search
-	expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+	expr := "clustering > 0"
 	nq := 10
 	topk := 10
 	roundDecimal := -1