milvus/internal/datacoord/ddl_callbacks_snapshot_test.go
wei liu 975c91df16
feat: Add comprehensive snapshot functionality for collections (#44361)
issue: #44358

Implement a complete snapshot management system, including creation,
deletion, listing, description, and restoration capabilities, across all
system components.

Key features:
- Create snapshots for entire collections
- Drop snapshots by name with proper cleanup
- List snapshots with collection filtering
- Describe snapshot details and metadata

Components added/modified:
- Client SDK with full snapshot API support and options
- DataCoord snapshot service with metadata management
- Proxy layer with task-based snapshot operations
- Protocol buffer definitions for snapshot RPCs
- Comprehensive unit tests using the mockey framework
- Integration tests for end-to-end validation

Technical implementation:
- Snapshot metadata storage in etcd with proper indexing
- File-based snapshot data persistence in object storage
- Garbage collection integration for snapshot cleanup
- Error handling and validation across all operations
- Thread-safe operations with proper locking mechanisms
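
As an illustration of the "proper indexing" and "proper locking" points above, here is a minimal in-memory sketch, not the DataCoord implementation. The names `snapshotRecord`, `metaIndex`, `Add`, `Drop`, and `List` are hypothetical, and etcd persistence is left out entirely; the sketch only shows a name-keyed map plus a per-collection index guarded by a single `sync.RWMutex`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// snapshotRecord is a hypothetical, trimmed-down metadata record.
type snapshotRecord struct {
	Name         string
	CollectionID int64
}

// metaIndex keeps a primary map by name and a secondary index by collection.
// One RWMutex guards both maps so they cannot drift apart.
type metaIndex struct {
	mu           sync.RWMutex
	byName       map[string]snapshotRecord
	byCollection map[int64]map[string]struct{}
}

func newMetaIndex() *metaIndex {
	return &metaIndex{
		byName:       make(map[string]snapshotRecord),
		byCollection: make(map[int64]map[string]struct{}),
	}
}

// Add registers a record and rejects duplicate names.
func (m *metaIndex) Add(rec snapshotRecord) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.byName[rec.Name]; exists {
		return fmt.Errorf("snapshot %q already exists", rec.Name)
	}
	m.byName[rec.Name] = rec
	names, ok := m.byCollection[rec.CollectionID]
	if !ok {
		names = make(map[string]struct{})
		m.byCollection[rec.CollectionID] = names
	}
	names[rec.Name] = struct{}{}
	return nil
}

// Drop removes a record from both maps and reports whether the name existed.
func (m *metaIndex) Drop(name string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	rec, ok := m.byName[name]
	if !ok {
		return false
	}
	delete(m.byName, name)
	delete(m.byCollection[rec.CollectionID], name)
	if len(m.byCollection[rec.CollectionID]) == 0 {
		delete(m.byCollection, rec.CollectionID)
	}
	return true
}

// List returns the sorted snapshot names of one collection.
func (m *metaIndex) List(collectionID int64) []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	names := make([]string, 0, len(m.byCollection[collectionID]))
	for name := range m.byCollection[collectionID] {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	idx := newMetaIndex()
	_ = idx.Add(snapshotRecord{Name: "nightly", CollectionID: 100})
	_ = idx.Add(snapshotRecord{Name: "weekly", CollectionID: 100})
	fmt.Println(idx.List(100))
	fmt.Println(idx.Drop("nightly"), idx.List(100))
}
```

Using one lock for both maps keeps the sketch simple; a real store would also have to keep the persisted copy consistent with these maps.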

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant/assumption: snapshots are immutable point‑in‑time
captures identified by (collection, snapshot name/ID); etcd snapshot
metadata is authoritative for lifecycle (PENDING → COMMITTED → DELETING)
and per‑segment manifests live in object storage (Avro / StorageV2). GC
and restore logic must see snapshotRefIndex loaded
(snapshotMeta.IsRefIndexLoaded) before reclaiming or relying on
segment/index files.

- New capability added: full end‑to‑end snapshot subsystem — client SDK
APIs (Create/Drop/List/Describe/Restore + restore job queries),
DataCoord SnapshotWriter/Reader (Avro + StorageV2 manifests),
snapshotMeta in meta, SnapshotManager orchestration
(create/drop/describe/list/restore), copy‑segment restore
tasks/inspector/checker, proxy & RPC surface, GC integration, and
docs/tests — enabling point‑in‑time collection snapshots persisted to
object storage and restorations orchestrated across components.

- Logic removed/simplified and why: duplicated recursive
compaction/delta‑log traversal and ad‑hoc lookup code were consolidated
behind two focused APIs/owners (Handler.GetDeltaLogFromCompactTo for
delta traversal and SnapshotManager/SnapshotReader for snapshot I/O).
MixCoord/coordinator broker paths were converted to thin RPC proxies.
This eliminates multiple implementations of the same traversal/lookup,
reducing divergence and simplifying responsibility boundaries.

- Why this does NOT introduce data loss or regressions: snapshot
create/drop use explicit two‑phase semantics (PENDING → COMMIT/DELETING)
with SnapshotWriter writing manifests and metadata before commit; GC
uses snapshotRefIndex guards and
IsRefIndexLoaded/GetSnapshotBySegment/GetSnapshotByIndex checks to avoid
removing referenced files; restore flow pre‑allocates job IDs, validates
resources (partitions/indexes), performs rollback on failure
(rollbackRestoreSnapshot), and converts/updates segment/index metadata
only after successful copy tasks. Extensive unit and integration tests
exercise pending/deleting/GC/restore/error paths to ensure idempotence
and protection against premature deletion.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2026-01-06 10:15:24 +08:00

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"testing"

	"github.com/bytedance/mockey"
	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)

// --- Test createSnapshotV2AckCallback ---

func TestDDLCallbacks_CreateSnapshotV2AckCallback_Success(t *testing.T) {
	ctx := context.Background()

	// Track if CreateSnapshot was called
	createSnapshotCalled := false

	// Mock snapshotManager.CreateSnapshot using mockey
	mockCreateSnapshot := mockey.Mock((*snapshotManager).CreateSnapshot).To(func(
		sm *snapshotManager,
		ctx context.Context,
		collectionID int64,
		name, description string,
	) (int64, error) {
		createSnapshotCalled = true
		assert.Equal(t, int64(100), collectionID)
		assert.Equal(t, "test_snapshot", name)
		assert.Equal(t, "test description", description)
		return 1001, nil
	}).Build()
	defer mockCreateSnapshot.UnPatch()

	// Create DDLCallbacks with real snapshotManager (mocked methods)
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result using message builder
	broadcastMsg := message.NewCreateSnapshotMessageBuilderV2().
		WithHeader(&message.CreateSnapshotMessageHeader{
			CollectionId: 100,
			Name:         "test_snapshot",
			Description:  "test description",
		}).
		WithBody(&message.CreateSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	// Convert to typed broadcast message
	typedMsg := message.MustAsBroadcastCreateSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultCreateSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.createSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.NoError(t, err)
	assert.True(t, createSnapshotCalled)
}

func TestDDLCallbacks_CreateSnapshotV2AckCallback_CreateError(t *testing.T) {
	ctx := context.Background()
	expectedErr := errors.New("create snapshot error")

	// Mock snapshotManager.CreateSnapshot to return error
	mockCreateSnapshot := mockey.Mock((*snapshotManager).CreateSnapshot).To(func(
		sm *snapshotManager,
		ctx context.Context,
		collectionID int64,
		name, description string,
	) (int64, error) {
		return 0, expectedErr
	}).Build()
	defer mockCreateSnapshot.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewCreateSnapshotMessageBuilderV2().
		WithHeader(&message.CreateSnapshotMessageHeader{
			CollectionId: 100,
			Name:         "test_snapshot",
			Description:  "test description",
		}).
		WithBody(&message.CreateSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastCreateSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultCreateSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.createSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.Error(t, err)
	assert.Equal(t, expectedErr, err)
}

// --- Test dropSnapshotV2AckCallback ---

func TestDDLCallbacks_DropSnapshotV2AckCallback_Success(t *testing.T) {
	ctx := context.Background()

	// Track if DropSnapshot was called
	dropSnapshotCalled := false

	// Mock snapshotManager.DropSnapshot using mockey
	mockDropSnapshot := mockey.Mock((*snapshotManager).DropSnapshot).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) error {
		dropSnapshotCalled = true
		assert.Equal(t, "test_snapshot", name)
		return nil
	}).Build()
	defer mockDropSnapshot.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewDropSnapshotMessageBuilderV2().
		WithHeader(&message.DropSnapshotMessageHeader{
			Name: "test_snapshot",
		}).
		WithBody(&message.DropSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastDropSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultDropSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.dropSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.NoError(t, err)
	assert.True(t, dropSnapshotCalled)
}

func TestDDLCallbacks_DropSnapshotV2AckCallback_DropError(t *testing.T) {
	ctx := context.Background()
	expectedErr := errors.New("drop snapshot error")

	// Mock snapshotManager.DropSnapshot to return error
	mockDropSnapshot := mockey.Mock((*snapshotManager).DropSnapshot).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) error {
		return expectedErr
	}).Build()
	defer mockDropSnapshot.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewDropSnapshotMessageBuilderV2().
		WithHeader(&message.DropSnapshotMessageHeader{
			Name: "test_snapshot",
		}).
		WithBody(&message.DropSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastDropSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultDropSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.dropSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.Error(t, err)
	assert.Equal(t, expectedErr, err)
}

// --- Test restoreSnapshotV2AckCallback ---

func TestDDLCallbacks_RestoreSnapshotV2AckCallback_Success(t *testing.T) {
	ctx := context.Background()

	// Track calls
	readSnapshotDataCalled := false
	restoreDataCalled := false
	getRestoreStateCalled := false

	// Mock snapshotManager.ReadSnapshotData
	mockReadSnapshotData := mockey.Mock((*snapshotManager).ReadSnapshotData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) (*SnapshotData, error) {
		readSnapshotDataCalled = true
		assert.Equal(t, "test_snapshot", name)
		return &SnapshotData{
			SnapshotInfo: &datapb.SnapshotInfo{Name: name},
		}, nil
	}).Build()
	defer mockReadSnapshotData.UnPatch()

	// Mock snapshotManager.RestoreData
	mockRestoreData := mockey.Mock((*snapshotManager).RestoreData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		snapshotData *SnapshotData,
		collectionID int64,
		jobID int64,
	) (int64, error) {
		restoreDataCalled = true
		assert.Equal(t, int64(200), collectionID)
		assert.Equal(t, int64(12345), jobID) // Verify jobID is passed from header
		return jobID, nil
	}).Build()
	defer mockRestoreData.UnPatch()

	// Mock snapshotManager.GetRestoreState to return completed immediately
	mockGetRestoreState := mockey.Mock((*snapshotManager).GetRestoreState).To(func(
		sm *snapshotManager,
		ctx context.Context,
		jobID int64,
	) (*datapb.RestoreSnapshotInfo, error) {
		getRestoreStateCalled = true
		assert.Equal(t, int64(12345), jobID)
		return &datapb.RestoreSnapshotInfo{
			State:    datapb.RestoreSnapshotState_RestoreSnapshotCompleted,
			Progress: 100,
		}, nil
	}).Build()
	defer mockGetRestoreState.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result with pre-allocated jobID
	broadcastMsg := message.NewRestoreSnapshotMessageBuilderV2().
		WithHeader(&message.RestoreSnapshotMessageHeader{
			SnapshotName: "test_snapshot",
			CollectionId: 200,
			JobId:        12345, // Pre-allocated jobID
		}).
		WithBody(&message.RestoreSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastRestoreSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultRestoreSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.restoreSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.NoError(t, err)
	assert.True(t, readSnapshotDataCalled)
	assert.True(t, restoreDataCalled)
	assert.True(t, getRestoreStateCalled)
}

func TestDDLCallbacks_RestoreSnapshotV2AckCallback_ReadSnapshotDataError(t *testing.T) {
	ctx := context.Background()
	expectedErr := errors.New("snapshot not found")

	// Mock snapshotManager.ReadSnapshotData to return error
	mockReadSnapshotData := mockey.Mock((*snapshotManager).ReadSnapshotData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) (*SnapshotData, error) {
		return nil, expectedErr
	}).Build()
	defer mockReadSnapshotData.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewRestoreSnapshotMessageBuilderV2().
		WithHeader(&message.RestoreSnapshotMessageHeader{
			SnapshotName: "test_snapshot",
			CollectionId: 200,
			JobId:        12345,
		}).
		WithBody(&message.RestoreSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastRestoreSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultRestoreSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.restoreSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.Error(t, err)
	assert.Equal(t, expectedErr, err)
}

func TestDDLCallbacks_RestoreSnapshotV2AckCallback_RestoreDataError(t *testing.T) {
	ctx := context.Background()
	expectedErr := errors.New("restore data error")

	// Mock snapshotManager.ReadSnapshotData to return success
	mockReadSnapshotData := mockey.Mock((*snapshotManager).ReadSnapshotData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) (*SnapshotData, error) {
		return &SnapshotData{
			SnapshotInfo: &datapb.SnapshotInfo{Name: name},
		}, nil
	}).Build()
	defer mockReadSnapshotData.UnPatch()

	// Mock snapshotManager.RestoreData to return error
	mockRestoreData := mockey.Mock((*snapshotManager).RestoreData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		snapshotData *SnapshotData,
		collectionID int64,
		jobID int64,
	) (int64, error) {
		return 0, expectedErr
	}).Build()
	defer mockRestoreData.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewRestoreSnapshotMessageBuilderV2().
		WithHeader(&message.RestoreSnapshotMessageHeader{
			SnapshotName: "test_snapshot",
			CollectionId: 200,
			JobId:        12345,
		}).
		WithBody(&message.RestoreSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastRestoreSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultRestoreSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.restoreSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.Error(t, err)
	assert.Equal(t, expectedErr, err)
}

func TestDDLCallbacks_RestoreSnapshotV2AckCallback_GetRestoreStateError(t *testing.T) {
	ctx := context.Background()
	expectedErr := errors.New("get restore state error")

	// Mock snapshotManager.ReadSnapshotData to return success
	mockReadSnapshotData := mockey.Mock((*snapshotManager).ReadSnapshotData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		name string,
	) (*SnapshotData, error) {
		return &SnapshotData{
			SnapshotInfo: &datapb.SnapshotInfo{Name: name},
		}, nil
	}).Build()
	defer mockReadSnapshotData.UnPatch()

	// Mock snapshotManager.RestoreData to return success
	mockRestoreData := mockey.Mock((*snapshotManager).RestoreData).To(func(
		sm *snapshotManager,
		ctx context.Context,
		snapshotData *SnapshotData,
		collectionID int64,
		jobID int64,
	) (int64, error) {
		return jobID, nil
	}).Build()
	defer mockRestoreData.UnPatch()

	// Mock snapshotManager.GetRestoreState to return error
	mockGetRestoreState := mockey.Mock((*snapshotManager).GetRestoreState).To(func(
		sm *snapshotManager,
		ctx context.Context,
		jobID int64,
	) (*datapb.RestoreSnapshotInfo, error) {
		return nil, expectedErr
	}).Build()
	defer mockGetRestoreState.UnPatch()

	// Create DDLCallbacks
	server := &Server{
		snapshotManager: &snapshotManager{},
	}
	callbacks := &DDLCallbacks{Server: server}

	// Create test broadcast result
	broadcastMsg := message.NewRestoreSnapshotMessageBuilderV2().
		WithHeader(&message.RestoreSnapshotMessageHeader{
			SnapshotName: "test_snapshot",
			CollectionId: 200,
			JobId:        12345,
		}).
		WithBody(&message.RestoreSnapshotMessageBody{}).
		WithBroadcast([]string{"control_channel"}).
		MustBuildBroadcast()

	typedMsg := message.MustAsBroadcastRestoreSnapshotMessageV2(broadcastMsg)
	result := message.BroadcastResultRestoreSnapshotMessageV2{
		Message: typedMsg,
	}

	// Execute
	err := callbacks.restoreSnapshotV2AckCallback(ctx, result)

	// Verify
	assert.Error(t, err)
	assert.Equal(t, expectedErr, err)
}