diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index f72a0c3846..12ef274083 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -72,7 +72,7 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e return err } s.taskScheduler.enqueue(&indexBuildTask{ - buildID: buildID, + taskID: buildID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Unissued, diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go index a7a6d36f18..d2532a23b8 100644 --- a/internal/datacoord/task_analyze.go +++ b/internal/datacoord/task_analyze.go @@ -34,10 +34,14 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var _ Task = (*analyzeTask)(nil) + type analyzeTask struct { taskID int64 nodeID int64 taskInfo *indexpb.AnalyzeResult + + req *indexpb.AnalyzeRequest } func (at *analyzeTask) GetTaskID() int64 { @@ -82,12 +86,12 @@ func (at *analyzeTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error { return nil } -func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, dependency *taskScheduler) (bool, bool) { +func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { t := dependency.meta.analyzeMeta.GetTask(at.GetTaskID()) if t == nil { log.Ctx(ctx).Info("task is nil, delete it", zap.Int64("taskID", at.GetTaskID())) at.SetState(indexpb.JobState_JobStateNone, "analyze task is nil") - return false, false + return true } var storageConfig *indexpb.StorageConfig @@ -113,7 +117,7 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), } } - req := &indexpb.AnalyzeRequest{ + at.req = &indexpb.AnalyzeRequest{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), TaskID: at.GetTaskID(), CollectionID: t.CollectionID, @@ -123,7 +127,7 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli FieldType: t.FieldType, Dim: t.Dim, SegmentStats: make(map[int64]*indexpb.SegmentStats), - Version: t.Version, + Version: t.Version + 1, StorageConfig: storageConfig, } @@ -142,13 +146,13 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli log.Ctx(ctx).Warn("analyze stats task is processing, but segment is nil, delete the task", zap.Int64("taskID", at.GetTaskID()), zap.Int64("segmentID", segID)) at.SetState(indexpb.JobState_JobStateFailed, fmt.Sprintf("segmentInfo with ID: %d is nil", segID)) - return false, false + return true } totalSegmentsRows += info.GetNumOfRows() // get binlogIDs binlogIDs := getBinLogIDs(info, t.FieldID) - req.SegmentStats[segID] = &indexpb.SegmentStats{ + at.req.SegmentStats[segID] = &indexpb.SegmentStats{ ID: segID, NumRows: info.GetNumOfRows(), LogIDs: binlogIDs, @@ -160,7 +164,7 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli log.Ctx(ctx).Info("analyze task get collection info failed", zap.Int64("collectionID", segments[0].GetCollectionID()), zap.Error(err)) at.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } schema := collInfo.Schema @@ -175,35 +179,39 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli dim, err := storage.GetDimFromParams(field.TypeParams) if err != nil { at.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } - req.Dim = int64(dim) + at.req.Dim = int64(dim) totalSegmentsRawDataSize := float64(totalSegmentsRows) * float64(dim) * typeutil.VectorTypeSize(t.FieldType) // Byte numClusters := int64(math.Ceil(totalSegmentsRawDataSize / float64(Params.DataCoordCfg.ClusteringCompactionPreferSegmentSize.GetAsSize()))) if numClusters < Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64() { log.Ctx(ctx).Info("data size is too small, skip analyze task", zap.Float64("raw data size", totalSegmentsRawDataSize), zap.Int64("num clusters", numClusters), zap.Int64("minimum num clusters required", Params.DataCoordCfg.ClusteringCompactionMinCentroidsNum.GetAsInt64())) at.SetState(indexpb.JobState_JobStateFinished, "") - return true, true + return true } if numClusters > Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() { numClusters = Params.DataCoordCfg.ClusteringCompactionMaxCentroidsNum.GetAsInt64() } - req.NumClusters = numClusters - req.MaxTrainSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxTrainSizeRatio.GetAsFloat() // control clustering train data size + at.req.NumClusters = numClusters + at.req.MaxTrainSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxTrainSizeRatio.GetAsFloat() // control clustering train data size // config to detect data skewness - req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat() - req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat() - req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize() + at.req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat() + at.req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat() + at.req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize() + return false +} + +func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool { ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() resp, err := client.CreateJobV2(ctx, &indexpb.CreateJobV2Request{ - ClusterID: req.GetClusterID(), - TaskID: req.GetTaskID(), + ClusterID: at.req.GetClusterID(), + TaskID: at.req.GetTaskID(), JobType: indexpb.JobType_JobTypeAnalyzeJob, Request: &indexpb.CreateJobV2Request_AnalyzeRequest{ - AnalyzeRequest: req, + AnalyzeRequest: at.req, }, }) if err == nil { @@ -212,12 +220,12 @@ func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeCli if err != nil { log.Ctx(ctx).Warn("assign analyze task to indexNode failed", zap.Int64("taskID", at.GetTaskID()), zap.Error(err)) at.SetState(indexpb.JobState_JobStateRetry, err.Error()) - return false, true + return false } log.Ctx(ctx).Info("analyze task assigned successfully", zap.Int64("taskID", at.GetTaskID())) at.SetState(indexpb.JobState_JobStateInProgress, "") - return true, false + return true } func (at *analyzeTask) setResult(result *indexpb.AnalyzeResult) { diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index 9a7277c558..effefe00ee 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -37,13 +37,17 @@ import ( ) type indexBuildTask struct { - buildID int64 + taskID int64 nodeID int64 taskInfo *indexpb.IndexTaskInfo + + req *indexpb.CreateJobRequest } +var _ Task = (*indexBuildTask)(nil) + func (it *indexBuildTask) GetTaskID() int64 { - return it.buildID + return it.taskID } func (it *indexBuildTask) GetNodeID() int64 { @@ -73,35 +77,35 @@ func (it *indexBuildTask) GetFailReason() string { } func (it *indexBuildTask) UpdateVersion(ctx context.Context, meta *meta) error { - return meta.indexMeta.UpdateVersion(it.buildID) + return meta.indexMeta.UpdateVersion(it.taskID) } func (it *indexBuildTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error { it.nodeID = nodeID - return meta.indexMeta.BuildIndex(it.buildID, nodeID) + return meta.indexMeta.BuildIndex(it.taskID, nodeID) } -func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, dependency *taskScheduler) (bool, bool) { - segIndex, exist := dependency.meta.indexMeta.GetIndexJob(it.buildID) +func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { + segIndex, exist := dependency.meta.indexMeta.GetIndexJob(it.taskID) if !exist || segIndex == nil { - log.Ctx(ctx).Info("index task has not exist in meta table, remove task", zap.Int64("buildID", it.buildID)) + log.Ctx(ctx).Info("index task has not exist in meta table, remove task", zap.Int64("taskID", it.taskID)) it.SetState(indexpb.JobState_JobStateNone, "index task has not exist in meta table") - return false, false + return true } segment := dependency.meta.GetSegment(segIndex.SegmentID) if !isSegmentHealthy(segment) || !dependency.meta.indexMeta.IsIndexExist(segIndex.CollectionID, segIndex.IndexID) { - log.Ctx(ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", it.buildID)) + log.Ctx(ctx).Info("task is no need to build index, remove it", zap.Int64("taskID", it.taskID)) it.SetState(indexpb.JobState_JobStateNone, "task is no need to build index") - return false, false + return true } indexParams := dependency.meta.indexMeta.GetIndexParams(segIndex.CollectionID, segIndex.IndexID) indexType := GetIndexType(indexParams) if isFlatIndex(indexType) || segIndex.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() { - log.Ctx(ctx).Info("segment does not need index really", zap.Int64("buildID", it.buildID), + log.Ctx(ctx).Info("segment does not need index really", zap.Int64("taskID", it.taskID), zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("num rows", segIndex.NumRows)) it.SetState(indexpb.JobState_JobStateFinished, "fake finished index success") - return true, true + return true } // vector index build needs information of optional scalar fields data optionalFields := make([]*indexpb.OptionalFieldInfo, 0) @@ -110,12 +114,12 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode if err != nil || collInfo == nil { log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err)) it.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } colSchema := collInfo.Schema partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema) if partitionKeyField == nil || err != nil { - log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("buildID", it.buildID), zap.Error(err)) + log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("taskID", it.taskID), zap.Error(err)) } else { if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) { optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ @@ -161,16 +165,16 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode var err error indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams) if err != nil { - log.Ctx(ctx).Warn("failed to append index build params", zap.Int64("buildID", it.buildID), zap.Error(err)) + log.Ctx(ctx).Warn("failed to append index build params", zap.Int64("taskID", it.taskID), zap.Error(err)) it.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } } - var req *indexpb.CreateJobRequest + collectionInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID()) if err != nil { log.Ctx(ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err)) - return false, false + return true } schema := collectionInfo.Schema @@ -183,7 +187,7 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode } } - dim, err := storage.GetDimFromParams(field.TypeParams) + dim, err := storage.GetDimFromParams(field.GetTypeParams()) if err != nil { log.Ctx(ctx).Warn("failed to get dim from field type params", zap.String("field type", field.GetDataType().String()), zap.Error(err)) @@ -195,84 +199,90 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode if err != nil { log.Ctx(ctx).Warn("failed to get storage uri", zap.Error(err)) it.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } indexStorePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID()) if err != nil { log.Ctx(ctx).Warn("failed to get storage uri", zap.Error(err)) it.SetState(indexpb.JobState_JobStateInit, err.Error()) - return false, false + return true } - req = &indexpb.CreateJobRequest{ + it.req = &indexpb.CreateJobRequest{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: it.buildID, - IndexVersion: segIndex.IndexVersion, + BuildID: it.taskID, + IndexVersion: segIndex.IndexVersion + 1, StorageConfig: storageConfig, IndexParams: indexParams, TypeParams: typeParams, NumRows: segIndex.NumRows, + CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), CollectionID: segment.GetCollectionID(), PartitionID: segment.GetPartitionID(), SegmentID: segment.GetID(), FieldID: fieldID, - FieldName: field.Name, - FieldType: field.DataType, + FieldName: field.GetName(), + FieldType: field.GetDataType(), StorePath: storePath, StoreVersion: segment.GetStorageVersion(), IndexStorePath: indexStorePath, Dim: int64(dim), - CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), DataIds: binlogIDs, OptionalScalarFields: optionalFields, Field: field, } } else { - req = &indexpb.CreateJobRequest{ + it.req = &indexpb.CreateJobRequest{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: it.buildID, - IndexVersion: segIndex.IndexVersion, + BuildID: it.taskID, + IndexVersion: segIndex.IndexVersion + 1, StorageConfig: storageConfig, IndexParams: indexParams, TypeParams: typeParams, NumRows: segIndex.NumRows, CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), - DataIds: binlogIDs, CollectionID: segment.GetCollectionID(), PartitionID: segment.GetPartitionID(), SegmentID: segment.GetID(), FieldID: fieldID, - OptionalScalarFields: optionalFields, + FieldName: field.GetName(), + FieldType: field.GetDataType(), Dim: int64(dim), + DataIds: binlogIDs, + OptionalScalarFields: optionalFields, Field: field, } } + log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID())) + return false +} + +func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool { ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() resp, err := client.CreateJobV2(ctx, &indexpb.CreateJobV2Request{ - ClusterID: req.GetClusterID(), - TaskID: req.GetBuildID(), + ClusterID: it.req.GetClusterID(), + TaskID: it.req.GetBuildID(), JobType: indexpb.JobType_JobTypeIndexJob, Request: &indexpb.CreateJobV2Request_IndexRequest{ - IndexRequest: req, + IndexRequest: it.req, }, }) if err == nil { err = merr.Error(resp) } if err != nil { - log.Ctx(ctx).Warn("assign index task to indexNode failed", zap.Int64("buildID", it.buildID), zap.Error(err)) + log.Ctx(ctx).Warn("assign index task to indexNode failed", zap.Int64("taskID", it.taskID), zap.Error(err)) it.SetState(indexpb.JobState_JobStateRetry, err.Error()) - return false, true + return false } - log.Ctx(ctx).Info("index task assigned successfully", zap.Int64("buildID", it.buildID), - zap.Int64("segmentID", segIndex.SegmentID)) + log.Ctx(ctx).Info("index task assigned successfully", zap.Int64("taskID", it.taskID)) it.SetState(indexpb.JobState_JobStateInProgress, "") - return true, false + return true } func (it *indexBuildTask) setResult(info *indexpb.IndexTaskInfo) { @@ -289,7 +299,7 @@ func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeC err = merr.Error(resp.GetStatus()) } if err != nil { - log.Ctx(ctx).Warn("get jobs info from IndexNode failed", zap.Int64("buildID", it.GetTaskID()), + log.Ctx(ctx).Warn("get jobs info from IndexNode failed", zap.Int64("taskID", it.GetTaskID()), zap.Int64("nodeID", it.GetNodeID()), zap.Error(err)) it.SetState(indexpb.JobState_JobStateRetry, err.Error()) return diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 6b07689551..1893dc15cf 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -100,8 +100,8 @@ func (s *taskScheduler) reloadFromKV() { } if segIndex.IndexState != commonpb.IndexState_Finished && segIndex.IndexState != commonpb.IndexState_Failed { s.tasks[segIndex.BuildID] = &indexBuildTask{ - buildID: segIndex.BuildID, - nodeID: segIndex.NodeID, + taskID: segIndex.BuildID, + nodeID: segIndex.NodeID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: segIndex.BuildID, State: segIndex.IndexState, @@ -223,6 +223,12 @@ func (s *taskScheduler) process(taskID UniqueID) bool { s.removeTask(taskID) case indexpb.JobState_JobStateInit: + // 0. pre check task + skip := task.PreCheck(s.ctx, s) + if skip { + return true + } + // 1. pick an indexNode client nodeID, client := s.nodeManager.PickClient() if client == nil { @@ -239,17 +245,13 @@ func (s *taskScheduler) process(taskID UniqueID) bool { log.Ctx(s.ctx).Info("update task version success", zap.Int64("taskID", taskID)) // 3. assign task to indexNode - success, skip := task.AssignTask(s.ctx, client, s) + success := task.AssignTask(s.ctx, client) if !success { log.Ctx(s.ctx).Warn("assign task to client failed", zap.Int64("taskID", taskID), zap.String("new state", task.GetState().String()), zap.String("fail reason", task.GetFailReason())) // If the problem is caused by the task itself, subsequent tasks will not be skipped. // If etcd fails or fails to send tasks to the node, the subsequent tasks will be skipped. - return !skip - } - if skip { - // create index for small segment(<1024), skip next steps. - return true + return false } log.Ctx(s.ctx).Info("assign task to client success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID)) diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index fef5fb8fe8..475617c2ed 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -927,7 +927,6 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { ctx := context.Background() catalog := catalogmocks.NewDataCoordCatalog(s.T()) - in := mocks.NewMockIndexNodeClient(s.T()) workerManager := NewMockWorkerManager(s.T()) mt := createMeta(catalog, @@ -958,9 +957,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { scheduler.scheduleDuration = s.duration scheduler.Start() - // taskID 1 peek client success, update version success. AssignTask failed --> state: Failed --> save - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once() + // taskID 1 PreCheck failed --> state: Failed --> save catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once() workerManager.EXPECT().GetClientByID(mock.Anything).Return(nil, false).Once() @@ -1298,14 +1295,10 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { defer Params.CommonCfg.EnableStorageV2.SwapTempValue("False") scheduler.Start() - // peek client success, update version success, get collection info failed --> init - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() + // get collection info failed --> init handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() - // peek client success, update version success, partition key field is nil, get collection info failed --> init - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() + // partition key field is nil, get collection info failed --> init handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{ ID: collID, Schema: &schemapb.CollectionSchema{ @@ -1316,9 +1309,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { }, nil).Once() handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() - // peek client success, update version success, get collection info success, get dim failed --> init - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() + // get collection info success, get dim failed --> init handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{ ID: collID, Schema: &schemapb.CollectionSchema{ @@ -1331,8 +1322,6 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { // peek client success, update version success, get collection info success, get dim success, get storage uri failed --> init s.NoError(err) - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) { return &collectionInfo{ ID: collID, @@ -1676,14 +1665,12 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { return merr.Success(), nil }).Once() t := &indexBuildTask{ - buildID: buildID, - nodeID: nodeID, + taskID: buildID, + nodeID: nodeID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Unissued, FailReason: "", - // CurrentIndexVersion: 0, - // IndexStoreVersion: 0, }, } scheduler.enqueue(t) @@ -1701,8 +1688,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { return merr.Success(), nil }).Once() t := &indexBuildTask{ - buildID: buildID, - nodeID: nodeID, + taskID: buildID, + nodeID: nodeID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Unissued, @@ -1730,8 +1717,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { return merr.Success(), nil }).Once() t := &indexBuildTask{ - buildID: buildID, - nodeID: nodeID, + taskID: buildID, + nodeID: nodeID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Unissued, @@ -1753,8 +1740,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { return merr.Success(), nil }).Once() t := &indexBuildTask{ - buildID: buildID, - nodeID: nodeID, + taskID: buildID, + nodeID: nodeID, taskInfo: &indexpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_Unissued, diff --git a/internal/datacoord/types.go b/internal/datacoord/types.go index c6fe60365c..c1a138eb44 100644 --- a/internal/datacoord/types.go +++ b/internal/datacoord/types.go @@ -27,13 +27,14 @@ type Task interface { GetTaskID() int64 GetNodeID() int64 ResetNodeID() + PreCheck(ctx context.Context, dependency *taskScheduler) bool CheckTaskHealthy(mt *meta) bool SetState(state indexpb.JobState, failReason string) GetState() indexpb.JobState GetFailReason() string UpdateVersion(ctx context.Context, meta *meta) error UpdateMetaBuildingState(nodeID int64, meta *meta) error - AssignTask(ctx context.Context, client types.IndexNodeClient, dependency *taskScheduler) (bool, bool) + AssignTask(ctx context.Context, client types.IndexNodeClient) bool QueryResult(ctx context.Context, client types.IndexNodeClient) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool SetJobInfo(meta *meta) error