milvus/internal/datanode/external/manager_test.go
Bingyi Sun f9827392bb
enhance: implement external collection update task with source change detection (#45905)
issue: #45881 
Add persistent task management for external collections with automatic
detection of external_source and external_spec changes. When source
changes, the system aborts running tasks and creates new ones, ensuring
only one active task per collection. Tasks validate their source on
completion to prevent superseded tasks from committing results.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: at most one active UpdateExternalCollection task
exists per collection — tasks are serialized by collectionID
(collection-level locking) and any change to external_source or
external_spec aborts superseded tasks and causes a new task creation
(externalCollectionManager + external_collection_task_meta
collection-based locks enforce this).
- What was simplified/removed: per-task fine-grained locking and
concurrent multi-task acceptance per collection were replaced by
collection-level synchronization (external_collection_task_meta.go) and
a single persistent task lifecycle in DataCoord/Index task code;
redundant double-concurrent update paths were removed by checking
existing task presence in AddTask/LoadOrStore and aborting/overwriting
via Drop/Cancel flows.
- Why this does NOT cause data loss or regress behavior: task state
transitions and commit are validated against the current external
source/spec before applying changes — UpdateStateWithMeta and SetJobInfo
verify task metadata and persist via catalog only under matching
collection-state; DataNode externalCollectionManager persists task
results to in-memory manager and exposes Query/Drop flows (services.go)
without modifying existing segment data unless a task successfully
finishes and SetJobInfo atomically updates segments via meta/catalog
calls, preventing superseded tasks from committing stale results.
- New capability added: end-to-end external collection update workflow —
DataCoord Index task + Cluster RPC helpers + DataNode external task
runner and ExternalCollectionManager enable creating, querying,
cancelling, and applying external collection updates
(fragment-to-segment balancing, kept/updated segment handling, allocator
integration); accompanying unit tests cover success, failure,
cancellation, allocator errors, and balancing logic.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
2025-12-29 19:53:21 +08:00

440 lines
12 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
)
// TestExternalCollectionManager_Basic walks a single task entry through the
// manager's bookkeeping API: LoadOrStore, Get, UpdateState and Delete.
//
// Every pointer returned by the manager is guarded with require.NotNil before
// it is dereferenced, so a regression shows up as a regular test failure
// instead of a nil-pointer panic.
func TestExternalCollectionManager_Basic(t *testing.T) {
	ctx := context.Background()
	manager := NewExternalCollectionManager(ctx, 4)
	defer manager.Close()

	clusterID := "test-cluster"
	taskID := int64(1)
	collID := int64(100)

	// Test LoadOrStore - the first call for a key is expected to store the
	// entry and report that nothing was there before.
	info := &TaskInfo{
		Cancel:     func() {},
		State:      indexpb.JobState_JobStateInProgress,
		FailReason: "",
		CollID:     collID,
	}
	oldInfo := manager.LoadOrStore(clusterID, taskID, info)
	assert.Nil(t, oldInfo)

	// Test Get - the stored entry must be readable back.
	retrievedInfo := manager.Get(clusterID, taskID)
	require.NotNil(t, retrievedInfo)
	assert.Equal(t, indexpb.JobState_JobStateInProgress, retrievedInfo.State)
	assert.Equal(t, collID, retrievedInfo.CollID)

	// Test LoadOrStore - a second call with the same key is expected to return
	// the existing entry and leave it untouched.
	newInfo := &TaskInfo{
		Cancel:     func() {},
		State:      indexpb.JobState_JobStateFinished,
		FailReason: "",
		CollID:     collID,
	}
	oldInfo = manager.LoadOrStore(clusterID, taskID, newInfo)
	require.NotNil(t, oldInfo)
	assert.Equal(t, indexpb.JobState_JobStateInProgress, oldInfo.State) // should still be old state

	// Test UpdateState - the new state must be visible through Get.
	manager.UpdateState(clusterID, taskID, indexpb.JobState_JobStateFinished, "")
	retrievedInfo = manager.Get(clusterID, taskID)
	require.NotNil(t, retrievedInfo)
	assert.Equal(t, indexpb.JobState_JobStateFinished, retrievedInfo.State)

	// Test Delete - the removed entry is returned with its latest state.
	deletedInfo := manager.Delete(clusterID, taskID)
	require.NotNil(t, deletedInfo)
	assert.Equal(t, indexpb.JobState_JobStateFinished, deletedInfo.State)

	// Verify task is deleted
	retrievedInfo = manager.Get(clusterID, taskID)
	assert.Nil(t, retrievedInfo)
}
// TestExternalCollectionManager_SubmitTask_Success submits a task whose
// function succeeds and checks that the task ran, that its entry reaches the
// Finished state, and that the kept segment IDs from the response are
// available on the entry afterwards.
func TestExternalCollectionManager_SubmitTask_Success(t *testing.T) {
	const (
		cluster = "test-cluster"
		id      = int64(2)
		coll    = int64(200)
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	// ran flips to true as soon as the task function is invoked.
	var ran atomic.Bool
	run := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		ran.Store(true)
		resp := &datapb.UpdateExternalCollectionResponse{
			State:        indexpb.JobState_JobStateFinished,
			KeptSegments: []int64{1, 2},
		}
		return resp, nil
	}

	submitErr := mgr.SubmitTask(cluster, &datapb.UpdateExternalCollectionRequest{
		TaskID:       id,
		CollectionID: coll,
	}, run)
	assert.NoError(t, submitErr)

	// Wait for the function to be invoked, then for the entry to be marked
	// Finished; the test polls for both.
	require.Eventually(t, ran.Load, time.Second, 10*time.Millisecond)

	reachedFinished := func() bool {
		got := mgr.Get(cluster, id)
		if got == nil {
			return false
		}
		return got.State == indexpb.JobState_JobStateFinished
	}
	require.Eventually(t, reachedFinished, time.Second, 10*time.Millisecond)

	assert.True(t, ran.Load())

	// The entry is expected to stay queryable after completion (nothing in
	// this test drops it).
	got := mgr.Get(cluster, id)
	assert.NotNil(t, got)
	assert.Equal(t, indexpb.JobState_JobStateFinished, got.State)
	assert.Equal(t, []int64{1, 2}, got.KeptSegments)
}
// TestExternalCollectionManager_SubmitTask_Failure submits a task whose
// function returns an error and checks that submission itself succeeds while
// the entry ends up Failed with the error text as its fail reason.
func TestExternalCollectionManager_SubmitTask_Failure(t *testing.T) {
	const (
		cluster = "test-cluster"
		id      = int64(3)
		coll    = int64(300)
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	// The task function does nothing but fail with this error.
	wantErr := errors.New("task execution failed")
	alwaysFail := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		return nil, wantErr
	}

	// Submitting is expected to succeed; the failure only surfaces in the
	// task's recorded state.
	submitErr := mgr.SubmitTask(cluster, &datapb.UpdateExternalCollectionRequest{
		TaskID:       id,
		CollectionID: coll,
	}, alwaysFail)
	assert.NoError(t, submitErr)

	reachedFailed := func() bool {
		got := mgr.Get(cluster, id)
		if got == nil {
			return false
		}
		return got.State == indexpb.JobState_JobStateFailed
	}
	require.Eventually(t, reachedFailed, time.Second, 10*time.Millisecond)

	// The failed entry must still be present and carry the error message.
	got := mgr.Get(cluster, id)
	assert.NotNil(t, got)
	assert.Equal(t, indexpb.JobState_JobStateFailed, got.State)
	assert.Equal(t, wantErr.Error(), got.FailReason)
}
// TestExternalCollectionManager_CancelTask submits a task that blocks until its
// context is cancelled, cancels it through the manager, and checks that the
// task observed the cancellation and that its entry ends up Failed with a
// "context canceled" reason.
func TestExternalCollectionManager_CancelTask(t *testing.T) {
	ctx := context.Background()
	manager := NewExternalCollectionManager(ctx, 4)
	defer manager.Close()

	clusterID := "test-cluster"
	taskID := int64(30)
	collID := int64(3000)
	req := &datapb.UpdateExternalCollectionRequest{
		TaskID:       taskID,
		CollectionID: collID,
	}

	// cancelObserved is closed by the task function when it sees ctx.Done().
	cancelObserved := make(chan struct{})
	taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		select {
		case <-ctx.Done():
			close(cancelObserved)
			return nil, ctx.Err()
		case <-time.After(time.Second):
			// Fallback so the task does not block forever if it is never cancelled.
			return &datapb.UpdateExternalCollectionResponse{
				State: indexpb.JobState_JobStateFinished,
			}, nil
		}
	}

	err := manager.SubmitTask(clusterID, req, taskFunc)
	assert.NoError(t, err)

	require.Eventually(t, func() bool {
		// ensure task has been registered
		info := manager.Get(clusterID, taskID)
		return info != nil
	}, time.Second, 10*time.Millisecond)

	cancelled := manager.CancelTask(clusterID, taskID)
	assert.True(t, cancelled)

	require.Eventually(t, func() bool {
		select {
		case <-cancelObserved:
			return true
		default:
			return false
		}
	}, time.Second, 10*time.Millisecond)

	// cancelObserved is closed before taskFunc returns, so the failure state
	// may not have been recorded yet at the moment the signal is seen. Poll for
	// it instead of reading the entry once.
	require.Eventually(t, func() bool {
		info := manager.Get(clusterID, taskID)
		return info != nil && info.State == indexpb.JobState_JobStateFailed
	}, time.Second, 10*time.Millisecond)

	info := manager.Get(clusterID, taskID)
	require.NotNil(t, info)
	assert.Equal(t, indexpb.JobState_JobStateFailed, info.State)
	assert.Contains(t, info.FailReason, "context canceled")
}
// TestCloneSegmentIDs checks that cloneSegmentIDs returns a slice equal to its
// input and that writing to the returned slice leaves the input unchanged.
func TestCloneSegmentIDs(t *testing.T) {
	original := []int64{1, 2, 3}

	cloned := cloneSegmentIDs(original)
	assert.Equal(t, original, cloned)

	// Write through the clone; the first elements must now differ.
	cloned[0] = 42
	assert.NotEqual(t, original[0], cloned[0], "modifying clone should not affect source")
}
// TestExtractSegmentIDs checks that extractSegmentIDs yields nil for nil input
// and, for a slice containing a nil entry, returns only the IDs of the non-nil
// segments in their original order.
func TestExtractSegmentIDs(t *testing.T) {
	assert.Nil(t, extractSegmentIDs(nil))

	// The leading nil entry is expected to be skipped.
	input := []*datapb.SegmentInfo{nil, {ID: 1}, {ID: 2}}
	want := []int64{1, 2}
	assert.Equal(t, want, extractSegmentIDs(input))
}
// TestCancelTaskMultipleTimes registers a task entry directly and cancels it
// twice, expecting CancelTask to report true and to invoke the stored Cancel
// function on each call.
func TestCancelTaskMultipleTimes(t *testing.T) {
	ctx := context.Background()
	manager := NewExternalCollectionManager(ctx, 1)
	defer manager.Close()

	// Typed atomic counter: writes and the final read both go through the
	// atomic API, matching the atomic.Bool usage elsewhere in this file.
	var calls atomic.Int32
	cancelFn := func() {
		calls.Add(1)
	}

	clusterID := "cluster"
	taskID := int64(999)
	manager.LoadOrStore(clusterID, taskID, &TaskInfo{
		Cancel: cancelFn,
	})

	require.True(t, manager.CancelTask(clusterID, taskID))
	require.True(t, manager.CancelTask(clusterID, taskID))
	assert.Equal(t, int32(2), calls.Load())
}
// TestExternalCollectionManager_SubmitTask_Duplicate submits a task that blocks,
// then submits the same request again and expects the second submission to be
// rejected with a "task already exists" error while the first is still in
// progress. The first task is then released and expected to finish.
func TestExternalCollectionManager_SubmitTask_Duplicate(t *testing.T) {
	ctx := context.Background()
	manager := NewExternalCollectionManager(ctx, 4)
	defer manager.Close()

	clusterID := "test-cluster"
	taskID := int64(4)
	collID := int64(400)
	req := &datapb.UpdateExternalCollectionRequest{
		TaskID:       taskID,
		CollectionID: collID,
	}

	// Task function that blocks until blockChan is closed.
	blockChan := make(chan struct{})
	released := false
	// release closes blockChan at most once. It is only ever called from the
	// test goroutine, so the plain bool needs no synchronization.
	release := func() {
		if !released {
			released = true
			close(blockChan)
		}
	}
	// Deferred calls run last-in-first-out, so this runs before manager.Close
	// even when a require assertion aborts the test early. Whether Close waits
	// for running tasks is not visible here, so unblocking first is the safe
	// order.
	defer release()

	taskFunc := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		<-blockChan
		return &datapb.UpdateExternalCollectionResponse{
			State: indexpb.JobState_JobStateFinished,
		}, nil
	}

	// Submit first task
	err := manager.SubmitTask(clusterID, req, taskFunc)
	require.NoError(t, err)

	// Verify task is in progress. require guards the dereference below.
	info := manager.Get(clusterID, taskID)
	require.NotNil(t, info)
	assert.Equal(t, indexpb.JobState_JobStateInProgress, info.State)

	// Try to submit duplicate task. require guards the err.Error() call below.
	err = manager.SubmitTask(clusterID, req, taskFunc)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "task already exists")

	// Unblock the task and wait for it to be recorded as finished.
	release()
	require.Eventually(t, func() bool {
		info := manager.Get(clusterID, taskID)
		return info != nil && info.State == indexpb.JobState_JobStateFinished
	}, time.Second, 10*time.Millisecond)
}
// TestExternalCollectionManager_MultipleTasksConcurrent submits ten independent
// tasks (distinct task and collection IDs) to a manager created with a size
// argument of 4 and checks that every one of them reaches the Finished state
// and stays queryable. Submission itself happens sequentially from the test
// goroutine.
func TestExternalCollectionManager_MultipleTasksConcurrent(t *testing.T) {
	const (
		cluster = "test-cluster"
		total   = 10
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	// Every task runs the same trivially successful function.
	succeed := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		resp := &datapb.UpdateExternalCollectionResponse{
			State: indexpb.JobState_JobStateFinished,
		}
		return resp, nil
	}

	// Task IDs are 100..109 and collection IDs 1000..1009.
	for idx := 0; idx < total; idx++ {
		submitErr := mgr.SubmitTask(cluster, &datapb.UpdateExternalCollectionRequest{
			TaskID:       int64(idx + 100),
			CollectionID: int64(idx + 1000),
		}, succeed)
		assert.NoError(t, submitErr)
	}

	// Poll until no task is missing or still unfinished.
	allFinished := func() bool {
		for idx := 0; idx < total; idx++ {
			got := mgr.Get(cluster, int64(idx+100))
			if got == nil {
				return false
			}
			if got.State != indexpb.JobState_JobStateFinished {
				return false
			}
		}
		return true
	}
	require.Eventually(t, allFinished, time.Second, 10*time.Millisecond)

	// Finished entries are expected to remain available; nothing here drops them.
	for idx := 0; idx < total; idx++ {
		got := mgr.Get(cluster, int64(idx+100))
		assert.NotNil(t, got)
		assert.Equal(t, indexpb.JobState_JobStateFinished, got.State)
	}
}
// TestExternalCollectionManager_Close closes the manager while a submitted task
// is still blocked inside its function, then releases the task and expects it
// to run to completion.
func TestExternalCollectionManager_Close(t *testing.T) {
	const (
		cluster = "test-cluster"
		id      = int64(5)
		coll    = int64(500)
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)

	var completed atomic.Bool
	running := make(chan struct{}) // closed when the task function is entered
	release := make(chan struct{}) // closed by the test to let the task proceed

	run := func(ctx context.Context) (*datapb.UpdateExternalCollectionResponse, error) {
		close(running)
		// Block until released; bail out with the context error if the task
		// context is cancelled first.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-release:
		}
		completed.Store(true)
		resp := &datapb.UpdateExternalCollectionResponse{
			State: indexpb.JobState_JobStateFinished,
		}
		return resp, nil
	}

	submitErr := mgr.SubmitTask(cluster, &datapb.UpdateExternalCollectionRequest{
		TaskID:       id,
		CollectionID: coll,
	}, run)
	assert.NoError(t, submitErr)

	// Wait until the task function has actually been entered.
	hasStarted := func() bool {
		select {
		case <-running:
			return true
		default:
			return false
		}
	}
	require.Eventually(t, hasStarted, time.Second, 10*time.Millisecond)

	// Close the manager while the task is still blocked, and only then let
	// the task continue.
	mgr.Close()
	close(release)

	require.Eventually(t, completed.Load, time.Second, 10*time.Millisecond)

	// The task started before Close and is expected to have completed after it.
	assert.True(t, completed.Load())
}
// TestExternalCollectionManager_UpdateStateNonExistent calls UpdateState for a
// task that was never registered and checks that the call neither panics nor
// creates an entry.
func TestExternalCollectionManager_UpdateStateNonExistent(t *testing.T) {
	const (
		cluster = "test-cluster"
		id      = int64(999)
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	// No entry exists for this key; the update is expected to be a no-op.
	mgr.UpdateState(cluster, id, indexpb.JobState_JobStateFinished, "")

	// The key must still be absent afterwards.
	assert.Nil(t, mgr.Get(cluster, id))
}
// TestExternalCollectionManager_DeleteNonExistent checks that deleting a task
// that was never registered returns nil.
func TestExternalCollectionManager_DeleteNonExistent(t *testing.T) {
	const (
		cluster = "test-cluster"
		id      = int64(888)
	)

	mgr := NewExternalCollectionManager(context.Background(), 4)
	defer mgr.Close()

	// Nothing was stored under this key, so there is nothing to hand back.
	removed := mgr.Delete(cluster, id)
	assert.Nil(t, removed)
}