mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
issue: #44358 Implement complete snapshot management system including creation, deletion, listing, description, and restoration capabilities across all system components. Key features: - Create snapshots for entire collections - Drop snapshots by name with proper cleanup - List snapshots with collection filtering - Describe snapshot details and metadata Components added/modified: - Client SDK with full snapshot API support and options - DataCoord snapshot service with metadata management - Proxy layer with task-based snapshot operations - Protocol buffer definitions for snapshot RPCs - Comprehensive unit tests with mockey framework - Integration tests for end-to-end validation Technical implementation: - Snapshot metadata storage in etcd with proper indexing - File-based snapshot data persistence in object storage - Garbage collection integration for snapshot cleanup - Error handling and validation across all operations - Thread-safe operations with proper locking mechanisms <!-- This is an auto-generated comment: release notes by coderabbit.ai --> - Core invariant/assumption: snapshots are immutable point‑in‑time captures identified by (collection, snapshot name/ID); etcd snapshot metadata is authoritative for lifecycle (PENDING → COMMITTED → DELETING) and per‑segment manifests live in object storage (Avro / StorageV2). GC and restore logic must see snapshotRefIndex loaded (snapshotMeta.IsRefIndexLoaded) before reclaiming or relying on segment/index files. - New capability added: full end‑to‑end snapshot subsystem — client SDK APIs (Create/Drop/List/Describe/Restore + restore job queries), DataCoord SnapshotWriter/Reader (Avro + StorageV2 manifests), snapshotMeta in meta, SnapshotManager orchestration (create/drop/describe/list/restore), copy‑segment restore tasks/inspector/checker, proxy & RPC surface, GC integration, and docs/tests — enabling point‑in‑time collection snapshots persisted to object storage and restorations orchestrated across components. - Logic removed/simplified and why: duplicated recursive compaction/delta‑log traversal and ad‑hoc lookup code were consolidated behind two focused APIs/owners (Handler.GetDeltaLogFromCompactTo for delta traversal and SnapshotManager/SnapshotReader for snapshot I/O). MixCoord/coordinator broker paths were converted to thin RPC proxies. This eliminates multiple implementations of the same traversal/lookup, reducing divergence and simplifying responsibility boundaries. - Why this does NOT introduce data loss or regressions: snapshot create/drop use explicit two‑phase semantics (PENDING → COMMIT/DELETING) with SnapshotWriter writing manifests and metadata before commit; GC uses snapshotRefIndex guards and IsRefIndexLoaded/GetSnapshotBySegment/GetSnapshotByIndex checks to avoid removing referenced files; restore flow pre‑allocates job IDs, validates resources (partitions/indexes), performs rollback on failure (rollbackRestoreSnapshot), and converts/updates segment/index metadata only after successful copy tasks. Extensive unit and integration tests exercise pending/deleting/GC/restore/error paths to ensure idempotence and protection against premature deletion. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>
454 lines
12 KiB
Go
454 lines
12 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
|
|
)
|
|
|
|
type CopySegmentTaskSuite struct {
|
|
suite.Suite
|
|
}
|
|
|
|
func TestCopySegmentTask(t *testing.T) {
|
|
suite.Run(t, new(CopySegmentTaskSuite))
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestCopySegmentTask_GettersAndSetters() {
|
|
idMappings := []*datapb.CopySegmentIDMapping{
|
|
{SourceSegmentId: 1, TargetSegmentId: 101, PartitionId: 10},
|
|
{SourceSegmentId: 2, TargetSegmentId: 102, PartitionId: 10},
|
|
}
|
|
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
JobId: 100,
|
|
CollectionId: 2,
|
|
NodeId: 5,
|
|
TaskVersion: 1,
|
|
TaskSlot: 1,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
Reason: "test reason",
|
|
IdMappings: idMappings,
|
|
CreatedTs: 12345,
|
|
CompleteTs: 54321,
|
|
})
|
|
|
|
// Test all getters
|
|
s.Equal(int64(1001), task.GetTaskId())
|
|
s.Equal(int64(100), task.GetJobId())
|
|
s.Equal(int64(2), task.GetCollectionId())
|
|
s.Equal(int64(5), task.GetNodeId())
|
|
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskPending, task.GetState())
|
|
s.Equal("test reason", task.GetReason())
|
|
s.Equal(idMappings, task.GetIdMappings())
|
|
s.NotNil(task.GetTR())
|
|
|
|
// Test task.Task interface methods
|
|
s.Equal(int64(1001), task.GetTaskID())
|
|
s.Equal(taskcommon.CopySegment, task.GetTaskType())
|
|
s.Equal(taskcommon.FromCopySegmentState(datapb.CopySegmentTaskState_CopySegmentTaskPending), task.GetTaskState())
|
|
s.Equal(int64(1), task.GetTaskSlot())
|
|
s.Equal(int64(1), task.GetTaskVersion())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestCopySegmentTask_Clone() {
|
|
original := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
|
|
original.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
JobId: 100,
|
|
CollectionId: 2,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
Reason: "original reason",
|
|
})
|
|
|
|
// Clone the task
|
|
cloned := original.Clone()
|
|
|
|
// Verify cloned task has same values
|
|
s.Equal(original.GetTaskId(), cloned.GetTaskId())
|
|
s.Equal(original.GetJobId(), cloned.GetJobId())
|
|
s.Equal(original.GetCollectionId(), cloned.GetCollectionId())
|
|
s.Equal(original.GetState(), cloned.GetState())
|
|
s.Equal(original.GetReason(), cloned.GetReason())
|
|
|
|
// Verify time recorder is same reference
|
|
s.Equal(original.GetTR(), cloned.GetTR())
|
|
|
|
// Verify they share the same task pointer (not deep copy)
|
|
clonedImpl := cloned.(*copySegmentTask)
|
|
s.Equal(original.task.Load(), clonedImpl.task.Load())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestWithCopyTaskJob() {
|
|
task1 := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task1"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task1.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
JobId: 100,
|
|
})
|
|
|
|
task2 := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task2"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task2.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1002,
|
|
JobId: 200,
|
|
})
|
|
|
|
filter := WithCopyTaskJob(100)
|
|
|
|
s.True(filter(task1))
|
|
s.False(filter(task2))
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestWithCopyTaskStates() {
|
|
pendingTask := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("pending"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
pendingTask.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
})
|
|
|
|
inProgressTask := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("inprogress"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
inProgressTask.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1002,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
|
|
})
|
|
|
|
completedTask := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("completed"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
completedTask.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1003,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskCompleted,
|
|
})
|
|
|
|
// Test filter with single state
|
|
filter1 := WithCopyTaskStates(datapb.CopySegmentTaskState_CopySegmentTaskPending)
|
|
s.True(filter1(pendingTask))
|
|
s.False(filter1(inProgressTask))
|
|
s.False(filter1(completedTask))
|
|
|
|
// Test filter with multiple states
|
|
filter2 := WithCopyTaskStates(
|
|
datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
|
|
)
|
|
s.True(filter2(pendingTask))
|
|
s.True(filter2(inProgressTask))
|
|
s.False(filter2(completedTask))
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskState() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
})
|
|
|
|
// Apply update action
|
|
action := UpdateCopyTaskState(datapb.CopySegmentTaskState_CopySegmentTaskInProgress)
|
|
action(task)
|
|
|
|
// Verify state is updated
|
|
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskInProgress, task.GetState())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskReason() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
Reason: "",
|
|
})
|
|
|
|
// Apply update action
|
|
action := UpdateCopyTaskReason("task failed")
|
|
action(task)
|
|
|
|
// Verify reason is updated
|
|
s.Equal("task failed", task.GetReason())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskNodeID() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
NodeId: 0,
|
|
})
|
|
|
|
// Apply update action
|
|
action := UpdateCopyTaskNodeID(5)
|
|
action(task)
|
|
|
|
// Verify node ID is updated
|
|
s.Equal(int64(5), task.GetNodeId())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestUpdateCopyTaskCompleteTs() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
CompleteTs: 0,
|
|
})
|
|
|
|
now := uint64(time.Now().UnixNano())
|
|
|
|
// Apply update action
|
|
action := UpdateCopyTaskCompleteTs(now)
|
|
action(task)
|
|
|
|
// Verify complete timestamp is updated
|
|
s.Equal(now, task.task.Load().GetCompleteTs())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestMultipleUpdateActions() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
NodeId: 0,
|
|
Reason: "",
|
|
CompleteTs: 0,
|
|
})
|
|
|
|
// Apply multiple update actions
|
|
actions := []UpdateCopySegmentTaskAction{
|
|
UpdateCopyTaskState(datapb.CopySegmentTaskState_CopySegmentTaskCompleted),
|
|
UpdateCopyTaskNodeID(5),
|
|
UpdateCopyTaskReason("completed successfully"),
|
|
UpdateCopyTaskCompleteTs(12345),
|
|
}
|
|
|
|
for _, action := range actions {
|
|
action(task)
|
|
}
|
|
|
|
// Verify all updates are applied
|
|
s.Equal(datapb.CopySegmentTaskState_CopySegmentTaskCompleted, task.GetState())
|
|
s.Equal(int64(5), task.GetNodeId())
|
|
s.Equal("completed successfully", task.GetReason())
|
|
s.Equal(uint64(12345), task.task.Load().GetCompleteTs())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestWrapCopySegmentTaskLog() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
JobId: 100,
|
|
CollectionId: 2,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
})
|
|
|
|
// Test without extra fields
|
|
fields := WrapCopySegmentTaskLog(task)
|
|
s.Len(fields, 4)
|
|
|
|
// Verify field names
|
|
fieldNames := make(map[string]bool)
|
|
for _, field := range fields {
|
|
fieldNames[field.Key] = true
|
|
}
|
|
s.True(fieldNames["taskID"])
|
|
s.True(fieldNames["jobID"])
|
|
s.True(fieldNames["collectionID"])
|
|
s.True(fieldNames["state"])
|
|
|
|
// Test with extra fields
|
|
extraFields := []zap.Field{
|
|
zap.String("extra", "value"),
|
|
zap.Int64("number", 123),
|
|
}
|
|
fieldsWithExtra := WrapCopySegmentTaskLog(task, extraFields...)
|
|
s.Len(fieldsWithExtra, 6)
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestCombinedFilters() {
|
|
tasks := []CopySegmentTask{
|
|
©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task1"),
|
|
times: taskcommon.NewTimes(),
|
|
},
|
|
©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task2"),
|
|
times: taskcommon.NewTimes(),
|
|
},
|
|
©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task3"),
|
|
times: taskcommon.NewTimes(),
|
|
},
|
|
©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("task4"),
|
|
times: taskcommon.NewTimes(),
|
|
},
|
|
}
|
|
|
|
tasks[0].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
JobId: 100,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
})
|
|
tasks[1].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1002,
|
|
JobId: 100,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
|
|
})
|
|
tasks[2].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1003,
|
|
JobId: 200,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
})
|
|
tasks[3].(*copySegmentTask).task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1004,
|
|
JobId: 100,
|
|
State: datapb.CopySegmentTaskState_CopySegmentTaskCompleted,
|
|
})
|
|
|
|
// Filter: jobID = 100 AND state = Pending or InProgress
|
|
jobFilter := WithCopyTaskJob(100)
|
|
stateFilter := WithCopyTaskStates(
|
|
datapb.CopySegmentTaskState_CopySegmentTaskPending,
|
|
datapb.CopySegmentTaskState_CopySegmentTaskInProgress,
|
|
)
|
|
|
|
var filtered []CopySegmentTask
|
|
for _, task := range tasks {
|
|
if jobFilter(task) && stateFilter(task) {
|
|
filtered = append(filtered, task)
|
|
}
|
|
}
|
|
|
|
// Should match tasks 1001 and 1002
|
|
s.Len(filtered, 2)
|
|
s.Equal(int64(1001), filtered[0].GetTaskId())
|
|
s.Equal(int64(1002), filtered[1].GetTaskId())
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestTaskTimeOperations() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
})
|
|
|
|
// Set and get task time
|
|
now := time.Now()
|
|
task.SetTaskTime(taskcommon.TimeStart, now)
|
|
|
|
retrievedTime := task.GetTaskTime(taskcommon.TimeStart)
|
|
s.True(retrievedTime.Equal(now))
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestTaskWithEmptyIdMappings() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
IdMappings: nil,
|
|
})
|
|
|
|
// Should return nil, not panic
|
|
mappings := task.GetIdMappings()
|
|
s.Nil(mappings)
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestStateConversion() {
|
|
testCases := []struct {
|
|
copySegmentState datapb.CopySegmentTaskState
|
|
expectedState taskcommon.State
|
|
}{
|
|
{datapb.CopySegmentTaskState_CopySegmentTaskPending, taskcommon.Init},
|
|
{datapb.CopySegmentTaskState_CopySegmentTaskInProgress, taskcommon.InProgress},
|
|
{datapb.CopySegmentTaskState_CopySegmentTaskCompleted, taskcommon.Finished},
|
|
{datapb.CopySegmentTaskState_CopySegmentTaskFailed, taskcommon.Failed},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
State: tc.copySegmentState,
|
|
})
|
|
|
|
s.Equal(tc.expectedState, task.GetTaskState(),
|
|
"State conversion failed for %v", tc.copySegmentState)
|
|
}
|
|
}
|
|
|
|
func (s *CopySegmentTaskSuite) TestTaskType() {
|
|
task := ©SegmentTask{
|
|
tr: timerecord.NewTimeRecorder("test task"),
|
|
times: taskcommon.NewTimes(),
|
|
}
|
|
task.task.Store(&datapb.CopySegmentTask{
|
|
TaskId: 1001,
|
|
})
|
|
|
|
s.Equal(taskcommon.CopySegment, task.GetTaskType())
|
|
}
|