milvus/internal/datacoord/copy_segment_task_test.go
wei liu 975c91df16
feat: Add comprehensive snapshot functionality for collections (#44361)
issue: #44358

Implement complete snapshot management system including creation,
deletion, listing, description, and restoration capabilities across all
system components.

Key features:
- Create snapshots for entire collections
- Drop snapshots by name with proper cleanup
- List snapshots with collection filtering
- Describe snapshot details and metadata

Components added/modified:
- Client SDK with full snapshot API support and options
- DataCoord snapshot service with metadata management
- Proxy layer with task-based snapshot operations
- Protocol buffer definitions for snapshot RPCs
- Comprehensive unit tests with mockey framework
- Integration tests for end-to-end validation

Technical implementation:
- Snapshot metadata storage in etcd with proper indexing
- File-based snapshot data persistence in object storage
- Garbage collection integration for snapshot cleanup
- Error handling and validation across all operations
- Thread-safe operations with proper locking mechanisms

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant/assumption: snapshots are immutable point‑in‑time
captures identified by (collection, snapshot name/ID); etcd snapshot
metadata is authoritative for lifecycle (PENDING → COMMITTED → DELETING)
and per‑segment manifests live in object storage (Avro / StorageV2). GC
and restore logic must see snapshotRefIndex loaded
(snapshotMeta.IsRefIndexLoaded) before reclaiming or relying on
segment/index files.

- New capability added: full end‑to‑end snapshot subsystem — client SDK
APIs (Create/Drop/List/Describe/Restore + restore job queries),
DataCoord SnapshotWriter/Reader (Avro + StorageV2 manifests),
snapshotMeta in meta, SnapshotManager orchestration
(create/drop/describe/list/restore), copy‑segment restore
tasks/inspector/checker, proxy & RPC surface, GC integration, and
docs/tests — enabling point‑in‑time collection snapshots persisted to
object storage and restorations orchestrated across components.

- Logic removed/simplified and why: duplicated recursive
compaction/delta‑log traversal and ad‑hoc lookup code were consolidated
behind two focused APIs/owners (Handler.GetDeltaLogFromCompactTo for
delta traversal and SnapshotManager/SnapshotReader for snapshot I/O).
MixCoord/coordinator broker paths were converted to thin RPC proxies.
This eliminates multiple implementations of the same traversal/lookup,
reducing divergence and simplifying responsibility boundaries.

- Why this does NOT introduce data loss or regressions: snapshot
create/drop use explicit two‑phase semantics (PENDING → COMMIT/DELETING)
with SnapshotWriter writing manifests and metadata before commit; GC
uses snapshotRefIndex guards and
IsRefIndexLoaded/GetSnapshotBySegment/GetSnapshotByIndex checks to avoid
removing referenced files; restore flow pre‑allocates job IDs, validates
resources (partitions/indexes), performs rollback on failure
(rollbackRestoreSnapshot), and converts/updates segment/index metadata
only after successful copy tasks. Extensive unit and integration tests
exercise pending/deleting/GC/restore/error paths to ensure idempotence
and protection against premature deletion.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2026-01-06 10:15:24 +08:00

454 lines
12 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"testing"
"time"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
type CopySegmentTaskSuite struct {
suite.Suite
}
func TestCopySegmentTask(t *testing.T) {
suite.Run(t, new(CopySegmentTaskSuite))
}
func (s *CopySegmentTaskSuite) TestCopySegmentTask_GettersAndSetters() {
idMappings := []*datapb.CopySegmentIDMapping{
{SourceSegmentId: 1, TargetSegmentId: 101, PartitionId: 10},
{SourceSegmentId: 2, TargetSegmentId: 102, PartitionId: 10},
}
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
JobId: 100,
CollectionId: 2,
NodeId: 5,
TaskVersion: 1,
TaskSlot: 1,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
Reason: "test reason",
IdMappings: idMappings,
CreatedTs: 12345,
CompleteTs: 54321,
})
// Test all getters
s.Equal(int64(1001), task.GetTaskId())
s.Equal(int64(100), task.GetJobId())
s.Equal(int64(2), task.GetCollectionId())
s.Equal(int64(5), task.GetNodeId())
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskPending, task.GetState())
s.Equal("test reason", task.GetReason())
s.Equal(idMappings, task.GetIdMappings())
s.NotNil(task.GetTR())
// Test task.Task interface methods
s.Equal(int64(1001), task.GetTaskID())
s.Equal(taskcommon.CopySegment, task.GetTaskType())
s.Equal(taskcommon.FromCopySegmentState(datapb.CopySegmentTaskState_CopySegmentTaskPending), task.GetTaskState())
s.Equal(int64(1), task.GetTaskSlot())
s.Equal(int64(1), task.GetTaskVersion())
}
func (s *CopySegmentTaskSuite) TestCopySegmentTask_Clone() {
original := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
original.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
JobId: 100,
CollectionId: 2,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
Reason: "original reason",
})
// Clone the task
cloned := original.Clone()
// Verify cloned task has same values
s.Equal(original.GetTaskId(), cloned.GetTaskId())
s.Equal(original.GetJobId(), cloned.GetJobId())
s.Equal(original.GetCollectionId(), cloned.GetCollectionId())
s.Equal(original.GetState(), cloned.GetState())
s.Equal(original.GetReason(), cloned.GetReason())
// Verify time recorder is same reference
s.Equal(original.GetTR(), cloned.GetTR())
// Verify they share the same task pointer (not deep copy)
clonedImpl := cloned.(*copySegmentTask)
s.Equal(original.task.Load(), clonedImpl.task.Load())
}
func (s *CopySegmentTaskSuite) TestWithCopyTaskJob() {
task1 := &copySegmentTask{
tr: timerecord.NewTimeRecorder("task1"),
times: taskcommon.NewTimes(),
}
task1.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
JobId: 100,
})
task2 := &copySegmentTask{
tr: timerecord.NewTimeRecorder("task2"),
times: taskcommon.NewTimes(),
}
task2.task.Store(&datapb.CopySegmentTask{
TaskId: 1002,
JobId: 200,
})
filter := WithCopyTaskJob(100)
s.True(filter(task1))
s.False(filter(task2))
}
func (s *CopySegmentTaskSuite) TestWithCopyTaskStates() {
pendingTask := &copySegmentTask{
tr: timerecord.NewTimeRecorder("pending"),
times: taskcommon.NewTimes(),
}
pendingTask.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
})
inProgressTask := &copySegmentTask{
tr: timerecord.NewTimeRecorder("inprogress"),
times: taskcommon.NewTimes(),
}
inProgressTask.task.Store(&datapb.CopySegmentTask{
TaskId: 1002,
State: datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
})
completedTask := &copySegmentTask{
tr: timerecord.NewTimeRecorder("completed"),
times: taskcommon.NewTimes(),
}
completedTask.task.Store(&datapb.CopySegmentTask{
TaskId: 1003,
State: datapb.CopySegmentTaskState_CopySegmentTaskCompleted,
})
// Test filter with single state
filter1 := WithCopyTaskStates(datapb.CopySegmentTaskState_CopySegmentTaskPending)
s.True(filter1(pendingTask))
s.False(filter1(inProgressTask))
s.False(filter1(completedTask))
// Test filter with multiple states
filter2 := WithCopyTaskStates(
datapb.CopySegmentTaskState_CopySegmentTaskPending,
datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
)
s.True(filter2(pendingTask))
s.True(filter2(inProgressTask))
s.False(filter2(completedTask))
}
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskState() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
})
// Apply update action
action := UpdateCopyTaskState(datapb.CopySegmentTaskState_CopySegmentTaskInProgress)
action(task)
// Verify state is updated
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskInProgress, task.GetState())
}
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskReason() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
Reason: "",
})
// Apply update action
action := UpdateCopyTaskReason("task failed")
action(task)
// Verify reason is updated
s.Equal("task failed", task.GetReason())
}
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskNodeID() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
NodeId: 0,
})
// Apply update action
action := UpdateCopyTaskNodeID(5)
action(task)
// Verify node ID is updated
s.Equal(int64(5), task.GetNodeId())
}
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskCompleteTs() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
CompleteTs: 0,
})
now := uint64(time.Now().UnixNano())
// Apply update action
action := UpdateCopyTaskCompleteTs(now)
action(task)
// Verify complete timestamp is updated
s.Equal(now, task.task.Load().GetCompleteTs())
}
func (s *CopySegmentTaskSuite) TestMultipleUpdateActions() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
NodeId: 0,
Reason: "",
CompleteTs: 0,
})
// Apply multiple update actions
actions := []UpdateCopySegmentTaskAction{
UpdateCopyTaskState(datapb.CopySegmentTaskState_CopySegmentTaskCompleted),
UpdateCopyTaskNodeID(5),
UpdateCopyTaskReason("completed successfully"),
UpdateCopyTaskCompleteTs(12345),
}
for _, action := range actions {
action(task)
}
// Verify all updates are applied
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskCompleted, task.GetState())
s.Equal(int64(5), task.GetNodeId())
s.Equal("completed successfully", task.GetReason())
s.Equal(uint64(12345), task.task.Load().GetCompleteTs())
}
func (s *CopySegmentTaskSuite) TestWrapCopySegmentTaskLog() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
JobId: 100,
CollectionId: 2,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
})
// Test without extra fields
fields := WrapCopySegmentTaskLog(task)
s.Len(fields, 4)
// Verify field names
fieldNames := make(map[string]bool)
for _, field := range fields {
fieldNames[field.Key] = true
}
s.True(fieldNames["taskID"])
s.True(fieldNames["jobID"])
s.True(fieldNames["collectionID"])
s.True(fieldNames["state"])
// Test with extra fields
extraFields := []zap.Field{
zap.String("extra", "value"),
zap.Int64("number", 123),
}
fieldsWithExtra := WrapCopySegmentTaskLog(task, extraFields...)
s.Len(fieldsWithExtra, 6)
}
func (s *CopySegmentTaskSuite) TestCombinedFilters() {
tasks := []CopySegmentTask{
&copySegmentTask{
tr: timerecord.NewTimeRecorder("task1"),
times: taskcommon.NewTimes(),
},
&copySegmentTask{
tr: timerecord.NewTimeRecorder("task2"),
times: taskcommon.NewTimes(),
},
&copySegmentTask{
tr: timerecord.NewTimeRecorder("task3"),
times: taskcommon.NewTimes(),
},
&copySegmentTask{
tr: timerecord.NewTimeRecorder("task4"),
times: taskcommon.NewTimes(),
},
}
tasks[0].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
JobId: 100,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
})
tasks[1].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
TaskId: 1002,
JobId: 100,
State: datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
})
tasks[2].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
TaskId: 1003,
JobId: 200,
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
})
tasks[3].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
TaskId: 1004,
JobId: 100,
State: datapb.CopySegmentTaskState_CopySegmentTaskCompleted,
})
// Filter: jobID = 100 AND state = Pending or InProgress
jobFilter := WithCopyTaskJob(100)
stateFilter := WithCopyTaskStates(
datapb.CopySegmentTaskState_CopySegmentTaskPending,
datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
)
var filtered []CopySegmentTask
for _, task := range tasks {
if jobFilter(task) && stateFilter(task) {
filtered = append(filtered, task)
}
}
// Should match tasks 1001 and 1002
s.Len(filtered, 2)
s.Equal(int64(1001), filtered[0].GetTaskId())
s.Equal(int64(1002), filtered[1].GetTaskId())
}
func (s *CopySegmentTaskSuite) TestTaskTimeOperations() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
})
// Set and get task time
now := time.Now()
task.SetTaskTime(taskcommon.TimeStart, now)
retrievedTime := task.GetTaskTime(taskcommon.TimeStart)
s.True(retrievedTime.Equal(now))
}
func (s *CopySegmentTaskSuite) TestTaskWithEmptyIdMappings() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
IdMappings: nil,
})
// Should return nil, not panic
mappings := task.GetIdMappings()
s.Nil(mappings)
}
func (s *CopySegmentTaskSuite) TestStateConversion() {
testCases := []struct {
copySegmentState datapb.CopySegmentTaskState
expectedState taskcommon.State
}{
{datapb.CopySegmentTaskState_CopySegmentTaskPending, taskcommon.Init},
{datapb.CopySegmentTaskState_CopySegmentTaskInProgress, taskcommon.InProgress},
{datapb.CopySegmentTaskState_CopySegmentTaskCompleted, taskcommon.Finished},
{datapb.CopySegmentTaskState_CopySegmentTaskFailed, taskcommon.Failed},
}
for _, tc := range testCases {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
State: tc.copySegmentState,
})
s.Equal(tc.expectedState, task.GetTaskState(),
"State conversion failed for %v", tc.copySegmentState)
}
}
func (s *CopySegmentTaskSuite) TestTaskType() {
task := &copySegmentTask{
tr: timerecord.NewTimeRecorder("test task"),
times: taskcommon.NewTimes(),
}
task.task.Store(&datapb.CopySegmentTask{
TaskId: 1001,
})
s.Equal(taskcommon.CopySegment, task.GetTaskType())
}