diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 3ada077f82..b54f4b177c 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -279,10 +279,15 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
 func (t *clusteringCompactionTask) processIndexing() error {
 	// wait for segment indexed
 	collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
+	if len(collectionIndexes) == 0 {
+		log.Debug("the collection has no index, no need to do indexing")
+		return t.completeTask()
+	}
 	indexed := func() bool {
 		for _, collectionIndex := range collectionIndexes {
 			for _, segmentID := range t.ResultSegments {
 				segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetCollectionID(), segmentID, collectionIndex.IndexID)
+				log.Debug("segment index state", zap.String("segmentIndexState", segmentIndexState.String()))
 				if segmentIndexState.GetState() != commonpb.IndexState_Finished {
 					return false
 				}
@@ -292,7 +297,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
 	}()
 	log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
 	if indexed {
-		t.completeTask()
+		return t.completeTask()
	}
 	return nil
 }
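The two hunks above close a hang and a dropped result: a collection with no indexes now completes immediately instead of waiting forever in the indexing state, and a successful completeTask() no longer has its return value discarded. Below is a minimal, self-contained sketch of the indexing gate; the types and helper here are illustrative stand-ins for the real index meta API (model.Index, commonpb.IndexState), not the Milvus code itself:

    package main

    import "fmt"

    type indexState int

    const (
    	stateInProgress indexState = iota
    	stateFinished
    )

    // allSegmentsIndexed mirrors the closure in processIndexing: every result
    // segment must be Finished for every collection index.
    func allSegmentsIndexed(indexIDs, segments []int64, state map[[2]int64]indexState) bool {
    	for _, idx := range indexIDs {
    		for _, seg := range segments {
    			if state[[2]int64{idx, seg}] != stateFinished {
    				return false
    			}
    		}
    	}
    	return true
    }

    func main() {
    	// No indexes at all: the new early return treats this as "indexed".
    	var indexIDs []int64
    	fmt.Println(len(indexIDs) == 0 || allSegmentsIndexed(indexIDs, nil, nil)) // true
    	// One index, one segment still building: keep waiting.
    	state := map[[2]int64]indexState{{3, 1000}: stateInProgress}
    	fmt.Println(allSegmentsIndexed([]int64{3}, []int64{1000}, state)) // false
    }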
diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go
index 2aaf484dbd..0e95223519 100644
--- a/internal/datacoord/compaction_task_clustering_test.go
+++ b/internal/datacoord/compaction_task_clustering_test.go
@@ -29,7 +29,9 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
+	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -42,18 +44,20 @@ func TestClusteringCompactionTaskSuite(t *testing.T) {
 type ClusteringCompactionTaskSuite struct {
 	suite.Suite

-	mockID      atomic.Int64
-	mockAlloc   *NMockAllocator
-	meta        *meta
-	mockSessMgr *MockSessionManager
-	handler     *NMockHandler
-	session     *MockSessionManager
+	mockID           atomic.Int64
+	mockAlloc        *NMockAllocator
+	meta             *meta
+	mockSessMgr      *MockSessionManager
+	handler          *NMockHandler
+	session          *MockSessionManager
+	analyzeScheduler *taskScheduler
 }

 func (s *ClusteringCompactionTaskSuite) SetupTest() {
+	ctx := context.Background()
 	cm := storage.NewLocalChunkManager(storage.RootPath(""))
 	catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
-	meta, err := newMeta(context.TODO(), catalog, cm)
+	meta, err := newMeta(ctx, catalog, cm)
 	s.NoError(err)
 	s.meta = meta
@@ -75,6 +79,9 @@
 	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()

 	s.session = NewMockSessionManager(s.T())
+
+	scheduler := newTaskScheduler(ctx, s.meta, nil, cm, newIndexEngineVersionManager(), nil)
+	s.analyzeScheduler = scheduler
 }

 func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
@@ -82,8 +89,6 @@ func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
 }

 func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
-	channel := "Ch-1"
-
 	s.meta.AddSegment(context.TODO(), &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
 			ID:    101,
@@ -99,39 +104,9 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 			PartitionStatsVersion: 10000,
 		},
 	})
-	session := NewSessionManagerImpl()
+	s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)

-	schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
-	pk := &schemapb.FieldSchema{
-		FieldID:         100,
-		Name:            Int64Field,
-		IsPrimaryKey:    true,
-		Description:     "",
-		DataType:        schemapb.DataType_Int64,
-		TypeParams:      nil,
-		IndexParams:     nil,
-		AutoID:          true,
-		IsClusteringKey: true,
-	}
-
-	task := &clusteringCompactionTask{
-		CompactionTask: &datapb.CompactionTask{
-			PlanID:             1,
-			TriggerID:          19530,
-			CollectionID:       1,
-			PartitionID:        10,
-			Channel:            channel,
-			Type:               datapb.CompactionType_ClusteringCompaction,
-			NodeID:             1,
-			State:              datapb.CompactionTaskState_pipelining,
-			Schema:             schema,
-			ClusteringKeyField: pk,
-			InputSegments:      []int64{101, 102},
-			ResultSegments:     []int64{1000, 1100},
-		},
-		meta:     s.meta,
-		sessions: session,
-	}
+	task := s.generateBasicTask(false)

 	task.processPipelining()
@@ -178,40 +153,49 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
 	s.Equal(int64(0), seg42.PartitionStatsVersion)
 }

-func (s *ClusteringCompactionTaskSuite) generateBasicTask() *clusteringCompactionTask {
-	schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
-	pk := &schemapb.FieldSchema{
-		FieldID:         100,
-		Name:            Int64Field,
-		IsPrimaryKey:    true,
-		DataType:        schemapb.DataType_Int64,
-		AutoID:          true,
-		IsClusteringKey: true,
+func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
+	schema := ConstructClusteringSchema("TestClusteringCompactionTask", 32, true, vectorClusteringKey)
+	var pk *schemapb.FieldSchema
+	if vectorClusteringKey {
+		pk = &schemapb.FieldSchema{
+			FieldID:         101,
+			Name:            FloatVecField,
+			IsPrimaryKey:    false,
+			DataType:        schemapb.DataType_FloatVector,
+			IsClusteringKey: true,
+		}
+	} else {
+		pk = &schemapb.FieldSchema{
+			FieldID:         100,
+			Name:            Int64Field,
+			IsPrimaryKey:    true,
+			DataType:        schemapb.DataType_Int64,
+			AutoID:          true,
+			IsClusteringKey: true,
+		}
 	}

-	task := &clusteringCompactionTask{
-		CompactionTask: &datapb.CompactionTask{
-			PlanID:             1,
-			TriggerID:          19530,
-			CollectionID:       1,
-			PartitionID:        10,
-			Type:               datapb.CompactionType_ClusteringCompaction,
-			NodeID:             1,
-			State:              datapb.CompactionTaskState_pipelining,
-			Schema:             schema,
-			ClusteringKeyField: pk,
-			InputSegments:      []int64{101, 102},
-			ResultSegments:     []int64{1000, 1100},
-		},
-		meta:     s.meta,
-		handler:  s.handler,
-		sessions: s.session,
+	compactionTask := &datapb.CompactionTask{
+		PlanID:             1,
+		TriggerID:          19530,
+		CollectionID:       1,
+		PartitionID:        10,
+		Type:               datapb.CompactionType_ClusteringCompaction,
+		NodeID:             1,
+		State:              datapb.CompactionTaskState_pipelining,
+		Schema:             schema,
+		ClusteringKeyField: pk,
+		InputSegments:      []int64{101, 102},
+		ResultSegments:     []int64{1000, 1100},
 	}
+
+	task := newClusteringCompactionTask(compactionTask, s.meta, s.session, s.handler, s.analyzeScheduler)
+	task.maxRetryTimes = 0
 	return task
 }
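The fixture refactor above replaces hand-built clusteringCompactionTask literals with the production constructor, so the tests exercise the same wiring (meta, sessions, handler, analyze scheduler) as DataCoord itself, and a single boolean selects the clustering-key flavor. The same parameterized-fixture-plus-subtest pattern in miniature, using testify the way the suite does (the DemoSuite types here are hypothetical):

    package main

    import (
    	"testing"

    	"github.com/stretchr/testify/suite"
    )

    type DemoSuite struct{ suite.Suite }

    type fixture struct{ clusteringKeyIsVector bool }

    // newFixture plays the role of generateBasicTask: one constructor, one flag.
    func (s *DemoSuite) newFixture(vector bool) *fixture {
    	return &fixture{clusteringKeyIsVector: vector}
    }

    func (s *DemoSuite) TestVariants() {
    	s.Run("scalar clustering key", func() {
    		s.False(s.newFixture(false).clusteringKeyIsVector)
    	})
    	s.Run("vector clustering key", func() {
    		s.True(s.newFixture(true).clusteringKeyIsVector)
    	})
    }

    func TestDemoSuite(t *testing.T) { suite.Run(t, new(DemoSuite)) }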
 func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
-	task := s.generateBasicTask()
+	task := s.generateBasicTask(false)
 	task.maxRetryTimes = 3
 	// process pipelining fail
 	s.Equal(false, task.Process())
@@ -226,96 +210,242 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
 	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 }

-func (s *ClusteringCompactionTaskSuite) TestProcessStateChange() {
-	task := s.generateBasicTask()
-
-	// process pipelining fail
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
-
-	// process pipelining succeed
-	s.meta.AddSegment(context.TODO(), &SegmentInfo{
-		SegmentInfo: &datapb.SegmentInfo{
-			ID:    101,
-			State: commonpb.SegmentState_Flushed,
-			Level: datapb.SegmentLevel_L1,
-		},
-	})
-	s.meta.AddSegment(context.TODO(), &SegmentInfo{
-		SegmentInfo: &datapb.SegmentInfo{
-			ID:                    102,
-			State:                 commonpb.SegmentState_Flushed,
-			Level:                 datapb.SegmentLevel_L2,
-			PartitionStatsVersion: 10000,
-		},
+func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
+	s.Run("process pipelining fail, segment not found", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_pipelining
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 	})
-	s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	task.State = datapb.CompactionTaskState_pipelining
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-
-	// process executing
-	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
-
-	// repipelining
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
-	task.NodeID = 1
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-
-	// process executing
-	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-		State: datapb.CompactionTaskState_executing,
-	}, nil).Once()
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
-	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-		State: datapb.CompactionTaskState_completed,
-		Segments: []*datapb.CompactionSegment{
-			{
-				SegmentID: 1000,
+	s.Run("pipelining fail, no datanode slot", func() {
+		task := s.generateBasicTask(false)
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
 			},
-			{
-				SegmentID: 1001,
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
 			},
-		},
-	}, nil).Once()
-	s.Equal(false, task.Process())
-	s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+		})
+		s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
+		task.State = datapb.CompactionTaskState_pipelining
+		s.False(task.Process())
+		s.Equal(int64(NullNodeID), task.GetNodeID())
+	})
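Worth noting in the subtest above: slot exhaustion is not a failure. The task keeps its pipelining state and gives up its node assignment so the scheduler can place it on another DataNode. A reduced model of that path (the NullNodeID value and the error here are stand-ins, not the real constants):

    package main

    import (
    	"errors"
    	"fmt"
    )

    const NullNodeID int64 = -1 // stand-in value; the real constant lives in datacoord

    var errSlotExhausted = errors.New("datanode slot exhausted")

    type task struct{ nodeID int64 }

    // dispatch keeps the task schedulable when the node has no free slot:
    // the state is untouched and the node assignment is surrendered.
    func (t *task) dispatch(send func() error) {
    	if errors.Is(send(), errSlotExhausted) {
    		t.nodeID = NullNodeID // give the node back; retry pipelining later
    	}
    }

    func main() {
    	t := &task{nodeID: 1}
    	t.dispatch(func() error { return errSlotExhausted })
    	fmt.Println(t.nodeID) // -1
    }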
+
+	s.Run("process succeed, scalar clustering key", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_pipelining
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+		task.State = datapb.CompactionTaskState_pipelining
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+	})
+
+	s.Run("process succeed, vector clustering key", func() {
+		task := s.generateBasicTask(true)
+		task.State = datapb.CompactionTaskState_pipelining
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		task.State = datapb.CompactionTaskState_pipelining
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_analyzing, task.GetState())
+	})
 }
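The vector subtest pins down the state routing: a vector clustering key needs a centroid analysis pass before compaction can run, so pipelining hands off to analyzing instead of executing. The branch in miniature (state names mirror the datapb enum values; the function is a sketch, not the real transition code):

    package main

    import "fmt"

    type state string

    // nextState models where pipelining sends the task once its plan is built.
    func nextState(clusteringKeyIsVector bool) state {
    	if clusteringKeyIsVector {
    		return "analyzing" // centroids must be computed first
    	}
    	return "executing" // scalar keys go straight to the DataNode
    }

    func main() {
    	fmt.Println(nextState(false)) // executing
    	fmt.Println(nextState(true))  // analyzing
    }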
 func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
-	s.meta.AddSegment(context.TODO(), &SegmentInfo{
-		SegmentInfo: &datapb.SegmentInfo{
-			ID:    101,
-			State: commonpb.SegmentState_Flushed,
-			Level: datapb.SegmentLevel_L1,
-		},
-	})
-	s.meta.AddSegment(context.TODO(), &SegmentInfo{
-		SegmentInfo: &datapb.SegmentInfo{
-			ID:                    102,
-			State:                 commonpb.SegmentState_Flushed,
-			Level:                 datapb.SegmentLevel_L2,
-			PartitionStatsVersion: 10000,
-		},
-	})
-	task := s.generateBasicTask()
-	s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
-	task.State = datapb.CompactionTaskState_pipelining
-	s.NoError(task.doCompact())
-	s.Equal(int64(NullNodeID), task.GetNodeID())
+	s.Run("process executing, get compaction result fail", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_executing
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
+	})
+
+	s.Run("process executing, compaction result not ready", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_executing
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+			State: datapb.CompactionTaskState_executing,
+		}, nil).Once()
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+	})
+
+	s.Run("process executing, scalar clustering key, compaction result ready", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_executing
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+			State: datapb.CompactionTaskState_completed,
+			Segments: []*datapb.CompactionSegment{
+				{
+					SegmentID: 1000,
+				},
+				{
+					SegmentID: 1001,
+				},
+			},
+		}, nil).Once()
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+	})
+
+	s.Run("process executing, compaction result ready", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_executing
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+			State: datapb.CompactionTaskState_completed,
+			Segments: []*datapb.CompactionSegment{
+				{
+					SegmentID: 1000,
+				},
+				{
+					SegmentID: 1001,
+				},
+			},
+		}, nil).Once()
+		s.Equal(false, task.Process())
+		s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+	})
+
+	s.Run("process executing, compaction result timeout", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_executing
+		task.StartTime = time.Now().Unix()
+		task.TimeoutInSeconds = 1
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+			State: datapb.CompactionTaskState_executing,
+			Segments: []*datapb.CompactionSegment{
+				{
+					SegmentID: 1000,
+				},
+				{
+					SegmentID: 1001,
+				},
+			},
+		}, nil).Once()
+		time.Sleep(time.Second * 1)
+		s.Equal(true, task.Process())
+		s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
+	})
 }
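The last subtest above exercises the timeout path: an executing task whose wall-clock age exceeds TimeoutInSeconds is abandoned and cleaned rather than retried. A sketch of the check the test implies, assuming an elapsed-seconds comparison (field names mirror datapb.CompactionTask; the real implementation may differ in detail):

    package main

    import (
    	"fmt"
    	"time"
    )

    type task struct {
    	startTime        int64 // Unix seconds, as in CompactionTask.StartTime
    	timeoutInSeconds int32
    }

    // timedOut reports whether the task has been executing longer than its budget.
    func (t *task) timedOut(now time.Time) bool {
    	elapsed := now.Sub(time.Unix(t.startTime, 0)).Seconds()
    	return elapsed > float64(t.timeoutInSeconds)
    }

    func main() {
    	t := &task{startTime: time.Now().Add(-2 * time.Second).Unix(), timeoutInSeconds: 1}
    	fmt.Println(t.timedOut(time.Now())) // true: 2s elapsed against a 1s budget
    }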

 func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
-	task := s.generateBasicTask()
+	task := s.generateBasicTask(false)
 	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
 		State: datapb.CompactionTaskState_failed,
 	}, nil).Once()
@@ -323,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
 	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
-		State: datapb.CompactionTaskState_indexing,
+		State: datapb.CompactionTaskState_failed,
 	}, nil).Once()
 	s.NoError(task.processExecuting())
 	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
@@ -355,9 +485,138 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
 	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
 }

+func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
+	s.Run("collection has no index", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_indexing
+		s.True(task.Process())
+		s.Equal(datapb.CompactionTaskState_completed, task.GetState())
+	})
+
+	s.Run("collection has index, segment is not indexed", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_indexing
+		index := &model.Index{
+			CollectionID: 1,
+			IndexID:      3,
+		}
+		err := s.meta.indexMeta.CreateIndex(index)
+		s.NoError(err)
+
+		s.False(task.Process())
+		s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+	})
+
+	s.Run("collection has index, segment indexed", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_indexing
+		index := &model.Index{
+			CollectionID: 1,
+			IndexID:      3,
+		}
+		err := s.meta.indexMeta.CreateIndex(index)
+		s.NoError(err)
+
+		s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
+			IndexID:      3,
+			SegmentID:    1000,
+			CollectionID: 1,
+			IndexState:   commonpb.IndexState_Finished,
+		})
+		s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
+			IndexID:      3,
+			SegmentID:    1100,
+			CollectionID: 1,
+			IndexState:   commonpb.IndexState_Finished,
+		})
+
+		s.True(task.Process())
+		s.Equal(datapb.CompactionTaskState_completed, task.GetState())
+	})
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
+	s.Run("analyze task not found", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_analyzing
+		s.False(task.Process())
+		s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+	})
+
+	s.Run("analyze task failed", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_analyzing
+		task.AnalyzeTaskID = 7
+		t := &indexpb.AnalyzeTask{
+			CollectionID: task.CollectionID,
+			PartitionID:  task.PartitionID,
+			FieldID:      task.ClusteringKeyField.FieldID,
+			SegmentIDs:   task.InputSegments,
+			TaskID:       7,
+			State:        indexpb.JobState_JobStateFailed,
+		}
+		s.meta.analyzeMeta.AddAnalyzeTask(t)
+		s.False(task.Process())
+		s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+	})
+
+	s.Run("analyze task fake finish, vector not supported", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_analyzing
+		task.AnalyzeTaskID = 7
+		t := &indexpb.AnalyzeTask{
+			CollectionID:  task.CollectionID,
+			PartitionID:   task.PartitionID,
+			FieldID:       task.ClusteringKeyField.FieldID,
+			SegmentIDs:    task.InputSegments,
+			TaskID:        7,
+			State:         indexpb.JobState_JobStateFinished,
+			CentroidsFile: "",
+		}
+		s.meta.analyzeMeta.AddAnalyzeTask(t)
+		s.False(task.Process())
+		s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+	})
+
+	s.Run("analyze task finished", func() {
+		task := s.generateBasicTask(false)
+		task.State = datapb.CompactionTaskState_analyzing
+		task.AnalyzeTaskID = 7
+		t := &indexpb.AnalyzeTask{
+			CollectionID:  task.CollectionID,
+			PartitionID:   task.PartitionID,
+			FieldID:       task.ClusteringKeyField.FieldID,
+			SegmentIDs:    task.InputSegments,
+			TaskID:        7,
+			State:         indexpb.JobState_JobStateFinished,
+			CentroidsFile: "somewhere",
+		}
+		s.meta.analyzeMeta.AddAnalyzeTask(t)
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:    101,
+				State: commonpb.SegmentState_Flushed,
+				Level: datapb.SegmentLevel_L1,
+			},
+		})
+		s.meta.AddSegment(context.TODO(), &SegmentInfo{
+			SegmentInfo: &datapb.SegmentInfo{
+				ID:                    102,
+				State:                 commonpb.SegmentState_Flushed,
+				Level:                 datapb.SegmentLevel_L2,
+				PartitionStatsVersion: 10000,
+			},
+		})
+		s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+		s.False(task.Process())
+		s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+	})
+}
+
 // fix: https://github.com/milvus-io/milvus/issues/35110
 func (s *ClusteringCompactionTaskSuite) TestCompleteTask() {
-	task := s.generateBasicTask()
+	task := s.generateBasicTask(false)
 	task.completeTask()
 	partitionStats := s.meta.GetPartitionStatsMeta().GetPartitionStats(task.GetCollectionID(), task.GetPartitionID(), task.GetChannel(), task.GetPlanID())
 	s.True(partitionStats.GetCommitTime() > time.Now().Add(-2*time.Second).Unix())
@@ -368,7 +627,7 @@ const (
 	FloatVecField = "floatVecField"
 )

-func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
+func ConstructClusteringSchema(collection string, dim int, autoID bool, vectorClusteringKey bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
 	// if fields are specified, construct it
 	if len(fields) > 0 {
 		return &schemapb.CollectionSchema{
@@ -380,15 +639,14 @@ func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fi
 	// if no field is specified, use default
 	pk := &schemapb.FieldSchema{
-		FieldID:         100,
-		Name:            Int64Field,
-		IsPrimaryKey:    true,
-		Description:     "",
-		DataType:        schemapb.DataType_Int64,
-		TypeParams:      nil,
-		IndexParams:     nil,
-		AutoID:          autoID,
-		IsClusteringKey: true,
+		FieldID:      100,
+		Name:         Int64Field,
+		IsPrimaryKey: true,
+		Description:  "",
+		DataType:     schemapb.DataType_Int64,
+		TypeParams:   nil,
+		IndexParams:  nil,
+		AutoID:       autoID,
 	}
 	fVec := &schemapb.FieldSchema{
 		FieldID: 101,
@@ -404,6 +662,13 @@ func ConstructScalarClusteringSchema(collection string, dim int, autoID bool, fi
 		},
 		IndexParams: nil,
 	}
+
+	if vectorClusteringKey {
+		fVec.IsClusteringKey = true
+	} else {
+		pk.IsClusteringKey = true
+	}
+
 	return &schemapb.CollectionSchema{
 		Name:   collection,
 		AutoID: autoID,
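The invariant behind ConstructClusteringSchema's new flag: exactly one field carries IsClusteringKey, and vectorClusteringKey selects whether that is the vector field or the primary key, matching what generateBasicTask builds for each flavor. The routing reduced to a runnable sketch with hypothetical types:

    package main

    import "fmt"

    type fieldSchema struct {
    	name            string
    	isClusteringKey bool
    }

    // markClusteringKey flags exactly one of the two default fields,
    // mirroring the if/else at the end of ConstructClusteringSchema.
    func markClusteringKey(pk, vec *fieldSchema, vectorClusteringKey bool) {
    	if vectorClusteringKey {
    		vec.isClusteringKey = true
    	} else {
    		pk.isClusteringKey = true
    	}
    }

    func main() {
    	pk, vec := &fieldSchema{name: "int64Field"}, &fieldSchema{name: "floatVecField"}
    	markClusteringKey(pk, vec, true)
    	fmt.Println(pk.isClusteringKey, vec.isClusteringKey) // false true
    }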