// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package session

import (
	"context"
	"sync"
	"time"

	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
	"github.com/milvus-io/milvus/pkg/v2/taskcommon"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// WorkerSlots represents the slot information for a worker node
type WorkerSlots struct {
	NodeID         int64
	AvailableSlots int64
}

// Cluster defines the interface for dispatching tasks to worker nodes
type Cluster interface {
	// QuerySlot returns the slot information for all worker nodes
	QuerySlot() map[int64]*WorkerSlots

	// CreateCompaction creates a new compaction task on the specified node
	CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error
	// QueryCompaction queries the status of a compaction task
	QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error)
	// DropCompaction drops a compaction task
	DropCompaction(nodeID int64, planID int64) error

	// CreatePreImport creates a pre-import task
	CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error
	// CreateImport creates an import task
	CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error
	// QueryPreImport queries the status of a pre-import task
	QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
	// QueryImport queries the status of an import task
	QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
	// DropImport drops an import task
	DropImport(nodeID int64, taskID int64) error

	// CreateIndex creates an index building task
	CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error
	// QueryIndex queries the status of index building tasks
	QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error)
	// DropIndex drops an index building task
	DropIndex(nodeID int64, taskID int64) error

	// CreateStats creates a statistics collection task
	CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error
	// QueryStats queries the status of statistics collection tasks
	QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error)
	// DropStats drops a statistics collection task
	DropStats(nodeID int64, taskID int64) error

	// CreateAnalyze creates an analysis task
	CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error
	// QueryAnalyze queries the status of analysis tasks
	QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error)
	// DropAnalyze drops an analysis task
	DropAnalyze(nodeID int64, taskID int64) error

	// CreateExternalCollectionTask creates and executes an external collection task
	CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error
	// QueryExternalCollectionTask queries the status of an external collection task
	QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error)
	// DropExternalCollectionTask drops an external collection task
	DropExternalCollectionTask(nodeID int64, taskID int64) error
}

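// Illustrative sketch of how a caller drives the Cluster API; nm, plan and
// nodeID are assumptions of this sketch and error handling is elided:
//
//	cl := NewCluster(nm)
//	_ = cl.CreateCompaction(nodeID, plan)
//	// poll until the worker reports a terminal state
//	result, _ := cl.QueryCompaction(nodeID, &datapb.CompactionStateRequest{PlanID: plan.GetPlanID()})
//	_ = result.GetState()
//	// drop the task once its result has been consumed
//	_ = cl.DropCompaction(nodeID, plan.GetPlanID())
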
var _ Cluster = (*cluster)(nil)

// cluster implements the Cluster interface
type cluster struct {
	nm NodeManager
}

// NewCluster creates a new instance of cluster
func NewCluster(nm NodeManager) Cluster {
	c := &cluster{
		nm: nm,
	}
	return c
}

func (c *cluster) createTask(nodeID int64, in proto.Message, properties taskcommon.Properties) error {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return err
	}

	payload, err := proto.Marshal(in)
	if err != nil {
		log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
		return err
	}

	status, err := cli.CreateTask(ctx, &workerpb.CreateTaskRequest{
		Payload:    payload,
		Properties: properties,
	})
	return merr.CheckRPCCall(status, err)
}

func (c *cluster) queryTask(nodeID int64, properties taskcommon.Properties) (*workerpb.QueryTaskResponse, error) {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return nil, err
	}

	resp, err := cli.QueryTask(ctx, &workerpb.QueryTaskRequest{
		Properties: properties,
	})
	if err = merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
		return nil, err
	}
	return resp, nil
}

func (c *cluster) dropTask(nodeID int64, properties taskcommon.Properties) error {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return err
	}

	status, err := cli.DropTask(ctx, &workerpb.DropTaskRequest{
		Properties: properties,
	})
	return merr.CheckRPCCall(status, err)
}

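// createTask, queryTask, and dropTask form the generic envelope shared by all
// task types: the request is proto-marshaled into Payload and routed by the
// taskcommon.Properties (cluster ID, task ID, task type). A new task type only
// needs thin wrappers; as an illustrative sketch (datapb.FooRequest and
// taskcommon.Foo are hypothetical names, not part of this codebase):
//
//	func (c *cluster) CreateFoo(nodeID int64, in *datapb.FooRequest) error {
//		properties := taskcommon.NewProperties(nil)
//		properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
//		properties.AppendTaskID(in.GetTaskID())
//		properties.AppendType(taskcommon.Foo)
//		return c.createTask(nodeID, in, properties)
//	}
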
func (c *cluster) QuerySlot() map[int64]*WorkerSlots {
	var (
		mu                 = &sync.Mutex{}
		wg                 = &sync.WaitGroup{}
		availableNodeSlots = make(map[int64]*WorkerSlots)
	)
	for _, nodeID := range c.nm.GetClientIDs() {
		wg.Add(1)
		nodeID := nodeID
		go func() {
			defer wg.Done()
			timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()
			cli, err := c.nm.GetClient(nodeID)
			if err != nil {
				log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
				return
			}
			resp, err := cli.QuerySlot(ctx, &datapb.QuerySlotRequest{})
			if err = merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
				log.Ctx(ctx).Warn("failed to get node slot", zap.Int64("nodeID", nodeID), zap.Error(err))
				return
			}
			mu.Lock()
			defer mu.Unlock()
			availableNodeSlots[nodeID] = &WorkerSlots{
				NodeID:         nodeID,
				AvailableSlots: resp.GetAvailableSlots(),
			}
		}()
	}
	wg.Wait()
	log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", availableNodeSlots))
	return availableNodeSlots
}

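// Illustrative sketch of consuming a QuerySlot snapshot to pick the
// least-loaded worker (cl is a Cluster; only nodes that answered within the
// timeout appear in the map):
//
//	var bestNode, bestSlots int64 = -1, -1
//	for nodeID, slot := range cl.QuerySlot() {
//		if slot.AvailableSlots > bestSlots {
//			bestNode, bestSlots = nodeID, slot.AvailableSlots
//		}
//	}
//	// bestNode == -1 means no worker responded in time
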
func (c *cluster) CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetPlanID())
	properties.AppendType(taskcommon.Compaction)
	properties.AppendTaskSlot(in.GetSlotUsage())
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetPlanID())
	reqProperties.AppendType(taskcommon.Compaction)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}

	defaultResult := &datapb.CompactionPlanResult{State: taskcommon.ToCompactionState(state)}
	payloadResultF := func() (*datapb.CompactionPlanResult, error) {
		result := &datapb.CompactionStateResponse{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		var ret *datapb.CompactionPlanResult
		for _, rst := range result.GetResults() {
			if rst.GetPlanID() != in.GetPlanID() {
				continue
			}
			err = binlog.CompressCompactionBinlogs(rst.GetSegments())
			if err != nil {
				return nil, err
			}
			ret = rst
			break
		}
		return ret, err
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		panic("the compaction result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

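// QueryCompaction above and the Query* helpers below all share the same state
// contract with the worker, sketched here:
//
//	switch state {
//	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
//		// no payload expected: report the state only
//	case taskcommon.InProgress:
//		// payload is optional and, when present, carries a partial result
//	case taskcommon.Finished, taskcommon.Failed:
//		// payload is mandatory: a missing final result is a bug, hence the panic
//	}
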
func (c *cluster) DropCompaction(nodeID int64, planID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(planID)
	properties.AppendType(taskcommon.Compaction)
	return c.dropTask(nodeID, properties)
}

func (c *cluster) CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error {
	// TODO: sheep, use taskSlot in request
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetTaskID())
	properties.AppendType(taskcommon.PreImport)
	properties.AppendTaskSlot(taskSlot)
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetTaskID())
	properties.AppendType(taskcommon.Import)
	properties.AppendTaskSlot(taskSlot)
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskID())
	reqProperties.AppendType(taskcommon.PreImport)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	reason := resProperties.GetTaskReason()

	defaultResult := &datapb.QueryPreImportResponse{State: taskcommon.ToImportState(state), Reason: reason}
	payloadResultF := func() (*datapb.QueryPreImportResponse, error) {
		result := &datapb.QueryPreImportResponse{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		return result, nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		log.Warn("the preImport result payload must not be empty",
			zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
		panic("the preImport result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

func (c *cluster) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskID())
	reqProperties.AppendType(taskcommon.Import)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	reason := resProperties.GetTaskReason()

	defaultResult := &datapb.QueryImportResponse{State: taskcommon.ToImportState(state), Reason: reason}
	payloadResultF := func() (*datapb.QueryImportResponse, error) {
		result := &datapb.QueryImportResponse{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		return result, nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		log.Warn("the import result payload must not be empty",
			zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
		panic("the import result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

func (c *cluster) DropImport(nodeID int64, taskID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.Import)
	return c.dropTask(nodeID, properties)
}

func (c *cluster) CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetBuildID())
	properties.AppendType(taskcommon.Index)
	properties.AppendTaskSlot(in.GetTaskSlot())
	properties.AppendNumRows(in.GetNumRows())
	properties.AppendTaskVersion(in.GetIndexVersion())
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error) {
	// The request is expected to carry at least one task ID; only the first is queried.
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskIDs()[0])
	reqProperties.AppendType(taskcommon.Index)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	reason := resProperties.GetTaskReason()

	defaultResult := &workerpb.IndexJobResults{
		Results: []*workerpb.IndexTaskInfo{
			{
				BuildID:    in.GetTaskIDs()[0],
				State:      commonpb.IndexState(state),
				FailReason: reason,
			},
		},
	}

	payloadResultF := func() (*workerpb.IndexJobResults, error) {
		result := &workerpb.QueryJobsV2Response{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		return result.GetIndexJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		log.Warn("the index result payload must not be empty",
			zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
		panic("the index result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

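// For index, stats, and analyze tasks the worker wraps the final result in a
// workerpb.QueryJobsV2Response envelope; each Query* helper unwraps only the
// branch for its own task type, as QueryIndex does above:
//
//	result := &workerpb.QueryJobsV2Response{}
//	if err := proto.Unmarshal(resp.GetPayload(), result); err != nil {
//		return nil, err
//	}
//	return result.GetIndexJobResults(), nil // stats/analyze use their own getters
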
func (c *cluster) DropIndex(nodeID int64, taskID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.Index)
	return c.dropTask(nodeID, properties)
}

func (c *cluster) CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetTaskID())
	properties.AppendType(taskcommon.Stats)
	properties.AppendSubType(in.GetSubJobType().String())
	properties.AppendTaskSlot(in.GetTaskSlot())
	properties.AppendNumRows(in.GetNumRows())
	properties.AppendTaskVersion(in.GetTaskVersion())
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskIDs()[0])
	reqProperties.AppendType(taskcommon.Stats)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	reason := resProperties.GetTaskReason()

	defaultResult := &workerpb.StatsResults{
		Results: []*workerpb.StatsResult{
			{
				TaskID:     in.GetTaskIDs()[0],
				State:      state,
				FailReason: reason,
			},
		},
	}
	payloadResultF := func() (*workerpb.StatsResults, error) {
		result := &workerpb.QueryJobsV2Response{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		return result.GetStatsJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		log.Warn("the stats result payload must not be empty",
			zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
		panic("the stats result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

func (c *cluster) DropStats(nodeID int64, taskID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.Stats)
	return c.dropTask(nodeID, properties)
}

func (c *cluster) CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(in.GetTaskID())
	properties.AppendType(taskcommon.Analyze)
	properties.AppendTaskSlot(in.GetTaskSlot())
	properties.AppendTaskVersion(in.GetVersion())
	return c.createTask(nodeID, in, properties)
}

func (c *cluster) QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskIDs()[0])
	reqProperties.AppendType(taskcommon.Analyze)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}

	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	reason := resProperties.GetTaskReason()

	defaultResult := &workerpb.AnalyzeResults{
		Results: []*workerpb.AnalyzeResult{
			{
				TaskID:     in.GetTaskIDs()[0],
				State:      state,
				FailReason: reason,
			},
		},
	}
	payloadResultF := func() (*workerpb.AnalyzeResults, error) {
		result := &workerpb.QueryJobsV2Response{}
		err = proto.Unmarshal(resp.GetPayload(), result)
		if err != nil {
			return nil, err
		}
		return result.GetAnalyzeJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		log.Warn("the analyze result payload must not be empty",
			zap.Int64("taskID", in.GetTaskIDs()[0]), zap.String("state", state.String()))
		panic("the analyze result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}

func (c *cluster) DropAnalyze(nodeID int64, taskID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.Analyze)
	return c.dropTask(nodeID, properties)
}

func (c *cluster) CreateExternalCollectionTask(nodeID int64, req *datapb.UpdateExternalCollectionRequest) error {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return err
	}

	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(req.GetTaskID())
	properties.AppendType(taskcommon.ExternalCollection)

	payload, err := proto.Marshal(req)
	if err != nil {
		log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
		return err
	}

	// Submit the task to the worker; it executes asynchronously in the worker's goroutine pool
	status, err := cli.CreateTask(ctx, &workerpb.CreateTaskRequest{
		Payload:    payload,
		Properties: properties,
	})
	if err != nil {
		log.Ctx(ctx).Warn("create external collection task failed", zap.Error(err))
		return err
	}

	if err := merr.Error(status); err != nil {
		log.Ctx(ctx).Warn("create external collection task returned error", zap.Error(err))
		return err
	}

	return nil
}

func (c *cluster) QueryExternalCollectionTask(nodeID int64, taskID int64) (*datapb.UpdateExternalCollectionResponse, error) {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.ExternalCollection)

	resp, err := c.queryTask(nodeID, properties)
	if err != nil {
		return nil, err
	}

	// Unmarshal the response payload
	result := &datapb.UpdateExternalCollectionResponse{}
	if err := proto.Unmarshal(resp.GetPayload(), result); err != nil {
		return nil, err
	}

	return result, nil
}

func (c *cluster) DropExternalCollectionTask(nodeID int64, taskID int64) error {
	properties := taskcommon.NewProperties(nil)
	properties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	properties.AppendTaskID(taskID)
	properties.AppendType(taskcommon.ExternalCollection)
	return c.dropTask(nodeID, properties)
}
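
// Illustrative end-to-end flow for an external collection task; cl, nodeID,
// taskID, and the request fields are assumptions of this sketch and error
// handling is elided:
//
//	req := &datapb.UpdateExternalCollectionRequest{TaskID: taskID}
//	_ = cl.CreateExternalCollectionTask(nodeID, req)
//	// the task runs asynchronously on the worker; poll for its result
//	resp, _ := cl.QueryExternalCollectionTask(nodeID, req.GetTaskID())
//	_ = resp
//	// cancel a superseded task or clean up a finished one
//	_ = cl.DropExternalCollectionTask(nodeID, req.GetTaskID())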