enhance: implement external collection update task with source change detection (#45905)

issue: #45881 
Add persistent task management for external collections with automatic
detection of external_source and external_spec changes. When the source
changes, the system aborts the running task and creates a new one,
ensuring only one active task per collection. Tasks validate their
source on completion so that superseded tasks cannot commit results.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
- Core invariant: at most one active UpdateExternalCollection task
exists per collection. Tasks are serialized by collectionID
(collection-level locking), and any change to external_source or
external_spec aborts the superseded task and creates a replacement; the
externalCollectionManager and the collection-keyed locks in
external_collection_task_meta enforce this (see the locking sketch after
these notes).
- What was simplified/removed: per-task fine-grained locking and
acceptance of multiple concurrent tasks per collection were replaced by
collection-level synchronization (external_collection_task_meta.go) and
a single persistent task lifecycle in the DataCoord index task code;
redundant concurrent update paths were removed by checking for an
existing task in AddTask/LoadOrStore and aborting or overwriting via the
Drop/Cancel flows.
- Why this does NOT cause data loss or regress behavior: task state
transitions and commits are validated against the current external
source/spec before changes are applied. UpdateStateWithMeta and
SetJobInfo verify task metadata and persist via the catalog only when
the collection state matches; the DataNode externalCollectionManager
keeps task results in its in-memory manager and exposes Query/Drop flows
(services.go) without touching existing segment data. Only when a task
finishes successfully does SetJobInfo atomically update segments through
meta/catalog calls, preventing superseded tasks from committing stale
results.
- New capability added: an end-to-end external collection update
workflow. The DataCoord index task, Cluster RPC helpers, DataNode
external task runner, and ExternalCollectionManager together support
creating, querying, cancelling, and applying external collection
updates (fragment-to-segment balancing, kept/updated segment handling,
allocator integration); accompanying unit tests cover success, failure,
cancellation, allocator errors, and balancing logic.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
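
The one-task-per-collection invariant rests on a check-lock-recheck pattern that recurs throughout external_collection_task_meta.go below: resolve the task's collectionID, take the collection-keyed lock, then re-verify the task still exists before mutating. A minimal self-contained sketch of that pattern, with plain maps and mutexes standing in for the real lock.KeyLock and typeutil.ConcurrentMap:

```go
package main

import (
	"fmt"
	"sync"
)

// taskMeta is a simplified stand-in for externalCollectionTaskMeta.
// mu guards the maps; collLocks serializes whole Add/Drop operations
// per collection, mirroring keyLock.Lock(collectionID) in the diff.
type taskMeta struct {
	mu        sync.Mutex
	collLocks sync.Map        // collectionID -> *sync.Mutex
	tasks     map[int64]int64 // taskID -> collectionID
	byColl    map[int64]int64 // collectionID -> active taskID
}

func newTaskMeta() *taskMeta {
	return &taskMeta{tasks: map[int64]int64{}, byColl: map[int64]int64{}}
}

func (m *taskMeta) collLock(collID int64) *sync.Mutex {
	l, _ := m.collLocks.LoadOrStore(collID, &sync.Mutex{})
	return l.(*sync.Mutex)
}

// AddTask enforces "at most one active task per collection".
func (m *taskMeta) AddTask(collID, taskID int64) error {
	l := m.collLock(collID)
	l.Lock()
	defer l.Unlock()
	m.mu.Lock()
	defer m.mu.Unlock()
	if existing, ok := m.byColl[collID]; ok {
		return fmt.Errorf("task %d already active for collection %d", existing, collID)
	}
	m.byColl[collID] = taskID
	m.tasks[taskID] = collID
	return nil
}

// DropTask resolves the collectionID first, takes the collection lock,
// then re-checks the task still exists (check-lock-recheck).
func (m *taskMeta) DropTask(taskID int64) {
	m.mu.Lock()
	collID, ok := m.tasks[taskID]
	m.mu.Unlock()
	if !ok {
		return // already dropped
	}
	l := m.collLock(collID)
	l.Lock()
	defer l.Unlock()
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.tasks[taskID]; !ok { // double-check under the lock
		return
	}
	delete(m.tasks, taskID)
	delete(m.byColl, collID)
}

func main() {
	m := newTaskMeta()
	fmt.Println(m.AddTask(100, 1)) // <nil>
	fmt.Println(m.AddTask(100, 2)) // rejected: task 1 already active
	m.DropTask(1)
	fmt.Println(m.AddTask(100, 2)) // <nil> after the drop
}
```

The double-check matters because a task can be dropped between the initial lookup and lock acquisition; DropTask, UpdateVersion, and UpdateTaskState in the diff all repeat the lookup for exactly this reason.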

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>


@@ -88,7 +88,7 @@ func (i *externalCollectionInspector) reloadFromMeta() {
continue
}
// Enqueue active tasks for processing
updateTask := newUpdateExternalCollectionTask(t, i.mt)
updateTask := newUpdateExternalCollectionTask(t, i.mt, i.allocator)
i.scheduler.Enqueue(updateTask)
}
}
@@ -253,7 +253,7 @@ func (i *externalCollectionInspector) SubmitUpdateTask(collectionID int64) error
}
// Create and enqueue task
updateTask := newUpdateExternalCollectionTask(t, i.mt)
updateTask := newUpdateExternalCollectionTask(t, i.mt, i.allocator)
i.scheduler.Enqueue(updateTask)
log.Info("external collection update task submitted",


@@ -76,15 +76,19 @@ func (ectm *externalCollectionTaskMeta) reloadFromKV() error {
}
func (ectm *externalCollectionTaskMeta) AddTask(t *indexpb.UpdateExternalCollectionTask) error {
ectm.keyLock.Lock(t.GetTaskID())
defer ectm.keyLock.Unlock(t.GetTaskID())
// Lock on collectionID to prevent concurrent tasks for the same collection
ectm.keyLock.Lock(t.GetCollectionID())
defer ectm.keyLock.Unlock(t.GetCollectionID())
log.Ctx(ectm.ctx).Info("add update external collection task",
zap.Int64("taskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()))
if _, ok := ectm.collectionID2Tasks.Get(t.GetCollectionID()); ok {
// Check if a task already exists for this collection
if existingTask, ok := ectm.collectionID2Tasks.Get(t.GetCollectionID()); ok {
log.Warn("update external collection task already exists for collection",
zap.Int64("existingTaskID", existingTask.GetTaskID()),
zap.Int64("newTaskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()))
return merr.WrapErrTaskDuplicate(strconv.FormatInt(t.GetCollectionID(), 10))
}
@@ -107,16 +111,28 @@ func (ectm *externalCollectionTaskMeta) AddTask(t *indexpb.UpdateExternalCollect
}
func (ectm *externalCollectionTaskMeta) DropTask(ctx context.Context, taskID int64) error {
ectm.keyLock.Lock(taskID)
defer ectm.keyLock.Unlock(taskID)
log.Ctx(ctx).Info("drop update external collection task by taskID", zap.Int64("taskID", taskID))
// First get the task to find its collectionID
t, ok := ectm.tasks.Get(taskID)
if !ok {
log.Info("remove update external collection task success, task already not exist", zap.Int64("taskID", taskID))
log.Ctx(ctx).Info("remove update external collection task success, task already not exist", zap.Int64("taskID", taskID))
return nil
}
// Lock on collectionID to serialize with AddTask operations
ectm.keyLock.Lock(t.GetCollectionID())
defer ectm.keyLock.Unlock(t.GetCollectionID())
log.Ctx(ctx).Info("drop update external collection task by taskID",
zap.Int64("taskID", taskID),
zap.Int64("collectionID", t.GetCollectionID()))
// Double-check task still exists after acquiring lock
t, ok = ectm.tasks.Get(taskID)
if !ok {
log.Ctx(ctx).Info("remove update external collection task success, task already not exist", zap.Int64("taskID", taskID))
return nil
}
if err := ectm.catalog.DropUpdateExternalCollectionTask(ctx, taskID); err != nil {
log.Warn("drop update external collection task failed",
zap.Int64("taskID", taskID),
@@ -128,19 +144,28 @@ func (ectm *externalCollectionTaskMeta) DropTask(ctx context.Context, taskID int
ectm.tasks.Remove(taskID)
ectm.collectionID2Tasks.Remove(t.GetCollectionID())
log.Info("remove update external collection task success", zap.Int64("taskID", taskID))
log.Info("remove update external collection task success",
zap.Int64("taskID", taskID),
zap.Int64("collectionID", t.GetCollectionID()))
return nil
}
func (ectm *externalCollectionTaskMeta) UpdateVersion(taskID, nodeID int64) error {
ectm.keyLock.Lock(taskID)
defer ectm.keyLock.Unlock(taskID)
t, ok := ectm.tasks.Get(taskID)
if !ok {
return fmt.Errorf("task %d not found", taskID)
}
// Lock on collectionID for consistency with Add/Drop operations
ectm.keyLock.Lock(t.GetCollectionID())
defer ectm.keyLock.Unlock(t.GetCollectionID())
// Double-check task still exists after acquiring lock
t, ok = ectm.tasks.Get(taskID)
if !ok {
return fmt.Errorf("task %d not found", taskID)
}
cloneT := proto.Clone(t).(*indexpb.UpdateExternalCollectionTask)
cloneT.Version++
cloneT.NodeID = nodeID
@@ -162,14 +187,21 @@ func (ectm *externalCollectionTaskMeta) UpdateVersion(taskID, nodeID int64) erro
}
func (ectm *externalCollectionTaskMeta) UpdateTaskState(taskID int64, state indexpb.JobState, failReason string) error {
ectm.keyLock.Lock(taskID)
defer ectm.keyLock.Unlock(taskID)
t, ok := ectm.tasks.Get(taskID)
if !ok {
return fmt.Errorf("task %d not found", taskID)
}
// Lock on collectionID for consistency with Add/Drop operations
ectm.keyLock.Lock(t.GetCollectionID())
defer ectm.keyLock.Unlock(t.GetCollectionID())
// Double-check task still exists after acquiring lock
t, ok = ectm.tasks.Get(taskID)
if !ok {
return fmt.Errorf("task %d not found", taskID)
}
cloneT := proto.Clone(t).(*indexpb.UpdateExternalCollectionTask)
cloneT.State = state
cloneT.FailReason = failReason


@@ -83,6 +83,13 @@ type Cluster interface {
QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error)
// DropAnalyze drops an analysis task
DropAnalyze(nodeID int64, taskID int64) error
// CreateExternalCollectionTask creates and executes an external collection task
CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error
// QueryExternalCollectionTask queries the status of an external collection task
QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error)
// DropExternalCollectionTask drops an external collection task
DropExternalCollectionTask(nodeID int64, taskID int64) error
}
var _ Cluster = (*cluster)(nil)
@@ -612,3 +619,71 @@ func (c *cluster) DropAnalyze(nodeID int64, taskID int64) error {
properties.AppendType(taskcommon.Analyze)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error {
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return err
}
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(req.GetTaskID())
properties.AppendType(taskcommon.ExternalCollection)
payload, err := proto.Marshal(req)
if err != nil {
log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
return err
}
// Submit task to worker - task will execute asynchronously in worker's goroutine pool
status, err := cli.CreateTask(ctx, &workerpb.CreateTaskRequest{
Payload: payload,
Properties: properties,
})
if err != nil {
log.Ctx(ctx).Warn("create external collection task failed", zap.Error(err))
return err
}
if err := merr.Error(status); err != nil {
log.Ctx(ctx).Warn("create external collection task returned error", zap.Error(err))
return err
}
return nil
}
func (c *cluster) QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error) {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.ExternalCollection)
resp, err := c.queryTask(nodeID, properties)
if err != nil {
return nil, err
}
// Unmarshal the response payload
result := &datapb.UpdateExternalCollectionResponse{}
if err := proto.Unmarshal(resp.GetPayload(), result); err != nil {
return nil, err
}
return result, nil
}
func (c *cluster) DropExternalCollectionTask(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.ExternalCollection)
return c.dropTask(nodeID, properties)
}


@@ -116,6 +116,53 @@ func (_c *MockCluster_CreateCompaction_Call) RunAndReturn(run func(int64, *datap
return _c
}
// CreateExternalCollectionTask provides a mock function with given fields: nodeID, req
func (_m *MockCluster) CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error {
ret := _m.Called(nodeID, req)
if len(ret) == 0 {
panic("no return value specified for CreateExternalCollectionTask")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, *datapb.UpdateExternalCollectionRequest) error); ok {
r0 = rf(nodeID, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockCluster_CreateExternalCollectionTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateExternalCollectionTask'
type MockCluster_CreateExternalCollectionTask_Call struct {
*mock.Call
}
// CreateExternalCollectionTask is a helper method to define mock.On call
// - nodeID int64
// - req *datapb.UpdateExternalCollectionRequest
func (_e *MockCluster_Expecter) CreateExternalCollectionTask(nodeID interface{}, req interface{}) *MockCluster_CreateExternalCollectionTask_Call {
return &MockCluster_CreateExternalCollectionTask_Call{Call: _e.mock.On("CreateExternalCollectionTask", nodeID, req)}
}
func (_c *MockCluster_CreateExternalCollectionTask_Call) Run(run func(nodeID int64, req *datapb.UpdateExternalCollectionRequest)) *MockCluster_CreateExternalCollectionTask_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(*datapb.UpdateExternalCollectionRequest))
})
return _c
}
func (_c *MockCluster_CreateExternalCollectionTask_Call) Return(_a0 error) *MockCluster_CreateExternalCollectionTask_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCluster_CreateExternalCollectionTask_Call) RunAndReturn(run func(int64, *datapb.UpdateExternalCollectionRequest) error) *MockCluster_CreateExternalCollectionTask_Call {
_c.Call.Return(run)
return _c
}
// CreateImport provides a mock function with given fields: nodeID, in, taskSlot
func (_m *MockCluster) CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error {
ret := _m.Called(nodeID, in, taskSlot)
@@ -400,6 +447,53 @@ func (_c *MockCluster_DropCompaction_Call) RunAndReturn(run func(int64, int64) e
return _c
}
// DropExternalCollectionTask provides a mock function with given fields: nodeID, taskID
func (_m *MockCluster) DropExternalCollectionTask(nodeID int64, taskID int64) error {
ret := _m.Called(nodeID, taskID)
if len(ret) == 0 {
panic("no return value specified for DropExternalCollectionTask")
}
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(nodeID, taskID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockCluster_DropExternalCollectionTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropExternalCollectionTask'
type MockCluster_DropExternalCollectionTask_Call struct {
*mock.Call
}
// DropExternalCollectionTask is a helper method to define mock.On call
// - nodeID int64
// - taskID int64
func (_e *MockCluster_Expecter) DropExternalCollectionTask(nodeID interface{}, taskID interface{}) *MockCluster_DropExternalCollectionTask_Call {
return &MockCluster_DropExternalCollectionTask_Call{Call: _e.mock.On("DropExternalCollectionTask", nodeID, taskID)}
}
func (_c *MockCluster_DropExternalCollectionTask_Call) Run(run func(nodeID int64, taskID int64)) *MockCluster_DropExternalCollectionTask_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockCluster_DropExternalCollectionTask_Call) Return(_a0 error) *MockCluster_DropExternalCollectionTask_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockCluster_DropExternalCollectionTask_Call) RunAndReturn(run func(int64, int64) error) *MockCluster_DropExternalCollectionTask_Call {
_c.Call.Return(run)
return _c
}
// DropImport provides a mock function with given fields: nodeID, taskID
func (_m *MockCluster) DropImport(nodeID int64, taskID int64) error {
ret := _m.Called(nodeID, taskID)
@@ -659,6 +753,65 @@ func (_c *MockCluster_QueryCompaction_Call) RunAndReturn(run func(int64, *datapb
return _c
}
// QueryExternalCollectionTask provides a mock function with given fields: nodeID, taskID
func (_m *MockCluster) QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error) {
ret := _m.Called(nodeID, taskID)
if len(ret) == 0 {
panic("no return value specified for QueryExternalCollectionTask")
}
var r0 *datapb.UpdateExternalCollectionResponse
var r1 error
if rf, ok := ret.Get(0).(func(int64, int64) (*datapb.UpdateExternalCollectionResponse, error)); ok {
return rf(nodeID, taskID)
}
if rf, ok := ret.Get(0).(func(int64, int64) *datapb.UpdateExternalCollectionResponse); ok {
r0 = rf(nodeID, taskID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.UpdateExternalCollectionResponse)
}
}
if rf, ok := ret.Get(1).(func(int64, int64) error); ok {
r1 = rf(nodeID, taskID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockCluster_QueryExternalCollectionTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryExternalCollectionTask'
type MockCluster_QueryExternalCollectionTask_Call struct {
*mock.Call
}
// QueryExternalCollectionTask is a helper method to define mock.On call
// - nodeID int64
// - taskID int64
func (_e *MockCluster_Expecter) QueryExternalCollectionTask(nodeID interface{}, taskID interface{}) *MockCluster_QueryExternalCollectionTask_Call {
return &MockCluster_QueryExternalCollectionTask_Call{Call: _e.mock.On("QueryExternalCollectionTask", nodeID, taskID)}
}
func (_c *MockCluster_QueryExternalCollectionTask_Call) Run(run func(nodeID int64, taskID int64)) *MockCluster_QueryExternalCollectionTask_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64))
})
return _c
}
func (_c *MockCluster_QueryExternalCollectionTask_Call) Return(_a0 *datapb.UpdateExternalCollectionResponse, _a1 error) *MockCluster_QueryExternalCollectionTask_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCluster_QueryExternalCollectionTask_Call) RunAndReturn(run func(int64, int64) (*datapb.UpdateExternalCollectionResponse, error)) *MockCluster_QueryExternalCollectionTask_Call {
_c.Call.Return(run)
return _c
}
// QueryImport provides a mock function with given fields: nodeID, in
func (_m *MockCluster) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) {
ret := _m.Called(nodeID, in)


@@ -21,11 +21,15 @@ import (
"fmt"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
globalTask "github.com/milvus-io/milvus/internal/datacoord/task"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
)
@@ -35,16 +39,18 @@ type updateExternalCollectionTask struct {
times *taskcommon.Times
meta *meta
meta *meta
allocator allocator.Allocator
}
var _ globalTask.Task = (*updateExternalCollectionTask)(nil)
func newUpdateExternalCollectionTask(t *indexpb.UpdateExternalCollectionTask, mt *meta) *updateExternalCollectionTask {
func newUpdateExternalCollectionTask(t *indexpb.UpdateExternalCollectionTask, mt *meta, alloc allocator.Allocator) *updateExternalCollectionTask {
return &updateExternalCollectionTask{
UpdateExternalCollectionTask: t,
times: taskcommon.NewTimes(),
meta: mt,
allocator: alloc,
}
}
@@ -118,32 +124,264 @@ func (t *updateExternalCollectionTask) SetState(state indexpb.JobState, failReas
t.FailReason = failReason
}
func (t *updateExternalCollectionTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
ctx := context.Background()
func (t *updateExternalCollectionTask) UpdateStateWithMeta(state indexpb.JobState, failReason string) error {
if err := t.meta.externalCollectionTaskMeta.UpdateTaskState(t.GetTaskID(), state, failReason); err != nil {
log.Warn("update external collection task state failed",
zap.Int64("taskID", t.GetTaskID()),
zap.String("state", state.String()),
zap.String("failReason", failReason),
zap.Error(err))
return err
}
t.SetState(state, failReason)
return nil
}
// SetJobInfo processes the task response and updates segment information atomically
func (t *updateExternalCollectionTask) SetJobInfo(ctx context.Context, resp *datapb.UpdateExternalCollectionResponse) error {
log := log.Ctx(ctx).With(
zap.Int64("taskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()),
)
// For external collections, we just need to update metadata
// This is a placeholder for actual logic that would:
// 1. Query external storage for collection statistics
// 2. Update collection metadata in meta
// 3. Mark task as finished
keptSegmentIDs := resp.GetKeptSegments()
updatedSegments := resp.GetUpdatedSegments()
log.Info("updating external collection metadata")
log.Info("processing external collection update response",
zap.Int("keptSegments", len(keptSegmentIDs)),
zap.Int("updatedSegments", len(updatedSegments)))
// TODO: Implement actual update logic
// For now, just mark as finished
t.SetState(indexpb.JobState_JobStateFinished, "")
// Build kept segments map for fast lookup
keptSegmentMap := make(map[int64]bool)
for _, segID := range keptSegmentIDs {
keptSegmentMap[segID] = true
}
log.Info("external collection metadata updated successfully")
// Allocate new IDs and update updatedSegments directly
for _, seg := range updatedSegments {
newSegmentID, err := t.allocator.AllocID(ctx)
if err != nil {
log.Warn("failed to allocate segment ID", zap.Error(err))
return err
}
log.Info("allocated new segment ID",
zap.Int64("oldID", seg.GetID()),
zap.Int64("newID", newSegmentID))
seg.ID = newSegmentID
seg.State = commonpb.SegmentState_Flushed
}
// Build update operators
var operators []UpdateOperator
// Operator 1: Drop segments not in kept list
dropOperator := func(modPack *updateSegmentPack) bool {
currentSegments := modPack.meta.segments.GetSegments()
for _, seg := range currentSegments {
// Skip segments not in this collection
if seg.GetCollectionID() != t.GetCollectionID() {
continue
}
// Skip segments that are already dropped
if seg.GetState() == commonpb.SegmentState_Dropped {
continue
}
// Drop segment if not in kept list
if !keptSegmentMap[seg.GetID()] {
segment := modPack.Get(seg.GetID())
if segment != nil {
updateSegStateAndPrepareMetrics(segment, commonpb.SegmentState_Dropped, modPack.metricMutation)
segment.DroppedAt = uint64(time.Now().UnixNano())
modPack.segments[seg.GetID()] = segment
log.Info("marking segment as dropped",
zap.Int64("segmentID", seg.GetID()),
zap.Int64("numRows", seg.GetNumOfRows()))
}
}
}
return true
}
operators = append(operators, dropOperator)
// Operator 2: Add new segments
for _, seg := range updatedSegments {
newSeg := seg // capture for closure
addOperator := func(modPack *updateSegmentPack) bool {
segInfo := NewSegmentInfo(newSeg)
modPack.segments[newSeg.GetID()] = segInfo
// Add binlogs increment
modPack.increments[newSeg.GetID()] = metastore.BinlogsIncrement{
Segment: newSeg,
}
// Update metrics
modPack.metricMutation.addNewSeg(
commonpb.SegmentState_Flushed,
newSeg.GetLevel(),
newSeg.GetIsSorted(),
newSeg.GetNumOfRows(),
)
log.Info("adding new segment",
zap.Int64("segmentID", newSeg.GetID()),
zap.Int64("numRows", newSeg.GetNumOfRows()))
return true
}
operators = append(operators, addOperator)
}
// Execute all operators atomically
if err := t.meta.UpdateSegmentsInfo(ctx, operators...); err != nil {
log.Warn("failed to update segments atomically", zap.Error(err))
return err
}
log.Info("external collection segments updated successfully",
zap.Int("updatedSegments", len(updatedSegments)),
zap.Int("keptSegments", len(keptSegmentIDs)))
return nil
}
func (t *updateExternalCollectionTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
ctx := context.Background()
log := log.Ctx(ctx).With(
zap.Int64("taskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("nodeID", nodeID),
)
var err error
defer func() {
if err != nil {
log.Warn("failed to create external collection update task on worker", zap.Error(err))
t.UpdateStateWithMeta(indexpb.JobState_JobStateFailed, err.Error())
}
}()
log.Info("creating external collection update task on worker")
// Set node ID for this task
t.NodeID = nodeID
// Get current segments for the collection
segments := t.meta.SelectSegments(ctx, CollectionFilter(t.GetCollectionID()))
currentSegments := make([]*datapb.SegmentInfo, 0, len(segments))
for _, seg := range segments {
currentSegments = append(currentSegments, seg.SegmentInfo)
}
log.Info("collected current segments", zap.Int("segmentCount", len(currentSegments)))
// Build request
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: t.GetCollectionID(),
TaskID: t.GetTaskID(),
CurrentSegments: currentSegments,
ExternalSource: t.GetExternalSource(),
ExternalSpec: t.GetExternalSpec(),
}
// Submit task to worker via unified task system
// Task will execute asynchronously in worker's goroutine pool
err = cluster.CreateExternalCollectionTask(nodeID, req)
if err != nil {
log.Warn("failed to create external collection task on worker", zap.Error(err))
return
}
// Mark task as in progress - QueryTaskOnWorker will check completion
if err = t.UpdateStateWithMeta(indexpb.JobState_JobStateInProgress, ""); err != nil {
log.Warn("failed to update task state to InProgress", zap.Error(err))
return
}
log.Info("external collection update task submitted successfully")
}
func (t *updateExternalCollectionTask) QueryTaskOnWorker(cluster session.Cluster) {
// External collection tasks finish immediately, so query is a no-op
ctx := context.Background()
log := log.Ctx(ctx).With(
zap.Int64("taskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("nodeID", t.GetNodeID()),
)
// Query task status from worker
resp, err := cluster.QueryExternalCollectionTask(t.GetNodeID(), t.GetTaskID())
if err != nil {
log.Warn("query external collection task result failed", zap.Error(err))
// If query fails, mark task as failed
t.UpdateStateWithMeta(indexpb.JobState_JobStateFailed, fmt.Sprintf("query task failed: %v", err))
return
}
state := resp.GetState()
failReason := resp.GetFailReason()
log.Info("queried external collection task status",
zap.String("state", state.String()),
zap.String("failReason", failReason))
// Handle different task states
switch state {
case indexpb.JobState_JobStateFinished:
// Process the response and update segment info
if err := t.SetJobInfo(ctx, resp); err != nil {
log.Warn("failed to process job info", zap.Error(err))
t.UpdateStateWithMeta(indexpb.JobState_JobStateFailed, fmt.Sprintf("failed to process job info: %v", err))
return
}
// Task completed successfully
if err := t.UpdateStateWithMeta(state, ""); err != nil {
log.Warn("failed to update task state to Finished", zap.Error(err))
return
}
log.Info("external collection task completed successfully")
case indexpb.JobState_JobStateFailed:
// Task failed
if err := t.UpdateStateWithMeta(state, failReason); err != nil {
log.Warn("failed to update task state to Failed", zap.Error(err))
return
}
log.Warn("external collection task failed", zap.String("reason", failReason))
case indexpb.JobState_JobStateInProgress:
// Task still in progress, no action needed
log.Info("external collection task still in progress")
case indexpb.JobState_JobStateNone, indexpb.JobState_JobStateRetry:
// Task not found or needs retry - mark as failed
log.Warn("external collection task in unexpected state, marking as failed",
zap.String("state", state.String()))
t.UpdateStateWithMeta(indexpb.JobState_JobStateFailed, fmt.Sprintf("task in unexpected state: %s", state.String()))
default:
log.Warn("external collection task in unknown state",
zap.String("state", state.String()))
}
}
func (t *updateExternalCollectionTask) DropTaskOnWorker(cluster session.Cluster) {
// External collection tasks don't run on workers, so drop is a no-op
ctx := context.Background()
log := log.Ctx(ctx).With(
zap.Int64("taskID", t.GetTaskID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("nodeID", t.GetNodeID()),
)
// Drop task on worker to cancel execution and clean up resources
err := cluster.DropExternalCollectionTask(t.GetNodeID(), t.GetTaskID())
if err != nil {
log.Warn("failed to drop external collection task on worker", zap.Error(err))
return
}
log.Info("external collection task dropped successfully")
}


@@ -0,0 +1,359 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/session"
kvcatalog "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type updateExternalCollectionTaskSuite struct {
suite.Suite
mt *meta
collID int64
taskID int64
nodeID int64
externalSource string
externalSpec string
}
func Test_updateExternalCollectionTaskSuite(t *testing.T) {
suite.Run(t, new(updateExternalCollectionTaskSuite))
}
func (s *updateExternalCollectionTaskSuite) SetupSuite() {
s.collID = 100
s.taskID = 200
s.nodeID = 1
s.externalSource = "s3"
s.externalSpec = "spec"
}
func (s *updateExternalCollectionTaskSuite) SetupTest() {
catalog := kvcatalog.NewCatalog(NewMetaMemoryKV(), "", "")
mockBroker := broker.NewMockBroker(s.T())
mockBroker.EXPECT().ShowCollectionIDs(mock.Anything).Return(&rootcoordpb.ShowCollectionIDsResponse{
Status: merr.Success(),
DbCollections: []*rootcoordpb.DBCollections{
{
DbName: "default",
CollectionIDs: []int64{s.collID},
},
},
}, nil)
mt, err := newMeta(context.Background(), catalog, nil, mockBroker)
s.Require().NoError(err)
s.mt = mt
// ensure each test starts from a clean segment/task state
s.mt.segments = NewSegmentsInfo()
s.mt.externalCollectionTaskMeta = &externalCollectionTaskMeta{
ctx: context.Background(),
catalog: catalog,
keyLock: lock.NewKeyLock[UniqueID](),
tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.UpdateExternalCollectionTask](),
collectionID2Tasks: typeutil.NewConcurrentMap[UniqueID, *indexpb.UpdateExternalCollectionTask](),
}
collection := &collectionInfo{
ID: s.collID,
Schema: newTestSchema(),
}
collection.Schema.ExternalSource = s.externalSource
collection.Schema.ExternalSpec = s.externalSpec
s.mt.AddCollection(collection)
}
func (s *updateExternalCollectionTaskSuite) TestSetJobInfo_KeepAndAddSegments() {
// Setup: Create initial segments
seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 1000,
},
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 2000,
},
}
s.mt.segments.SetSegment(1, seg1)
s.mt.segments.SetSegment(2, seg2)
// Create mock allocator
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(100), nil).Once()
mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(101), nil).Once()
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
State: indexpb.JobState_JobStateInit,
}, s.mt, mockAlloc)
// Create response: keep segment 1, drop segment 2, add 2 new segments
resp := &datapb.UpdateExternalCollectionResponse{
KeptSegments: []int64{1},
UpdatedSegments: []*datapb.SegmentInfo{
{
ID: 10, // placeholder
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 3000,
},
{
ID: 20, // placeholder
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 4000,
},
},
State: indexpb.JobState_JobStateFinished,
}
// Execute
err := task.SetJobInfo(context.Background(), resp)
s.NoError(err)
// Verify: segment 1 should still be there and flushed
seg1After := s.mt.segments.GetSegment(1)
s.NotNil(seg1After)
s.Equal(commonpb.SegmentState_Flushed, seg1After.GetState())
// Verify: segment 2 should be dropped
seg2After := s.mt.segments.GetSegment(2)
s.NotNil(seg2After)
s.Equal(commonpb.SegmentState_Dropped, seg2After.GetState())
// Verify: new segments should be added with allocated IDs
newSeg1 := s.mt.segments.GetSegment(100)
s.NotNil(newSeg1)
s.Equal(int64(3000), newSeg1.GetNumOfRows())
s.Equal(commonpb.SegmentState_Flushed, newSeg1.GetState())
newSeg2 := s.mt.segments.GetSegment(101)
s.NotNil(newSeg2)
s.Equal(int64(4000), newSeg2.GetNumOfRows())
s.Equal(commonpb.SegmentState_Flushed, newSeg2.GetState())
}
func (s *updateExternalCollectionTaskSuite) TestSetJobInfo_DropAllSegments() {
// Setup: Create initial segments
seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 1000,
},
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: s.collID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 2000,
},
}
s.mt.segments.SetSegment(1, seg1)
s.mt.segments.SetSegment(2, seg2)
// Create mock allocator
mockAlloc := allocator.NewMockAllocator(s.T())
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
State: indexpb.JobState_JobStateInit,
}, s.mt, mockAlloc)
// Create response: drop all segments, no new segments
resp := &datapb.UpdateExternalCollectionResponse{
KeptSegments: []int64{},
UpdatedSegments: []*datapb.SegmentInfo{},
State: indexpb.JobState_JobStateFinished,
}
// Execute
err := task.SetJobInfo(context.Background(), resp)
s.NoError(err)
// Verify: all segments should be dropped
seg1After := s.mt.segments.GetSegment(1)
s.NotNil(seg1After)
s.Equal(commonpb.SegmentState_Dropped, seg1After.GetState())
seg2After := s.mt.segments.GetSegment(2)
s.NotNil(seg2After)
s.Equal(commonpb.SegmentState_Dropped, seg2After.GetState())
}
func (s *updateExternalCollectionTaskSuite) TestSetJobInfo_AllocatorError() {
// Create mock allocator that fails
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(0), errors.New("alloc failed"))
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
State: indexpb.JobState_JobStateInit,
}, s.mt, mockAlloc)
// Create response with new segments
resp := &datapb.UpdateExternalCollectionResponse{
KeptSegments: []int64{},
UpdatedSegments: []*datapb.SegmentInfo{
{
ID: 10,
CollectionID: s.collID,
NumOfRows: 1000,
},
},
State: indexpb.JobState_JobStateFinished,
}
// Execute
err := task.SetJobInfo(context.Background(), resp)
s.Error(err)
s.Contains(err.Error(), "alloc failed")
}
func (s *updateExternalCollectionTaskSuite) TestQueryTaskOnWorker_Success() {
// Create mock allocator
mockAlloc := allocator.NewMockAllocator(s.T())
mockAlloc.EXPECT().AllocID(mock.Anything).Return(int64(100), nil).Maybe()
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
State: indexpb.JobState_JobStateInProgress,
NodeID: s.nodeID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
}, s.mt, mockAlloc)
// Add task to meta
s.mt.externalCollectionTaskMeta.tasks.Insert(s.taskID, task.UpdateExternalCollectionTask)
// Create mock cluster
mockCluster := session.NewMockCluster(s.T())
mockCluster.EXPECT().QueryExternalCollectionTask(s.nodeID, s.taskID).Return(&datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
KeptSegments: []int64{},
UpdatedSegments: []*datapb.SegmentInfo{},
}, nil)
// Execute
task.QueryTaskOnWorker(mockCluster)
// Verify task state is finished
s.Equal(indexpb.JobState_JobStateFinished, task.GetState())
}
func (s *updateExternalCollectionTaskSuite) TestQueryTaskOnWorker_Failed() {
// Create mock allocator
mockAlloc := allocator.NewMockAllocator(s.T())
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
State: indexpb.JobState_JobStateInProgress,
NodeID: s.nodeID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
}, s.mt, mockAlloc)
// Add task to meta
s.mt.externalCollectionTaskMeta.tasks.Insert(s.taskID, task.UpdateExternalCollectionTask)
// Create mock cluster that returns failed state
mockCluster := session.NewMockCluster(s.T())
mockCluster.EXPECT().QueryExternalCollectionTask(s.nodeID, s.taskID).Return(&datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFailed,
FailReason: "task execution failed",
}, nil)
// Execute
task.QueryTaskOnWorker(mockCluster)
// Verify task state is failed
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
s.Equal("task execution failed", task.GetFailReason())
}
func (s *updateExternalCollectionTaskSuite) TestQueryTaskOnWorker_QueryError() {
// Create mock allocator
mockAlloc := allocator.NewMockAllocator(s.T())
// Create task
task := newUpdateExternalCollectionTask(&indexpb.UpdateExternalCollectionTask{
TaskID: s.taskID,
CollectionID: s.collID,
State: indexpb.JobState_JobStateInProgress,
NodeID: s.nodeID,
ExternalSource: s.externalSource,
ExternalSpec: s.externalSpec,
}, s.mt, mockAlloc)
// Add task to meta
s.mt.externalCollectionTaskMeta.tasks.Insert(s.taskID, task.UpdateExternalCollectionTask)
// Create mock cluster that returns error
mockCluster := session.NewMockCluster(s.T())
mockCluster.EXPECT().QueryExternalCollectionTask(s.nodeID, s.taskID).Return(nil, errors.New("query failed"))
// Execute
task.QueryTaskOnWorker(mockCluster)
// Verify task state is failed
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
s.Contains(task.GetFailReason(), "query task failed")
}


@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datanode/compactor"
"github.com/milvus-io/milvus/internal/datanode/external"
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/datanode/index"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
@@ -87,6 +88,8 @@ type DataNode struct {
taskScheduler *index.TaskScheduler
taskManager *index.TaskManager
externalCollectionManager *external.ExternalCollectionManager
compactionExecutor compactor.Executor
etcdCli *clientv3.Client
@@ -124,6 +127,7 @@ func NewDataNode(ctx context.Context) *DataNode {
node.storageFactory = NewChunkMgrFactory()
node.taskScheduler = sc
node.taskManager = index.NewTaskManager(ctx2)
node.externalCollectionManager = external.NewExternalCollectionManager(ctx2, 8)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
expr.Register("datanode", node)
return node
@@ -300,6 +304,10 @@ func (node *DataNode) Stop() error {
node.importScheduler.Close()
}
if node.externalCollectionManager != nil {
node.externalCollectionManager.Close()
}
// cleanup all running tasks
node.taskManager.DeleteAllTasks()

internal/datanode/external/manager.go

@@ -0,0 +1,272 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
)
// TaskKey uniquely identifies an external collection task.
type TaskKey struct {
ClusterID string
TaskID int64
}
// TaskInfo stores the mutable state of an external collection task.
type TaskInfo struct {
Cancel context.CancelFunc
State indexpb.JobState
FailReason string
CollID int64
KeptSegments []int64
UpdatedSegments []*datapb.SegmentInfo
}
// Clone creates a copy with deep-copied slice fields so callers can freely mutate the result.
func (t *TaskInfo) Clone() *TaskInfo {
return &TaskInfo{
Cancel: t.Cancel,
State: t.State,
FailReason: t.FailReason,
CollID: t.CollID,
KeptSegments: cloneSegmentIDs(t.KeptSegments),
UpdatedSegments: cloneSegments(t.UpdatedSegments),
}
}
func makeTaskKey(clusterID string, taskID int64) TaskKey {
return TaskKey{
ClusterID: clusterID,
TaskID: taskID,
}
}
func cloneSegmentIDs(src []int64) []int64 {
if len(src) == 0 {
return nil
}
dst := make([]int64, len(src))
copy(dst, src)
return dst
}
func extractSegmentIDs(segments []*datapb.SegmentInfo) []int64 {
if len(segments) == 0 {
return nil
}
result := make([]int64, 0, len(segments))
for _, seg := range segments {
if seg == nil {
continue
}
result = append(result, seg.GetID())
}
return result
}
// cloneSegments returns deep copies of SegmentInfo slices.
func cloneSegments(src []*datapb.SegmentInfo) []*datapb.SegmentInfo {
if len(src) == 0 {
return nil
}
cloned := make([]*datapb.SegmentInfo, len(src))
for i, seg := range src {
if seg == nil {
continue
}
cloned[i] = proto.Clone(seg).(*datapb.SegmentInfo)
}
return cloned
}
// ExternalCollectionManager supervises the lifecycle of external collection tasks
// within a single datanode.
type ExternalCollectionManager struct {
ctx context.Context
mu sync.RWMutex
tasks map[TaskKey]*TaskInfo
pool *conc.Pool[any]
closeOnce sync.Once
}
// NewExternalCollectionManager constructs a manager with the provided worker pool size.
func NewExternalCollectionManager(ctx context.Context, poolSize int) *ExternalCollectionManager {
return &ExternalCollectionManager{
ctx: ctx,
tasks: make(map[TaskKey]*TaskInfo),
pool: conc.NewPool[any](poolSize),
}
}
// Close releases all background resources.
func (m *ExternalCollectionManager) Close() {
m.closeOnce.Do(func() {
if m.pool != nil {
m.pool.Release()
}
log.Info("external collection manager closed")
})
}
// LoadOrStore adds a task entry if absent and returns the existing entry if present.
func (m *ExternalCollectionManager) LoadOrStore(clusterID string, taskID int64, info *TaskInfo) *TaskInfo {
m.mu.Lock()
defer m.mu.Unlock()
key := makeTaskKey(clusterID, taskID)
if oldInfo, ok := m.tasks[key]; ok {
return oldInfo
}
m.tasks[key] = info
return nil
}
// Get returns a cloned snapshot of a task.
func (m *ExternalCollectionManager) Get(clusterID string, taskID int64) *TaskInfo {
m.mu.RLock()
defer m.mu.RUnlock()
key := makeTaskKey(clusterID, taskID)
if info, ok := m.tasks[key]; ok {
return info.Clone()
}
return nil
}
// Delete removes the task entry and returns the previous value.
func (m *ExternalCollectionManager) Delete(clusterID string, taskID int64) *TaskInfo {
m.mu.Lock()
defer m.mu.Unlock()
key := makeTaskKey(clusterID, taskID)
if info, ok := m.tasks[key]; ok {
delete(m.tasks, key)
return info
}
return nil
}
// UpdateState updates only the state/failReason fields.
func (m *ExternalCollectionManager) UpdateState(clusterID string, taskID int64, state indexpb.JobState, failReason string) {
m.mu.Lock()
defer m.mu.Unlock()
key := makeTaskKey(clusterID, taskID)
if info, ok := m.tasks[key]; ok {
info.State = state
info.FailReason = failReason
}
}
// UpdateResult commits the latest state plus kept/updated segments atomically.
func (m *ExternalCollectionManager) UpdateResult(clusterID string, taskID int64,
state indexpb.JobState,
failReason string,
keptSegments []int64,
updatedSegments []*datapb.SegmentInfo,
) {
m.mu.Lock()
defer m.mu.Unlock()
key := makeTaskKey(clusterID, taskID)
if info, ok := m.tasks[key]; ok {
info.State = state
info.FailReason = failReason
info.KeptSegments = append([]int64(nil), keptSegments...)
info.UpdatedSegments = cloneSegments(updatedSegments)
}
}
// CancelTask triggers the context cancellation for a task if it exists.
func (m *ExternalCollectionManager) CancelTask(clusterID string, taskID int64) bool {
key := makeTaskKey(clusterID, taskID)
m.mu.RLock()
info, ok := m.tasks[key]
var cancel context.CancelFunc
if ok {
cancel = info.Cancel
}
m.mu.RUnlock()
if cancel != nil {
cancel()
}
return ok
}
// SubmitTask registers and runs a task asynchronously in the manager pool.
func (m *ExternalCollectionManager) SubmitTask(
clusterID string,
req *datapb.UpdateExternalCollectionRequest,
taskFunc func(context.Context) (*datapb.UpdateExternalCollectionResponse, error),
) error {
taskID := req.GetTaskID()
taskCtx, cancel := context.WithCancel(m.ctx)
keptSegments := extractSegmentIDs(req.GetCurrentSegments())
info := &TaskInfo{
Cancel: cancel,
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
CollID: req.GetCollectionID(),
KeptSegments: keptSegments,
UpdatedSegments: nil,
}
if oldInfo := m.LoadOrStore(clusterID, taskID, info); oldInfo != nil {
return fmt.Errorf("task already exists: taskID=%d", taskID)
}
// Submit to pool
m.pool.Submit(func() (any, error) {
defer cancel()
log.Info("executing external collection task in pool",
zap.Int64("taskID", taskID),
zap.Int64("collectionID", req.GetCollectionID()))
// Execute the task
resp, err := taskFunc(taskCtx)
if err != nil {
m.UpdateResult(clusterID, taskID, indexpb.JobState_JobStateFailed, err.Error(), info.KeptSegments, nil)
log.Warn("external collection task failed",
zap.Int64("taskID", taskID),
zap.Error(err))
return nil, err
}
state := resp.GetState()
if state == indexpb.JobState_JobStateNone {
state = indexpb.JobState_JobStateFinished
}
failReason := resp.GetFailReason()
kept := resp.GetKeptSegments()
if len(kept) == 0 {
kept = info.KeptSegments
}
m.UpdateResult(clusterID, taskID, state, failReason, kept, resp.GetUpdatedSegments())
log.Info("external collection task completed",
zap.Int64("taskID", taskID))
return nil, nil
})
return nil
}
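
A compact usage sketch of the manager lifecycle, illustrative only (a standalone main that glosses over Go's internal/ package visibility); the unit tests below exercise the same flows in full:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/internal/datanode/external"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)

func main() {
	mgr := external.NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	req := &datapb.UpdateExternalCollectionRequest{TaskID: 1, CollectionID: 100}
	// The taskFunc is where a real runner would scan the external source;
	// this one just reports completion immediately.
	err := mgr.SubmitTask("cluster-a", req, func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		return &datapb.UpdateExternalCollectionResponse{State: indexpb.JobState_JobStateFinished}, nil
	})
	if err != nil {
		panic(err)
	}
	// Poll the in-memory snapshot until the pool goroutine commits the result.
	for mgr.Get("cluster-a", 1).State != indexpb.JobState_JobStateFinished {
		time.Sleep(10 * time.Millisecond)
	}
	// Entries are retained until explicitly dropped, mirroring the Query/Drop flows.
	fmt.Println("dropped:", mgr.Delete("cluster-a", 1) != nil)
}
```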


@@ -0,0 +1,439 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
func TestExternalCollectionManager_Basic(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(1)
collID := int64(100)
// Test LoadOrStore - first time should succeed
info := &TaskInfo{
Cancel: func() {},
State: indexpb.JobState_JobStateInProgress,
FailReason: "",
CollID: collID,
}
oldInfo := manager.LoadOrStore(clusterID, taskID, info)
assert.Nil(t, oldInfo)
// Test Get
retrievedInfo := manager.Get(clusterID, taskID)
assert.NotNil(t, retrievedInfo)
assert.Equal(t, indexpb.JobState_JobStateInProgress, retrievedInfo.State)
assert.Equal(t, collID, retrievedInfo.CollID)
// Test LoadOrStore - second time should return existing
newInfo := &TaskInfo{
Cancel: func() {},
State: indexpb.JobState_JobStateFinished,
FailReason: "",
CollID: collID,
}
oldInfo = manager.LoadOrStore(clusterID, taskID, newInfo)
assert.NotNil(t, oldInfo)
assert.Equal(t, indexpb.JobState_JobStateInProgress, oldInfo.State) // should still be old state
// Test UpdateState
manager.UpdateState(clusterID, taskID, indexpb.JobState_JobStateFinished, "")
retrievedInfo = manager.Get(clusterID, taskID)
assert.Equal(t, indexpb.JobState_JobStateFinished, retrievedInfo.State)
// Test Delete
deletedInfo := manager.Delete(clusterID, taskID)
assert.NotNil(t, deletedInfo)
assert.Equal(t, indexpb.JobState_JobStateFinished, deletedInfo.State)
// Verify task is deleted
retrievedInfo = manager.Get(clusterID, taskID)
assert.Nil(t, retrievedInfo)
}
func TestExternalCollectionManager_SubmitTask_Success(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(2)
collID := int64(200)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
// Track task execution
var executed atomic.Bool
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
executed.Store(true)
return &datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
KeptSegments: []int64{1, 2},
}, nil
}
// Submit task
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err)
require.Eventually(t, func() bool {
return executed.Load()
}, time.Second, 10*time.Millisecond)
require.Eventually(t, func() bool {
info := manager.Get(clusterID, taskID)
return info != nil && info.State == indexpb.JobState_JobStateFinished
}, time.Second, 10*time.Millisecond)
// Verify task was executed
assert.True(t, executed.Load())
// Task info should be retained until explicit drop
info := manager.Get(clusterID, taskID)
assert.NotNil(t, info)
assert.Equal(t, indexpb.JobState_JobStateFinished, info.State)
assert.Equal(t, []int64{1, 2}, info.KeptSegments)
}
func TestExternalCollectionManager_SubmitTask_Failure(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(3)
collID := int64(300)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
// Task function that fails
expectedError := errors.New("task execution failed")
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
return nil, expectedError
}
// Submit task
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err) // Submit should succeed
require.Eventually(t, func() bool {
info := manager.Get(clusterID, taskID)
return info != nil && info.State == indexpb.JobState_JobStateFailed
}, time.Second, 10*time.Millisecond)
// Task info should still be present with failure state
info := manager.Get(clusterID, taskID)
assert.NotNil(t, info)
assert.Equal(t, indexpb.JobState_JobStateFailed, info.State)
assert.Equal(t, expectedError.Error(), info.FailReason)
}
func TestExternalCollectionManager_CancelTask(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(30)
collID := int64(3000)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
cancelObserved := make(chan struct{})
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
select {
case <-ctx.Done():
close(cancelObserved)
return nil, ctx.Err()
case <-time.After(time.Second):
return &datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
}, nil
}
}
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err)
require.Eventually(t, func() bool {
// ensure task has been registered
info := manager.Get(clusterID, taskID)
return info != nil
}, time.Second, 10*time.Millisecond)
cancelled := manager.CancelTask(clusterID, taskID)
assert.True(t, cancelled)
require.Eventually(t, func() bool {
select {
case <-cancelObserved:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
info := manager.Get(clusterID, taskID)
require.NotNil(t, info)
assert.Equal(t, indexpb.JobState_JobStateFailed, info.State)
assert.Contains(t, info.FailReason, "context canceled")
}
func TestCloneSegmentIDs(t *testing.T) {
src := []int64{1, 2, 3}
dst := cloneSegmentIDs(src)
assert.Equal(t, src, dst)
dst[0] = 42
assert.NotEqual(t, src[0], dst[0], "modifying clone should not affect source")
}
func TestExtractSegmentIDs(t *testing.T) {
assert.Nil(t, extractSegmentIDs(nil))
segments := []*datapb.SegmentInfo{
nil,
{ID: 1},
{ID: 2},
}
assert.Equal(t, []int64{1, 2}, extractSegmentIDs(segments))
}
func TestCancelTaskMultipleTimes(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 1)
defer manager.Close()
var calls int32
cancelFn := func() {
atomic.AddInt32(&calls, 1)
}
clusterID := "cluster"
taskID := int64(999)
manager.LoadOrStore(clusterID, taskID, &TaskInfo{
Cancel: cancelFn,
})
require.True(t, manager.CancelTask(clusterID, taskID))
require.True(t, manager.CancelTask(clusterID, taskID))
assert.Equal(t, int32(2), calls)
}
func TestExternalCollectionManager_SubmitTask_Duplicate(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(4)
collID := int64(400)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
// Task function that blocks
blockChan := make(chan struct{})
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
<-blockChan
return &datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
}, nil
}
// Submit first task
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err)
// Verify task is in progress
info := manager.Get(clusterID, taskID)
assert.NotNil(t, info)
assert.Equal(t, indexpb.JobState_JobStateInProgress, info.State)
// Try to submit duplicate task
err = manager.SubmitTask(clusterID, req, taskFunc)
assert.Error(t, err)
assert.Contains(t, err.Error(), "task already exists")
// Unblock the task
close(blockChan)
require.Eventually(t, func() bool {
info := manager.Get(clusterID, taskID)
return info != nil && info.State == indexpb.JobState_JobStateFinished
}, time.Second, 10*time.Millisecond)
}
func TestExternalCollectionManager_MultipleTasksConcurrent(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
numTasks := 10
// Submit multiple tasks concurrently
for i := 0; i < numTasks; i++ {
taskID := int64(i + 100)
collID := int64(i + 1000)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
return &datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
}, nil
}
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err)
}
require.Eventually(t, func() bool {
for i := 0; i < numTasks; i++ {
taskID := int64(i + 100)
info := manager.Get(clusterID, taskID)
if info == nil || info.State != indexpb.JobState_JobStateFinished {
return false
}
}
return true
}, time.Second, 10*time.Millisecond)
// Tasks remain queryable until dropped
for i := 0; i < numTasks; i++ {
taskID := int64(i + 100)
info := manager.Get(clusterID, taskID)
assert.NotNil(t, info)
assert.Equal(t, indexpb.JobState_JobStateFinished, info.State)
}
}
func TestExternalCollectionManager_Close(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
clusterID := "test-cluster"
taskID := int64(5)
collID := int64(500)
req := &datapb.UpdateExternalCollectionRequest{
TaskID: taskID,
CollectionID: collID,
}
// Submit a task
var executed atomic.Bool
started := make(chan struct{})
unblock := make(chan struct{})
taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
close(started)
select {
case <-unblock:
case <-ctx.Done():
return nil, ctx.Err()
}
executed.Store(true)
return &datapb.UpdateExternalCollectionResponse{
State: indexpb.JobState_JobStateFinished,
}, nil
}
err := manager.SubmitTask(clusterID, req, taskFunc)
assert.NoError(t, err)
require.Eventually(t, func() bool {
select {
case <-started:
return true
default:
return false
}
}, time.Second, 10*time.Millisecond)
// Close manager while the task is still running
manager.Close()
close(unblock)
require.Eventually(t, func() bool {
return executed.Load()
}, time.Second, 10*time.Millisecond)
// The in-flight task should still run to completion after Close
assert.True(t, executed.Load())
}
func TestExternalCollectionManager_UpdateStateNonExistent(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(999)
// Try to update state of non-existent task (should not panic)
manager.UpdateState(clusterID, taskID, indexpb.JobState_JobStateFinished, "")
// Get should return nil
info := manager.Get(clusterID, taskID)
assert.Nil(t, info)
}
func TestExternalCollectionManager_DeleteNonExistent(t *testing.T) {
ctx := context.Background()
manager := NewExternalCollectionManager(ctx, 4)
defer manager.Close()
clusterID := "test-cluster"
taskID := int64(888)
// Try to delete non-existent task
info := manager.Delete(clusterID, taskID)
assert.Nil(t, info)
}

View File

@ -0,0 +1,456 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"fmt"
"sort"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
func ensureContext(ctx context.Context) error {
if ctx == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
// Fragment represents a data fragment from external source
type Fragment struct {
FragmentID int64
RowCount int64
}
// FragmentRowRange represents the row index range for a fragment within a segment
type FragmentRowRange struct {
FragmentID int64
StartRow int64 // inclusive
EndRow int64 // exclusive
}
// SegmentRowMapping holds the row index mapping for all fragments in a segment
type SegmentRowMapping struct {
SegmentID int64
TotalRows int64
Ranges []FragmentRowRange
Fragments []Fragment
}
// NewSegmentRowMapping creates a row mapping from fragments
// Fragments are mapped sequentially: fragment1 gets [0, rowCount1), fragment2 gets [rowCount1, rowCount1+rowCount2), etc.
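// Example: fragments [{ID:1, Rows:100}, {ID:2, Rows:200}] yield ranges [0,100) and [100,300), TotalRows = 300.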
func NewSegmentRowMapping(segmentID int64, fragments []Fragment) *SegmentRowMapping {
mapping := &SegmentRowMapping{
SegmentID: segmentID,
Fragments: fragments,
Ranges: make([]FragmentRowRange, len(fragments)),
}
var offset int64
for i, f := range fragments {
mapping.Ranges[i] = FragmentRowRange{
FragmentID: f.FragmentID,
StartRow: offset,
EndRow: offset + f.RowCount,
}
offset += f.RowCount
}
mapping.TotalRows = offset
return mapping
}
// GetFragmentByRowIndex returns the fragment range that contains the given row index
// Returns nil if rowIndex is out of range
// To get local index within fragment: rowIndex - range.StartRow
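// Example: with ranges [0,100) and [100,300), rowIndex 150 resolves to the second range with local index 150-100 = 50.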
func (m *SegmentRowMapping) GetFragmentByRowIndex(rowIndex int64) *FragmentRowRange {
if rowIndex < 0 || rowIndex >= m.TotalRows {
return nil
}
// Binary search for efficiency
left, right := 0, len(m.Ranges)-1
for left <= right {
mid := (left + right) / 2
r := &m.Ranges[mid]
if rowIndex >= r.StartRow && rowIndex < r.EndRow {
return r
} else if rowIndex < r.StartRow {
right = mid - 1
} else {
left = mid + 1
}
}
return nil
}
// SegmentFragments maps segment ID to its fragments
type SegmentFragments map[int64][]Fragment
// SegmentResult holds a segment and its fragment row mapping
type SegmentResult struct {
Segment *datapb.SegmentInfo
RowMapping *SegmentRowMapping
}
// UpdateExternalTask handles updating external collection segments
type UpdateExternalTask struct {
ctx context.Context
cancel context.CancelFunc
req *datapb.UpdateExternalCollectionRequest
tr *timerecord.TimeRecorder
state indexpb.JobState
failReason string
// Result after execution
updatedSegments []*datapb.SegmentInfo
segmentMappings map[int64]*SegmentRowMapping // segmentID -> row mapping
}
// NewUpdateExternalTask creates a new update external task
func NewUpdateExternalTask(
ctx context.Context,
cancel context.CancelFunc,
req *datapb.UpdateExternalCollectionRequest,
) *UpdateExternalTask {
return &UpdateExternalTask{
ctx: ctx,
cancel: cancel,
req: req,
tr: timerecord.NewTimeRecorder(fmt.Sprintf("UpdateExternalTask: %d", req.GetTaskID())),
state: indexpb.JobState_JobStateInit,
segmentMappings: make(map[int64]*SegmentRowMapping),
}
}
func (t *UpdateExternalTask) Ctx() context.Context {
return t.ctx
}
func (t *UpdateExternalTask) Name() string {
return fmt.Sprintf("UpdateExternalTask-%d", t.req.GetTaskID())
}
func (t *UpdateExternalTask) OnEnqueue(ctx context.Context) error {
t.tr.RecordSpan()
log.Ctx(ctx).Info("UpdateExternalTask enqueued",
zap.Int64("taskID", t.req.GetTaskID()),
zap.Int64("collectionID", t.req.GetCollectionID()))
return nil
}
func (t *UpdateExternalTask) SetState(state indexpb.JobState, failReason string) {
t.state = state
t.failReason = failReason
}
func (t *UpdateExternalTask) GetState() indexpb.JobState {
return t.state
}
func (t *UpdateExternalTask) GetSlot() int64 {
return 1
}
func (t *UpdateExternalTask) Reset() {
t.ctx = nil
t.cancel = nil
t.req = nil
t.tr = nil
t.updatedSegments = nil
t.segmentMappings = nil
}
func (t *UpdateExternalTask) PreExecute(ctx context.Context) error {
if err := ensureContext(ctx); err != nil {
return err
}
log.Ctx(ctx).Info("UpdateExternalTask PreExecute",
zap.Int64("taskID", t.req.GetTaskID()),
zap.Int64("collectionID", t.req.GetCollectionID()))
if t.req == nil {
return fmt.Errorf("request is nil")
}
return nil
}
func (t *UpdateExternalTask) Execute(ctx context.Context) error {
if err := ensureContext(ctx); err != nil {
return err
}
log.Ctx(ctx).Info("UpdateExternalTask Execute",
zap.Int64("taskID", t.req.GetTaskID()),
zap.Int64("collectionID", t.req.GetCollectionID()))
// TODO: Fetch fragments from external source
// newFragments := fetchFragmentsFromExternalSource(t.req.GetExternalSource(), t.req.GetExternalSpec())
var newFragments []Fragment
// Build current segment -> fragments mapping
// TODO: This mapping should come from metadata or be stored in SegmentInfo
currentSegmentFragments := t.buildCurrentSegmentFragments()
// Compare and organize segments
updatedSegments, err := t.organizeSegments(ctx, currentSegmentFragments, newFragments)
if err != nil {
return err
}
t.updatedSegments = updatedSegments
return nil
}
func (t *UpdateExternalTask) PostExecute(ctx context.Context) error {
if err := ensureContext(ctx); err != nil {
return err
}
log.Ctx(ctx).Info("UpdateExternalTask PostExecute",
zap.Int64("taskID", t.req.GetTaskID()),
zap.Int64("collectionID", t.req.GetCollectionID()),
zap.Int("updatedSegments", len(t.updatedSegments)))
return nil
}
// GetUpdatedSegments returns the result segments after execution
func (t *UpdateExternalTask) GetUpdatedSegments() []*datapb.SegmentInfo {
return t.updatedSegments
}
// GetSegmentMappings returns the row mappings for all segments (segmentID -> mapping)
func (t *UpdateExternalTask) GetSegmentMappings() map[int64]*SegmentRowMapping {
return t.segmentMappings
}
// buildCurrentSegmentFragments builds segment to fragments mapping from current segments
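// With the placeholder below, a segment {ID: 7, NumOfRows: 42} maps to the single Fragment{FragmentID: 7, RowCount: 42}.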
func (t *UpdateExternalTask) buildCurrentSegmentFragments() SegmentFragments {
result := make(SegmentFragments)
// TODO: Extract fragment information from segment metadata
// For now, this is a placeholder - fragment info should be stored in segment metadata
for _, seg := range t.req.GetCurrentSegments() {
// Placeholder: each segment has its own "virtual" fragment
result[seg.GetID()] = []Fragment{
{FragmentID: seg.GetID(), RowCount: seg.GetNumOfRows()},
}
}
return result
}
// organizeSegments compares fragments and organizes them into segments
func (t *UpdateExternalTask) organizeSegments(
ctx context.Context,
currentSegmentFragments SegmentFragments,
newFragments []Fragment,
) ([]*datapb.SegmentInfo, error) {
if err := ensureContext(ctx); err != nil {
return nil, err
}
log := log.Ctx(ctx)
// Build new fragment map for quick lookup
newFragmentMap := make(map[int64]Fragment)
for _, f := range newFragments {
newFragmentMap[f.FragmentID] = f
}
// Track which fragments are used by kept segments
usedFragments := make(map[int64]bool)
var keptSegments []*datapb.SegmentInfo
// Check each current segment
for _, seg := range t.req.GetCurrentSegments() {
if err := ensureContext(ctx); err != nil {
return nil, err
}
fragments := currentSegmentFragments[seg.GetID()]
allFragmentsExist := true
// Check if all fragments of this segment still exist
for _, f := range fragments {
if _, exists := newFragmentMap[f.FragmentID]; !exists {
allFragmentsExist = false
log.Info("Fragment removed from segment",
zap.Int64("segmentID", seg.GetID()),
zap.Int64("fragmentID", f.FragmentID))
break
}
}
if allFragmentsExist {
// Keep this segment unchanged
keptSegments = append(keptSegments, seg)
for _, f := range fragments {
if err := ensureContext(ctx); err != nil {
return nil, err
}
usedFragments[f.FragmentID] = true
}
// Compute row mapping for kept segment
t.segmentMappings[seg.GetID()] = NewSegmentRowMapping(seg.GetID(), fragments)
log.Debug("Segment kept unchanged",
zap.Int64("segmentID", seg.GetID()))
} else {
// Segment invalidated - its remaining fragments become orphans
log.Info("Segment invalidated due to removed fragments",
zap.Int64("segmentID", seg.GetID()))
}
}
// Collect orphan fragments (new + from invalidated segments)
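// e.g. if fragment 103 disappeared, the segment holding {102, 103} was invalidated above and the surviving 102 is re-binned here.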
var orphanFragments []Fragment
for _, f := range newFragments {
if err := ensureContext(ctx); err != nil {
return nil, err
}
if !usedFragments[f.FragmentID] {
orphanFragments = append(orphanFragments, f)
}
}
// Organize orphan fragments into new segments with balanced row counts
newSegments, err := t.balanceFragmentsToSegments(ctx, orphanFragments)
if err != nil {
return nil, err
}
// Combine kept and new segments
result := append(keptSegments, newSegments...)
log.Info("Segment organization complete",
zap.Int("keptSegments", len(keptSegments)),
zap.Int("newSegments", len(newSegments)),
zap.Int("totalSegments", len(result)))
return result, nil
}
// balanceFragmentsToSegments organizes fragments into segments with balanced row counts
func (t *UpdateExternalTask) balanceFragmentsToSegments(ctx context.Context, fragments []Fragment) ([]*datapb.SegmentInfo, error) {
if len(fragments) == 0 {
return nil, nil
}
if err := ensureContext(ctx); err != nil {
return nil, err
}
log := log.Ctx(ctx)
// Calculate total rows
var totalRows int64
for _, f := range fragments {
if err := ensureContext(ctx); err != nil {
return nil, err
}
totalRows += f.RowCount
}
// If every fragment reports zero rows, targetRowsPerSegment would be capped
// to 0 and the ceiling division below would panic; bail out early.
if totalRows == 0 {
return nil, nil
}
// Determine target rows per segment (use a reasonable default)
// TODO: Make this configurable or based on segment size limits
targetRowsPerSegment := int64(1000000) // 1M rows per segment as default
if totalRows < targetRowsPerSegment {
targetRowsPerSegment = totalRows
}
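// Ceiling division, e.g. 2,500,000 total rows with a 1,000,000 target -> 3 segments.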
numSegments := (totalRows + targetRowsPerSegment - 1) / targetRowsPerSegment
if numSegments == 0 {
numSegments = 1
}
avgRowsPerSegment := totalRows / numSegments
log.Info("Balancing fragments to segments",
zap.Int("numFragments", len(fragments)),
zap.Int64("totalRows", totalRows),
zap.Int64("numSegments", numSegments),
zap.Int64("avgRowsPerSegment", avgRowsPerSegment))
// Sort fragments by row count descending for better bin-packing
sortedFragments := make([]Fragment, len(fragments))
copy(sortedFragments, fragments)
sort.Slice(sortedFragments, func(i, j int) bool {
return sortedFragments[i].RowCount > sortedFragments[j].RowCount
})
// Initialize segment bins
type segmentBin struct {
fragments []Fragment
rowCount int64
}
bins := make([]segmentBin, numSegments)
// Greedy bin-packing: assign each fragment to the bin with lowest current row count
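// e.g. sorted rows [600k 500k 400k 300k 200k] into 2 bins -> {600k,300k,200k}=1.1M and {500k,400k}=900k (ties go to the earliest bin).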
for _, f := range sortedFragments {
if err := ensureContext(ctx); err != nil {
return nil, err
}
// Find bin with minimum row count
minIdx := 0
for i := 1; i < len(bins); i++ {
if bins[i].rowCount < bins[minIdx].rowCount {
minIdx = i
}
}
bins[minIdx].fragments = append(bins[minIdx].fragments, f)
bins[minIdx].rowCount += f.RowCount
}
// Convert bins to SegmentInfo
var result []*datapb.SegmentInfo
for i, bin := range bins {
if err := ensureContext(ctx); err != nil {
return nil, err
}
if len(bin.fragments) == 0 {
continue
}
// TODO: Generate column groups
// Placeholder only; the real segment ID is assigned by the coordinator.
segmentID := int64(i + 1)
seg := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: t.req.GetCollectionID(),
NumOfRows: bin.rowCount,
// TODO: Fill other required fields
}
result = append(result, seg)
// Compute and store row mapping for new segment
t.segmentMappings[segmentID] = NewSegmentRowMapping(segmentID, bin.fragments)
log.Debug("Created new segment from fragments",
zap.Int64("segmentID placeholder", segmentID),
zap.Int64("rowCount", bin.rowCount),
zap.Int("numFragments", len(bin.fragments)))
}
return result, nil
}
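
For orientation, a minimal usage sketch of the mapping helpers above. Values are hypothetical; as a Go example function it would live in a _test.go file of this package.

package external

import "fmt"

func ExampleSegmentRowMapping() {
	fragments := []Fragment{
		{FragmentID: 1, RowCount: 100},
		{FragmentID: 2, RowCount: 200},
	}
	mapping := NewSegmentRowMapping(1001, fragments)
	// Row 150 falls in fragment 2; its local index is 150 - StartRow = 50.
	r := mapping.GetFragmentByRowIndex(150)
	fmt.Printf("fragment %d, local row %d, total %d\n", r.FragmentID, 150-r.StartRow, mapping.TotalRows)
	// Output: fragment 2, local row 50, total 300
}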

View File

@ -0,0 +1,536 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
type UpdateExternalTaskSuite struct {
suite.Suite
collectionID int64
taskID int64
}
func (s *UpdateExternalTaskSuite) SetupSuite() {
s.collectionID = 1000
s.taskID = 1
}
func (s *UpdateExternalTaskSuite) TestNewUpdateExternalTask() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
ExternalSource: "test_source",
ExternalSpec: "test_spec",
}
task := NewUpdateExternalTask(ctx, cancel, req)
s.NotNil(task)
s.Equal(s.collectionID, task.req.GetCollectionID())
s.Equal(s.taskID, task.req.GetTaskID())
s.Equal(indexpb.JobState_JobStateInit, task.GetState())
s.Contains(task.Name(), "UpdateExternalTask")
}
func (s *UpdateExternalTaskSuite) TestTaskLifecycle() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
ExternalSource: "test_source",
CurrentSegments: []*datapb.SegmentInfo{
{ID: 1, CollectionID: s.collectionID, NumOfRows: 1000},
{ID: 2, CollectionID: s.collectionID, NumOfRows: 2000},
},
}
task := NewUpdateExternalTask(ctx, cancel, req)
// Test OnEnqueue
err := task.OnEnqueue(ctx)
s.NoError(err)
// Test PreExecute
err = task.PreExecute(ctx)
s.NoError(err)
// Test Execute
err = task.Execute(ctx)
s.NoError(err)
// Test PostExecute
err = task.PostExecute(ctx)
s.NoError(err)
// Test GetSlot
s.Equal(int64(1), task.GetSlot())
}
func (s *UpdateExternalTaskSuite) TestPreExecuteWithNilRequest() {
ctx, cancel := context.WithCancel(context.Background())
task := &UpdateExternalTask{
ctx: ctx,
cancel: cancel,
req: nil,
}
err := task.PreExecute(ctx)
s.Error(err)
}
func (s *UpdateExternalTaskSuite) TestSetAndGetState() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
task.SetState(indexpb.JobState_JobStateInProgress, "")
s.Equal(indexpb.JobState_JobStateInProgress, task.GetState())
task.SetState(indexpb.JobState_JobStateFailed, "test failure")
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
s.Equal("test failure", task.failReason)
}
func (s *UpdateExternalTaskSuite) TestReset() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
task.Reset()
s.Nil(task.ctx)
s.Nil(task.cancel)
s.Nil(task.req)
s.Nil(task.tr)
s.Nil(task.updatedSegments)
}
func (s *UpdateExternalTaskSuite) TestBalanceFragmentsToSegments_Empty() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
result, err := task.balanceFragmentsToSegments(context.Background(), []Fragment{})
s.NoError(err)
s.Nil(result)
}
func (s *UpdateExternalTaskSuite) TestBalanceFragmentsToSegments_SingleFragment() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
fragments := []Fragment{
{FragmentID: 1, RowCount: 500},
}
result, err := task.balanceFragmentsToSegments(context.Background(), fragments)
s.NoError(err)
s.Len(result, 1)
s.Equal(int64(500), result[0].GetNumOfRows())
}
func (s *UpdateExternalTaskSuite) TestBalanceFragmentsToSegments_MultipleFragments() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
fragments := []Fragment{
{FragmentID: 1, RowCount: 300000},
{FragmentID: 2, RowCount: 400000},
{FragmentID: 3, RowCount: 500000},
{FragmentID: 4, RowCount: 600000},
{FragmentID: 5, RowCount: 200000},
}
result, err := task.balanceFragmentsToSegments(context.Background(), fragments)
s.NoError(err)
// Verify total rows are preserved
var totalRows int64
for _, seg := range result {
totalRows += seg.GetNumOfRows()
}
s.Equal(int64(2000000), totalRows)
// Verify segments are reasonably balanced
if len(result) > 1 {
var minRows, maxRows int64 = result[0].GetNumOfRows(), result[0].GetNumOfRows()
for _, seg := range result {
if seg.GetNumOfRows() < minRows {
minRows = seg.GetNumOfRows()
}
if seg.GetNumOfRows() > maxRows {
maxRows = seg.GetNumOfRows()
}
}
// The difference between max and min should be reasonable
// (less than 2x the average fragment size)
avgFragmentSize := int64(2000000 / 5)
s.Less(maxRows-minRows, avgFragmentSize*2)
}
}
func (s *UpdateExternalTaskSuite) TestPreExecuteContextCanceled() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
cancel()
err := task.PreExecute(ctx)
s.ErrorIs(err, context.Canceled)
}
func (s *UpdateExternalTaskSuite) TestExecuteContextCanceled() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
cancel()
err := task.Execute(ctx)
s.ErrorIs(err, context.Canceled)
}
func (s *UpdateExternalTaskSuite) TestBalanceFragmentsToSegmentsContextCanceled() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
}
task := NewUpdateExternalTask(ctx, cancel, req)
cancel()
result, err := task.balanceFragmentsToSegments(ctx, []Fragment{{FragmentID: 1, RowCount: 10}})
s.ErrorIs(err, context.Canceled)
s.Nil(result)
}
func (s *UpdateExternalTaskSuite) TestOrganizeSegments_AllFragmentsExist() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
CurrentSegments: []*datapb.SegmentInfo{
{ID: 1, CollectionID: s.collectionID, NumOfRows: 1000},
{ID: 2, CollectionID: s.collectionID, NumOfRows: 2000},
},
}
task := NewUpdateExternalTask(ctx, cancel, req)
// Simulate current segment fragments mapping
currentSegmentFragments := SegmentFragments{
1: []Fragment{{FragmentID: 101, RowCount: 1000}},
2: []Fragment{{FragmentID: 102, RowCount: 2000}},
}
// New fragments contain all existing fragments
newFragments := []Fragment{
{FragmentID: 101, RowCount: 1000},
{FragmentID: 102, RowCount: 2000},
}
result, err := task.organizeSegments(context.Background(), currentSegmentFragments, newFragments)
s.NoError(err)
// Both segments should be kept
s.Len(result, 2)
}
func (s *UpdateExternalTaskSuite) TestOrganizeSegments_FragmentRemoved() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
CurrentSegments: []*datapb.SegmentInfo{
{ID: 1, CollectionID: s.collectionID, NumOfRows: 1000},
{ID: 2, CollectionID: s.collectionID, NumOfRows: 2000},
},
}
task := NewUpdateExternalTask(ctx, cancel, req)
// Segment 1 has fragment 101, Segment 2 has fragments 102 and 103
currentSegmentFragments := SegmentFragments{
1: []Fragment{{FragmentID: 101, RowCount: 1000}},
2: []Fragment{{FragmentID: 102, RowCount: 1000}, {FragmentID: 103, RowCount: 1000}},
}
// Fragment 103 is removed - segment 2 should be invalidated
newFragments := []Fragment{
{FragmentID: 101, RowCount: 1000},
{FragmentID: 102, RowCount: 1000},
}
result, err := task.organizeSegments(context.Background(), currentSegmentFragments, newFragments)
s.NoError(err)
// Segment 1 should be kept, segment 2 invalidated, fragment 102 becomes orphan
// Result should have segment 1 kept + new segment for orphan fragment 102
s.GreaterOrEqual(len(result), 1)
// Verify segment 1 is in the result
hasSegment1 := false
for _, seg := range result {
if seg.GetID() == 1 {
hasSegment1 = true
break
}
}
s.True(hasSegment1, "Segment 1 should be kept")
}
func (s *UpdateExternalTaskSuite) TestOrganizeSegments_NewFragmentsAdded() {
ctx, cancel := context.WithCancel(context.Background())
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
CurrentSegments: []*datapb.SegmentInfo{
{ID: 1, CollectionID: s.collectionID, NumOfRows: 1000},
},
}
task := NewUpdateExternalTask(ctx, cancel, req)
currentSegmentFragments := SegmentFragments{
1: []Fragment{{FragmentID: 101, RowCount: 1000}},
}
// New fragments include existing + new ones
newFragments := []Fragment{
{FragmentID: 101, RowCount: 1000},
{FragmentID: 102, RowCount: 2000}, // new
{FragmentID: 103, RowCount: 3000}, // new
}
result, err := task.organizeSegments(context.Background(), currentSegmentFragments, newFragments)
s.NoError(err)
// Should have segment 1 kept + new segments for orphan fragments
s.GreaterOrEqual(len(result), 2)
// Verify total rows
var totalRows int64
for _, seg := range result {
totalRows += seg.GetNumOfRows()
}
s.Equal(int64(6000), totalRows)
}
func (s *UpdateExternalTaskSuite) TestNewSegmentRowMapping() {
fragments := []Fragment{
{FragmentID: 1, RowCount: 100},
{FragmentID: 2, RowCount: 200},
{FragmentID: 3, RowCount: 150},
}
mapping := NewSegmentRowMapping(1001, fragments)
s.Equal(int64(1001), mapping.SegmentID)
s.Equal(int64(450), mapping.TotalRows)
s.Len(mapping.Ranges, 3)
// Check ranges
s.Equal(int64(1), mapping.Ranges[0].FragmentID)
s.Equal(int64(0), mapping.Ranges[0].StartRow)
s.Equal(int64(100), mapping.Ranges[0].EndRow)
s.Equal(int64(2), mapping.Ranges[1].FragmentID)
s.Equal(int64(100), mapping.Ranges[1].StartRow)
s.Equal(int64(300), mapping.Ranges[1].EndRow)
s.Equal(int64(3), mapping.Ranges[2].FragmentID)
s.Equal(int64(300), mapping.Ranges[2].StartRow)
s.Equal(int64(450), mapping.Ranges[2].EndRow)
}
func (s *UpdateExternalTaskSuite) TestGetFragmentByRowIndex() {
fragments := []Fragment{
{FragmentID: 1, RowCount: 100},
{FragmentID: 2, RowCount: 200},
{FragmentID: 3, RowCount: 150},
}
mapping := NewSegmentRowMapping(1001, fragments)
// Test first fragment
r := mapping.GetFragmentByRowIndex(0)
s.NotNil(r)
s.Equal(int64(1), r.FragmentID)
r = mapping.GetFragmentByRowIndex(99)
s.NotNil(r)
s.Equal(int64(1), r.FragmentID)
// Test second fragment
r = mapping.GetFragmentByRowIndex(100)
s.NotNil(r)
s.Equal(int64(2), r.FragmentID)
r = mapping.GetFragmentByRowIndex(299)
s.NotNil(r)
s.Equal(int64(2), r.FragmentID)
// Test third fragment
r = mapping.GetFragmentByRowIndex(300)
s.NotNil(r)
s.Equal(int64(3), r.FragmentID)
r = mapping.GetFragmentByRowIndex(449)
s.NotNil(r)
s.Equal(int64(3), r.FragmentID)
// Test out of range
r = mapping.GetFragmentByRowIndex(-1)
s.Nil(r)
r = mapping.GetFragmentByRowIndex(450)
s.Nil(r)
r = mapping.GetFragmentByRowIndex(1000)
s.Nil(r)
}
func (s *UpdateExternalTaskSuite) TestGetFragmentByRowIndex_LocalIndex() {
fragments := []Fragment{
{FragmentID: 1, RowCount: 100},
{FragmentID: 2, RowCount: 200},
}
mapping := NewSegmentRowMapping(1001, fragments)
// Row 0 -> fragment 1, local index 0
r := mapping.GetFragmentByRowIndex(0)
s.NotNil(r)
s.Equal(int64(1), r.FragmentID)
s.Equal(int64(0), 0-r.StartRow) // local index
// Row 50 -> fragment 1, local index 50
r = mapping.GetFragmentByRowIndex(50)
s.NotNil(r)
s.Equal(int64(1), r.FragmentID)
s.Equal(int64(50), 50-r.StartRow)
// Row 100 -> fragment 2, local index 0
r = mapping.GetFragmentByRowIndex(100)
s.NotNil(r)
s.Equal(int64(2), r.FragmentID)
s.Equal(int64(0), 100-r.StartRow)
// Row 150 -> fragment 2, local index 50
r = mapping.GetFragmentByRowIndex(150)
s.NotNil(r)
s.Equal(int64(2), r.FragmentID)
s.Equal(int64(50), 150-r.StartRow)
// Row 299 -> fragment 2, local index 199
r = mapping.GetFragmentByRowIndex(299)
s.NotNil(r)
s.Equal(int64(2), r.FragmentID)
s.Equal(int64(199), 299-r.StartRow)
}
func (s *UpdateExternalTaskSuite) TestSegmentRowMapping_EmptyFragments() {
mapping := NewSegmentRowMapping(1001, []Fragment{})
s.Equal(int64(0), mapping.TotalRows)
s.Len(mapping.Ranges, 0)
r := mapping.GetFragmentByRowIndex(0)
s.Nil(r)
}
func (s *UpdateExternalTaskSuite) TestMappingsComputedDuringOrganize() {
ctx, cancel := context.WithCancel(context.Background())
// Use segment ID 100 to avoid collision with placeholder ID (1)
req := &datapb.UpdateExternalCollectionRequest{
CollectionID: s.collectionID,
TaskID: s.taskID,
CurrentSegments: []*datapb.SegmentInfo{
{ID: 100, CollectionID: s.collectionID, NumOfRows: 1000},
},
}
task := NewUpdateExternalTask(ctx, cancel, req)
// Simulate current segment has fragment 101
currentSegmentFragments := SegmentFragments{
100: []Fragment{{FragmentID: 101, RowCount: 1000}},
}
// New fragments include existing + new ones
newFragments := []Fragment{
{FragmentID: 101, RowCount: 1000},
{FragmentID: 102, RowCount: 500},
}
_, err := task.organizeSegments(context.Background(), currentSegmentFragments, newFragments)
s.NoError(err)
mappings := task.GetSegmentMappings()
s.Len(mappings, 2)
// Check mapping for kept segment (ID=100)
mapping100 := mappings[100]
s.NotNil(mapping100)
s.Equal(int64(1000), mapping100.TotalRows)
s.Len(mapping100.Ranges, 1)
s.Equal(int64(101), mapping100.Ranges[0].FragmentID)
// Check mapping for new segment (ID=1, placeholder)
mapping1 := mappings[1]
s.NotNil(mapping1)
s.Equal(int64(500), mapping1.TotalRows)
s.Len(mapping1.Ranges, 1)
s.Equal(int64(102), mapping1.Ranges[0].FragmentID)
}
func TestUpdateExternalTaskSuite(t *testing.T) {
suite.Run(t, new(UpdateExternalTaskSuite))
}

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/compaction"
"github.com/milvus-io/milvus/internal/datanode/compactor"
"github.com/milvus-io/milvus/internal/datanode/external"
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/datanode/index"
"github.com/milvus-io/milvus/internal/flushcommon/io"
@ -40,6 +41,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
@ -619,6 +621,12 @@ func (node *DataNode) CreateTask(ctx context.Context, request *workerpb.CreateTa
return merr.Status(err), nil
}
return node.createAnalyzeTask(ctx, req)
case taskcommon.ExternalCollection:
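// Payload is expected to be a proto-encoded datapb.UpdateExternalCollectionRequest built by the coordinator-side task code.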
req := &datapb.UpdateExternalCollectionRequest{}
if err := proto.Unmarshal(request.GetPayload(), req); err != nil {
return merr.Status(err), nil
}
return node.createExternalCollectionTask(ctx, req)
default:
err := fmt.Errorf("unrecognized task type '%s', properties=%v", taskType, request.GetProperties())
log.Ctx(ctx).Warn("CreateTask failed", zap.Error(err))
@ -729,6 +737,31 @@ func (node *DataNode) QueryTask(ctx context.Context, request *workerpb.QueryTask
resProperties.AppendReason(results[0].GetFailReason())
}
return wrapQueryTaskResult(resp, resProperties)
case taskcommon.ExternalCollection:
// Query task state from external collection manager
info := node.externalCollectionManager.Get(clusterID, taskID)
if info == nil {
resp := &datapb.UpdateExternalCollectionResponse{
Status: merr.Success(),
State: indexpb.JobState_JobStateFailed,
FailReason: "task result not found",
}
resProperties := taskcommon.NewProperties(nil)
resProperties.AppendTaskState(taskcommon.Failed)
resProperties.AppendReason("task result not found")
return wrapQueryTaskResult(resp, resProperties)
}
resp := &datapb.UpdateExternalCollectionResponse{
Status: merr.Success(),
State: info.State,
FailReason: info.FailReason,
KeptSegments: info.KeptSegments,
UpdatedSegments: info.UpdatedSegments,
}
resProperties := taskcommon.NewProperties(nil)
resProperties.AppendTaskState(info.State)
resProperties.AppendReason(info.FailReason)
return wrapQueryTaskResult(resp, resProperties)
default:
err := fmt.Errorf("unrecognized task type '%s', properties=%v", taskType, request.GetProperties())
log.Ctx(ctx).Warn("QueryTask failed", zap.Error(err))
@ -772,6 +805,21 @@ func (node *DataNode) DropTask(ctx context.Context, request *workerpb.DropTaskRe
TaskIDs: []int64{taskID},
JobType: jobType,
})
case taskcommon.ExternalCollection:
// Drop external collection task from external collection manager
clusterID, err := properties.GetClusterID()
if err != nil {
return merr.Status(err), nil
}
cancelled := node.externalCollectionManager.CancelTask(clusterID, taskID)
info := node.externalCollectionManager.Delete(clusterID, taskID)
if !cancelled && info != nil && info.Cancel != nil {
info.Cancel()
}
log.Ctx(ctx).Info("DropTask for external collection completed",
zap.Int64("taskID", taskID),
zap.String("clusterID", clusterID))
return merr.Success(), nil
default:
err := fmt.Errorf("unrecognized task type '%s', properties=%v", taskType, request.GetProperties())
log.Ctx(ctx).Warn("DropTask failed", zap.Error(err))
@ -794,3 +842,77 @@ func (node *DataNode) SyncFileResource(ctx context.Context, req *internalpb.Sync
}
return merr.Success(), nil
}
// createExternalCollectionTask handles updating external collection segments
// This submits the task to the external collection manager for async execution
func (node *DataNode) createExternalCollectionTask(ctx context.Context, req *datapb.UpdateExternalCollectionRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.Int64("taskID", req.GetTaskID()),
zap.Int64("collectionID", req.GetCollectionID()),
)
log.Info("createExternalCollectionTask received",
zap.Int("currentSegments", len(req.GetCurrentSegments())),
zap.String("externalSource", req.GetExternalSource()))
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil
}
clusterID := paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
// Submit task to external collection manager
// The task will execute asynchronously in the manager's goroutine pool
err := node.externalCollectionManager.SubmitTask(clusterID, req, func(taskCtx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
// Execute the task
task := external.NewUpdateExternalTask(taskCtx, func() {}, req)
if err := task.PreExecute(taskCtx); err != nil {
log.Warn("external collection task PreExecute failed", zap.Error(err))
return nil, err
}
if err := task.Execute(taskCtx); err != nil {
log.Warn("external collection task Execute failed", zap.Error(err))
return nil, err
}
if err := task.PostExecute(taskCtx); err != nil {
log.Warn("external collection task PostExecute failed", zap.Error(err))
return nil, err
}
log.Info("external collection task completed successfully",
zap.Int("updatedSegments", len(task.GetUpdatedSegments())))
resp := &datapb.UpdateExternalCollectionResponse{
Status: merr.Success(),
State: indexpb.JobState_JobStateFinished,
KeptSegments: extractSegmentIDs(req.GetCurrentSegments()),
UpdatedSegments: task.GetUpdatedSegments(),
}
return resp, nil
})
if err != nil {
log.Warn("failed to submit external collection task", zap.Error(err))
return merr.Status(err), nil
}
log.Info("external collection task submitted to manager")
return merr.Success(), nil
}
func extractSegmentIDs(segments []*datapb.SegmentInfo) []int64 {
if len(segments) == 0 {
return nil
}
result := make([]int64, 0, len(segments))
for _, seg := range segments {
if seg == nil {
continue
}
result = append(result, seg.GetID())
}
return result
}

View File

@ -1148,3 +1148,31 @@ message FileResourceInfo {
message CreateExternalCollectionResponse {
common.Status status = 1;
}
message UpdateExternalCollectionRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
int64 taskID = 3;
repeated SegmentInfo currentSegments = 4;
string externalSource = 5;
string externalSpec = 6;
}
message UpdateExternalCollectionResponse {
common.Status status = 1;
repeated int64 keptSegments = 2;
repeated SegmentInfo updatedSegments = 3;
index.JobState state = 4;
string fail_reason = 5;
}
message QueryExternalCollectionRequest {
int64 taskID = 1;
}
message DropExternalCollectionRequest {
int64 taskID = 1;
}
// Note: QueryExternalCollectionRequest and DropExternalCollectionRequest are kept
// for internal use by the unified task system, but are not exposed as separate RPCs

File diff suppressed because it is too large

View File

@ -19,13 +19,14 @@ package taskcommon
type Type = string
const (
TypeNone Type = "None"
PreImport Type = "PreImport"
Import Type = "Import"
Compaction Type = "Compaction"
Index Type = "Index"
Stats Type = "Stats"
Analyze Type = "Analyze"
TypeNone Type = "None"
PreImport Type = "PreImport"
Import Type = "Import"
Compaction Type = "Compaction"
Index Type = "Index"
Stats Type = "Stats"
Analyze Type = "Analyze"
ExternalCollection Type = "ExternalCollection"
)
var TypeList = []Type{
@ -35,4 +36,5 @@ var TypeList = []Type{
Index,
Stats,
Analyze,
ExternalCollection,
}