// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datacoord import ( "context" "fmt" "sync" "time" "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/task" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" ) type ExternalCollectionInspector interface { Start() Stop() SubmitUpdateTask(collectionID int64) error } var _ ExternalCollectionInspector = (*externalCollectionInspector)(nil) type externalCollectionInspector struct { ctx context.Context cancel context.CancelFunc loopWg sync.WaitGroup mt *meta scheduler task.GlobalScheduler allocator allocator.Allocator } func newExternalCollectionInspector(ctx context.Context, mt *meta, scheduler task.GlobalScheduler, allocator allocator.Allocator, ) *externalCollectionInspector { ctx, cancel := context.WithCancel(ctx) return &externalCollectionInspector{ ctx: ctx, cancel: cancel, loopWg: sync.WaitGroup{}, mt: mt, scheduler: scheduler, allocator: allocator, } } func (i *externalCollectionInspector) Start() { i.reloadFromMeta() i.loopWg.Add(1) go i.triggerUpdateTaskLoop() } func (i *externalCollectionInspector) Stop() { i.cancel() i.loopWg.Wait() } func (i *externalCollectionInspector) reloadFromMeta() { tasks := i.mt.externalCollectionTaskMeta.GetAllTasks() for _, t := range tasks { if t.GetState() != indexpb.JobState_JobStateInit && t.GetState() != indexpb.JobState_JobStateRetry && t.GetState() != indexpb.JobState_JobStateInProgress { continue } // Enqueue active tasks for processing updateTask := newUpdateExternalCollectionTask(t, i.mt) i.scheduler.Enqueue(updateTask) } } func (i *externalCollectionInspector) triggerUpdateTaskLoop() { log.Info("start external collection inspector loop...") defer i.loopWg.Done() ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { case <-i.ctx.Done(): log.Warn("DataCoord context done, exit external collection inspector loop...") return case <-ticker.C: i.triggerUpdateTasks() } } } func (i *externalCollectionInspector) triggerUpdateTasks() { collections := i.mt.GetCollections() for _, collection := range collections { if !i.isExternalCollection(collection) { continue } // Check if we should trigger a task based on source changes if shouldTrigger := i.shouldTriggerTask(collection); shouldTrigger { if err := i.SubmitUpdateTask(collection.ID); err != nil { log.Warn("failed to submit update task for external collection", zap.Int64("collectionID", collection.ID), zap.Error(err)) } } } } func (i *externalCollectionInspector) isExternalCollection(collection *collectionInfo) bool { if collection.Schema == nil { return false } return collection.Schema.GetExternalSource() != "" } // shouldTriggerTask checks if we should create/replace a task based on external source changes // Returns shouldTrigger func (i *externalCollectionInspector) shouldTriggerTask(collection *collectionInfo) bool { externalSource := collection.Schema.GetExternalSource() externalSpec := collection.Schema.GetExternalSpec() existingTask := i.mt.externalCollectionTaskMeta.GetTaskByCollectionID(collection.ID) if existingTask == nil { // No task exists, create one return true } taskSource := existingTask.GetExternalSource() taskSpec := existingTask.GetExternalSpec() // Check if source or spec changed sourceChanged := externalSource != taskSource || externalSpec != taskSpec switch existingTask.GetState() { case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress: // Task is running if sourceChanged { // Source changed while task running - abort and replace log.Info("External source changed, replacing running task", zap.Int64("collectionID", collection.ID), zap.Int64("taskID", existingTask.GetTaskID()), zap.String("oldSource", taskSource), zap.String("newSource", externalSource)) // Abort old task from scheduler i.scheduler.AbortAndRemoveTask(existingTask.GetTaskID()) // Drop from meta i.mt.externalCollectionTaskMeta.DropTask(context.Background(), existingTask.GetTaskID()) return true } // Same source, task already running return false case indexpb.JobState_JobStateFinished: // Task completed if sourceChanged { // Source changed after completion, create new task log.Info("External source changed after completion, creating new task", zap.Int64("collectionID", collection.ID), zap.String("oldSource", taskSource), zap.String("newSource", externalSource)) // Drop old completed task, create new one i.mt.externalCollectionTaskMeta.DropTask(context.Background(), existingTask.GetTaskID()) return true } // Up to date return false case indexpb.JobState_JobStateFailed: if sourceChanged { // Source changed after failure - worth retrying with new source log.Info("External source changed after failure, creating new task", zap.Int64("collectionID", collection.ID), zap.String("oldSource", taskSource), zap.String("newSource", externalSource)) i.mt.externalCollectionTaskMeta.DropTask(context.Background(), existingTask.GetTaskID()) return true } // Same source failed - don't auto-retry return false default: return false } } func (i *externalCollectionInspector) SubmitUpdateTask(collectionID int64) error { log := log.Ctx(i.ctx).With(zap.Int64("collectionID", collectionID)) // Get collection info to retrieve external source and spec collection := i.mt.GetCollection(collectionID) if collection == nil { log.Warn("collection not found") return fmt.Errorf("collection %d not found", collectionID) } // Allocate task ID taskID, err := i.allocator.AllocID(context.Background()) if err != nil { log.Warn("failed to allocate task ID", zap.Error(err)) return err } // Create task t := &indexpb.UpdateExternalCollectionTask{ CollectionID: collectionID, TaskID: taskID, Version: 0, NodeID: 0, State: indexpb.JobState_JobStateInit, FailReason: "", ExternalSource: collection.Schema.GetExternalSource(), ExternalSpec: collection.Schema.GetExternalSpec(), } // Add task to meta if err = i.mt.externalCollectionTaskMeta.AddTask(t); err != nil { if errors.Is(err, merr.ErrTaskDuplicate) { log.Info("external collection update task already exists", zap.Int64("collectionID", collectionID)) return nil } log.Warn("failed to add task to meta", zap.Error(err)) return err } // Create and enqueue task updateTask := newUpdateExternalCollectionTask(t, i.mt) i.scheduler.Enqueue(updateTask) log.Info("external collection update task submitted", zap.Int64("taskID", taskID), zap.Int64("collectionID", collectionID), zap.String("externalSource", collection.Schema.GetExternalSource())) return nil }