wei liu 975c91df16
feat: Add comprehensive snapshot functionality for collections (#44361)
issue: #44358

Implement a complete snapshot management system covering creation,
deletion, listing, description, and restoration across all system
components.

Key features:
- Create snapshots for entire collections
- Drop snapshots by name with proper cleanup
- List snapshots with collection filtering
- Describe snapshot details and metadata
- Restore collections from snapshots (API surface sketched below)
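
For orientation, a minimal sketch of the client-side surface; the interface and type names below are illustrative assumptions, not the SDK's actual signatures:

```go
package example

import "context"

// SnapshotInfo is a placeholder for the metadata returned by Describe;
// the real SDK type and its fields may differ.
type SnapshotInfo struct {
	Name         string
	CollectionID int64
	CreatedAt    uint64
}

// SnapshotAPI is a hypothetical interface mirroring the operations listed above;
// actual SDK method names and option types may differ.
type SnapshotAPI interface {
	CreateSnapshot(ctx context.Context, collection, snapshot string) error
	DropSnapshot(ctx context.Context, collection, snapshot string) error
	ListSnapshots(ctx context.Context, collection string) ([]string, error)
	DescribeSnapshot(ctx context.Context, collection, snapshot string) (SnapshotInfo, error)
	// RestoreSnapshot returns a restore job ID that can be polled afterwards.
	RestoreSnapshot(ctx context.Context, collection, snapshot string) (int64, error)
}
```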

Components added/modified:
- Client SDK with full snapshot API support and options
- DataCoord snapshot service with metadata management
- Proxy layer with task-based snapshot operations
- Protocol buffer definitions for snapshot RPCs
- Comprehensive unit tests with mockey framework
- Integration tests for end-to-end validation

Technical implementation:
- Snapshot metadata storage in etcd with proper indexing
- File-based snapshot data persistence in object storage
- Garbage collection integration for snapshot cleanup
- Error handling and validation across all operations
- Thread-safe operations with proper locking mechanisms (see the sketch below)
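
A minimal sketch of the lock-guarded metadata cache implied by the last bullet, with assumed type and field names:

```go
package example

import "sync"

// snapshotEntry is an illustrative record; the real metadata also carries
// state, timestamps, and manifest locations.
type snapshotEntry struct {
	Name  string
	State string
}

// snapshotCache shows the RWMutex pattern: reads take the shared lock,
// writes take the exclusive lock, so list/describe can run concurrently
// while create/drop serialize.
type snapshotCache struct {
	mu        sync.RWMutex
	snapshots map[string]snapshotEntry
}

func (c *snapshotCache) Get(name string) (snapshotEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.snapshots[name]
	return e, ok
}

func (c *snapshotCache) Put(e snapshotEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.snapshots == nil {
		c.snapshots = make(map[string]snapshotEntry)
	}
	c.snapshots[e.Name] = e
}
```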

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant/assumption: snapshots are immutable point‑in‑time
captures identified by (collection, snapshot name/ID); etcd snapshot
metadata is authoritative for lifecycle (PENDING → COMMITTED → DELETING)
and per‑segment manifests live in object storage (Avro / StorageV2). GC
and restore logic must see snapshotRefIndex loaded
(snapshotMeta.IsRefIndexLoaded) before reclaiming or relying on
segment/index files.
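
As a concrete reading of that lifecycle, a minimal state-machine sketch; the transition table is inferred from the notes above rather than taken from the actual snapshotMeta code:

```go
package example

import "fmt"

// SnapshotState mirrors the lifecycle above: PENDING → COMMITTED → DELETING.
type SnapshotState int

const (
	StatePending SnapshotState = iota
	StateCommitted
	StateDeleting
)

// validNext encodes the one-way lifecycle; anything else is rejected.
var validNext = map[SnapshotState][]SnapshotState{
	StatePending:   {StateCommitted, StateDeleting}, // commit, or abandon a failed create
	StateCommitted: {StateDeleting},
	StateDeleting:  {},
}

func transition(from, to SnapshotState) error {
	for _, s := range validNext[from] {
		if s == to {
			return nil
		}
	}
	return fmt.Errorf("invalid snapshot state transition: %d -> %d", from, to)
}
```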

- New capability added: full end‑to‑end snapshot subsystem — client SDK
APIs (Create/Drop/List/Describe/Restore + restore job queries),
DataCoord SnapshotWriter/Reader (Avro + StorageV2 manifests),
snapshotMeta in meta, SnapshotManager orchestration
(create/drop/describe/list/restore), copy‑segment restore
tasks/inspector/checker, proxy & RPC surface, GC integration, and
docs/tests — enabling point‑in‑time collection snapshots persisted to
object storage and restorations orchestrated across components.

- Logic removed/simplified and why: duplicated recursive
compaction/delta‑log traversal and ad‑hoc lookup code were consolidated
behind two focused APIs/owners (Handler.GetDeltaLogFromCompactTo for
delta traversal and SnapshotManager/SnapshotReader for snapshot I/O).
MixCoord/coordinator broker paths were converted to thin RPC proxies.
This eliminates multiple implementations of the same traversal/lookup,
reducing divergence and simplifying responsibility boundaries.

- Why this does NOT introduce data loss or regressions: snapshot
create/drop use explicit two‑phase semantics (PENDING → COMMIT/DELETING)
with SnapshotWriter writing manifests and metadata before commit; GC
uses snapshotRefIndex guards and
IsRefIndexLoaded/GetSnapshotBySegment/GetSnapshotByIndex checks to avoid
removing referenced files; restore flow pre‑allocates job IDs, validates
resources (partitions/indexes), performs rollback on failure
(rollbackRestoreSnapshot), and converts/updates segment/index metadata
only after successful copy tasks. Extensive unit and integration tests
exercise pending/deleting/GC/restore/error paths to ensure idempotence
and protection against premature deletion.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
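
A sketch of the GC guard described above, with a hypothetical interface standing in for snapshotMeta's real methods:

```go
package example

// refIndex is a hypothetical view of the snapshot reference index;
// the real guards are IsRefIndexLoaded/GetSnapshotBySegment/GetSnapshotByIndex.
type refIndex interface {
	IsRefIndexLoaded() bool
	IsSegmentReferenced(segmentID int64) bool
}

// canReclaimSegment mirrors the GC guard described above: never reclaim
// before the reference index is loaded, and never while any snapshot still
// references the segment.
func canReclaimSegment(idx refIndex, segmentID int64) bool {
	if !idx.IsRefIndexLoaded() {
		return false // index not loaded yet; conservatively treat as referenced
	}
	return !idx.IsSegmentReferenced(segmentID)
}
```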

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
2026-01-06 10:15:24 +08:00

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package session

import (
"context"
"sync"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// WorkerSlots represents the slot information for a worker node
type WorkerSlots struct {
NodeID int64
AvailableSlots int64
}
// Cluster defines the interface for dispatching and tracking tasks on worker nodes
type Cluster interface {
// QuerySlot returns the slot information for all worker nodes
QuerySlot() map[int64]*WorkerSlots
// CreateCompaction creates a new compaction task on the specified node
CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error
// QueryCompaction queries the status of a compaction task
QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error)
// DropCompaction drops a compaction task
DropCompaction(nodeID int64, planID int64) error
// CreatePreImport creates a pre-import task
CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error
// CreateImport creates an import task
CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error
// QueryPreImport queries the status of a pre-import task
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
// QueryImport queries the status of an import task
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
// DropImport drops an import task
DropImport(nodeID int64, taskID int64) error
// CreateIndex creates an index building task
CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error
// QueryIndex queries the status of index building tasks
QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error)
// DropIndex drops an index building task
DropIndex(nodeID int64, taskID int64) error
// CreateStats creates a statistics collection task
CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error
// QueryStats queries the status of statistics collection tasks
QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error)
// DropStats drops a statistics collection task
DropStats(nodeID int64, taskID int64) error
// CreateAnalyze creates an analysis task
CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error
// QueryAnalyze queries the status of analysis tasks
QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error)
// DropAnalyze drops an analysis task
DropAnalyze(nodeID int64, taskID int64) error
// CreateExternalCollectionTask creates and executes an external collection task
CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error
// QueryExternalCollectionTask queries the status of an external collection task
QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error)
// DropExternalCollectionTask drops an external collection task
DropExternalCollectionTask(nodeID int64, taskID int64) error
// CreateCopySegment creates a copy segment task
CreateCopySegment(nodeID int64, in *datapb.CopySegmentRequest) error
// QueryCopySegment queries the status of a copy segment task
QueryCopySegment(nodeID int64, in *datapb.QueryCopySegmentRequest) (*datapb.QueryCopySegmentResponse, error)
// DropCopySegment drops a copy segment task
DropCopySegment(nodeID int64, taskID int64) error
}
var _ Cluster = (*cluster)(nil)
// cluster implements the Cluster interface
type cluster struct {
nm NodeManager
}
// NewCluster creates a new instance of cluster
func NewCluster(nm NodeManager) Cluster {
c := &cluster{
nm: nm,
}
return c
}
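// createTask marshals the request and submits it to the target worker node via the generic CreateTask RPC.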
func (c *cluster) createTask(nodeID int64, in proto.Message, properties taskcommon.Properties) error {
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return err
}
payload, err := proto.Marshal(in)
if err != nil {
log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
return err
}
status, err := cli.CreateTask(ctx, &workerpb.CreateTaskRequest{
Payload: payload,
Properties: properties,
})
return merr.CheckRPCCall(status, err)
}
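// queryTask asks the target worker node for the state and payload of a task via the generic QueryTask RPC.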
func (c *cluster) queryTask(nodeID int64, properties taskcommon.Properties) (*workerpb.QueryTaskResponse, error) {
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return nil, err
}
resp, err := cli.QueryTask(ctx, &workerpb.QueryTaskRequest{
Properties: properties,
})
if err = merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
return nil, err
}
return resp, nil
}
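// dropTask removes a task from the target worker node via the generic DropTask RPC.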
func (c *cluster) dropTask(nodeID int64, properties taskcommon.Properties) error {
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return err
}
status, err := cli.DropTask(ctx, &workerpb.DropTaskRequest{
Properties: properties,
})
return merr.CheckRPCCall(status, err)
}
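// QuerySlot fans out to all known worker nodes in parallel and returns the available slots of every node that responds successfully.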
func (c *cluster) QuerySlot() map[int64]*WorkerSlots {
var (
mu = &sync.Mutex{}
wg = &sync.WaitGroup{}
availableNodeSlots = make(map[int64]*WorkerSlots)
)
for _, nodeID := range c.nm.GetClientIDs() {
wg.Add(1)
nodeID := nodeID
go func() {
defer wg.Done()
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return
}
resp, err := cli.QuerySlot(ctx, &datapb.QuerySlotRequest{})
if err = merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
log.Ctx(ctx).Warn("failed to get node slot", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
mu.Lock()
defer mu.Unlock()
availableNodeSlots[nodeID] = &WorkerSlots{
NodeID: nodeID,
AvailableSlots: resp.GetAvailableSlots(),
}
}()
}
wg.Wait()
log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", availableNodeSlots))
return availableNodeSlots
}
func (c *cluster) CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetPlanID())
properties.AppendType(taskcommon.Compaction)
properties.AppendTaskSlot(in.GetSlotUsage())
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetPlanID())
reqProperties.AppendType(taskcommon.Compaction)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
defaultResult := &datapb.CompactionPlanResult{State: taskcommon.ToCompactionState(state)}
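// payloadResultF decodes the worker payload, selects the result whose plan ID matches the request, and compresses its segment binlogs before returning.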
payloadResultF := func() (*datapb.CompactionPlanResult, error) {
result := &datapb.CompactionStateResponse{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
var ret *datapb.CompactionPlanResult
for _, rst := range result.GetResults() {
if rst.GetPlanID() != in.GetPlanID() {
continue
}
err = binlog.CompressCompactionBinlogs(rst.GetSegments())
if err != nil {
return nil, err
}
ret = rst
break
}
return ret, nil
}
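// Terminal states must carry a result payload; transient states may omit it, in which case a state-only result is returned.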
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
panic("the compaction result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropCompaction(nodeID int64, planID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(planID)
properties.AppendType(taskcommon.Compaction)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error {
// TODO: sheep, use taskSlot in request
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetTaskID())
properties.AppendType(taskcommon.PreImport)
properties.AppendTaskSlot(taskSlot)
return c.createTask(nodeID, in, properties)
}
func (c *cluster) CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetTaskID())
properties.AppendType(taskcommon.Import)
properties.AppendTaskSlot(taskSlot)
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskID())
reqProperties.AppendType(taskcommon.PreImport)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &datapb.QueryPreImportResponse{State: taskcommon.ToImportState(state), Reason: reason}
payloadResultF := func() (*datapb.QueryPreImportResponse, error) {
result := &datapb.QueryPreImportResponse{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result, nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the preImport result payload must not be empty",
zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
panic("the preImport result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskID())
reqProperties.AppendType(taskcommon.Import)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &datapb.QueryImportResponse{State: taskcommon.ToImportState(state), Reason: reason}
payloadResultF := func() (*datapb.QueryImportResponse, error) {
result := &datapb.QueryImportResponse{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result, nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the import result payload must not be empty",
zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
panic("the import result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropImport(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.Import)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetBuildID())
properties.AppendType(taskcommon.Index)
properties.AppendTaskSlot(in.GetTaskSlot())
properties.AppendNumRows(in.GetNumRows())
properties.AppendTaskVersion(in.GetIndexVersion())
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskIDs()[0])
reqProperties.AppendType(taskcommon.Index)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &workerpb.IndexJobResults{
Results: []*workerpb.IndexTaskInfo{
{
BuildID: in.GetTaskIDs()[0],
State: commonpb.IndexState(state),
FailReason: reason,
},
},
}
payloadResultF := func() (*workerpb.IndexJobResults, error) {
result := &workerpb.QueryJobsV2Response{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result.GetIndexJobResults(), nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the index result payload must not be empty",
zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
panic("the index result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropIndex(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.Index)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetTaskID())
properties.AppendType(taskcommon.Stats)
properties.AppendSubType(in.GetSubJobType().String())
properties.AppendTaskSlot(in.GetTaskSlot())
properties.AppendNumRows(in.GetNumRows())
properties.AppendTaskVersion(in.GetTaskVersion())
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskIDs()[0])
reqProperties.AppendType(taskcommon.Stats)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &workerpb.StatsResults{
Results: []*workerpb.StatsResult{
{
TaskID: in.GetTaskIDs()[0],
State: state,
FailReason: reason,
},
},
}
payloadResultF := func() (*workerpb.StatsResults, error) {
result := &workerpb.QueryJobsV2Response{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result.GetStatsJobResults(), nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the stats result payload must not be empty",
zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
panic("the stats result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropStats(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.Stats)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetTaskID())
properties.AppendType(taskcommon.Analyze)
properties.AppendTaskSlot(in.GetTaskSlot())
properties.AppendTaskVersion(in.GetVersion())
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskIDs()[0])
reqProperties.AppendType(taskcommon.Analyze)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &workerpb.AnalyzeResults{
Results: []*workerpb.AnalyzeResult{
{
TaskID: in.GetTaskIDs()[0],
State: state,
FailReason: reason,
},
},
}
payloadResultF := func() (*workerpb.AnalyzeResults, error) {
result := &workerpb.QueryJobsV2Response{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result.GetAnalyzeJobResults(), nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the analyze result payload must not be empty",
zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
panic("the analyze result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropAnalyze(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.Analyze)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error {
timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cli, err := c.nm.GetClient(nodeID)
if err != nil {
log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
return err
}
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(req.GetTaskID())
properties.AppendType(taskcommon.ExternalCollection)
payload, err := proto.Marshal(req)
if err != nil {
log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
return err
}
// Submit task to worker - task will execute asynchronously in worker's goroutine pool
status, err := cli.CreateTask(ctx, &workerpb.CreateTaskRequest{
Payload: payload,
Properties: properties,
})
if err != nil {
log.Ctx(ctx).Warn("create external collection task failed", zap.Error(err))
return err
}
if err := merr.Error(status); err != nil {
log.Ctx(ctx).Warn("create external collection task returned error", zap.Error(err))
return err
}
return nil
}
func (c *cluster) QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error) {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.ExternalCollection)
resp, err := c.queryTask(nodeID, properties)
if err != nil {
return nil, err
}
// Unmarshal the response payload
result := &datapb.UpdateExternalCollectionResponse{}
if err := proto.Unmarshal(resp.GetPayload(), result); err != nil {
return nil, err
}
return result, nil
}
func (c *cluster) DropExternalCollectionTask(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.ExternalCollection)
return c.dropTask(nodeID, properties)
}
func (c *cluster) CreateCopySegment(nodeID int64, in *datapb.CopySegmentRequest) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(in.GetTaskID())
properties.AppendType(taskcommon.CopySegment)
properties.AppendTaskSlot(in.GetTaskSlot())
return c.createTask(nodeID, in, properties)
}
func (c *cluster) QueryCopySegment(nodeID int64, in *datapb.QueryCopySegmentRequest) (*datapb.QueryCopySegmentResponse, error) {
reqProperties := taskcommon.NewProperties(nil)
reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
reqProperties.AppendTaskID(in.GetTaskID())
reqProperties.AppendType(taskcommon.CopySegment)
resp, err := c.queryTask(nodeID, reqProperties)
if err != nil {
return nil, err
}
resProperties := taskcommon.NewProperties(resp.GetProperties())
state, err := resProperties.GetTaskState()
if err != nil {
return nil, err
}
reason := resProperties.GetTaskReason()
defaultResult := &datapb.QueryCopySegmentResponse{
State: taskcommon.ToCopySegmentState(state),
Reason: reason,
}
payloadResultF := func() (*datapb.QueryCopySegmentResponse, error) {
result := &datapb.QueryCopySegmentResponse{}
err = proto.Unmarshal(resp.GetPayload(), result)
if err != nil {
return nil, err
}
return result, nil
}
switch state {
case taskcommon.None, taskcommon.Init, taskcommon.Retry:
return defaultResult, nil
case taskcommon.InProgress:
if resp.GetPayload() != nil {
return payloadResultF()
}
return defaultResult, nil
case taskcommon.Finished, taskcommon.Failed:
if resp.GetPayload() != nil {
return payloadResultF()
}
log.Warn("the copy segment result payload must not be empty",
zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
panic("the copy segment result payload must not be empty with Finished/Failed state")
default:
panic("should not happen")
}
}
func (c *cluster) DropCopySegment(nodeID int64, taskID int64) error {
properties := taskcommon.NewProperties(nil)
properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
properties.AppendTaskID(taskID)
properties.AppendType(taskcommon.CopySegment)
return c.dropTask(nodeID, properties)
}