cai.zhang 7f470e6bd3
fix: Fix retry state when payload is not nil (#44068)
issue: #43776

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
2025-08-27 18:11:49 +08:00

615 lines
20 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// WorkerSlots represents the slot information for a worker node.
type WorkerSlots struct {
	// NodeID is the worker node's identifier.
	NodeID int64
	// AvailableSlots is the number of task slots the node reported as free
	// (see QuerySlot, which fills this from the node's QuerySlot response).
	AvailableSlots int64
}
// Cluster defines the interface for dispatching tasks to worker nodes.
// It wraps the per-task-type create/query/drop RPCs and slot querying.
type Cluster interface {
	// QuerySlot returns the slot information for all worker nodes,
	// keyed by node ID. Nodes that fail to respond are omitted.
	QuerySlot() map[int64]*WorkerSlots
	// CreateCompaction creates a new compaction task on the specified node.
	CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error
	// QueryCompaction queries the status of a compaction task.
	QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error)
	// DropCompaction drops a compaction task identified by planID.
	DropCompaction(nodeID int64, planID int64) error
	// CreatePreImport creates a pre-import task occupying taskSlot slots.
	CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error
	// CreateImport creates an import task occupying taskSlot slots.
	CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error
	// QueryPreImport queries the status of a pre-import task.
	QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
	// QueryImport queries the status of an import task.
	QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
	// DropImport drops an import task identified by taskID.
	DropImport(nodeID int64, taskID int64) error
	// CreateIndex creates an index building task.
	CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error
	// QueryIndex queries the status of index building tasks.
	QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error)
	// DropIndex drops an index building task identified by taskID.
	DropIndex(nodeID int64, taskID int64) error
	// CreateStats creates a statistics collection task.
	CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error
	// QueryStats queries the status of statistics collection tasks.
	QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error)
	// DropStats drops a statistics collection task identified by taskID.
	DropStats(nodeID int64, taskID int64) error
	// CreateAnalyze creates an analysis task.
	CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error
	// QueryAnalyze queries the status of analysis tasks.
	QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error)
	// DropAnalyze drops an analysis task identified by taskID.
	DropAnalyze(nodeID int64, taskID int64) error
}
// Compile-time check that *cluster satisfies the Cluster interface.
var _ Cluster = (*cluster)(nil)

// cluster implements the Cluster interface by resolving worker clients
// through a NodeManager and issuing the unified task RPCs to them.
type cluster struct {
	nm NodeManager
}
// NewCluster creates a new Cluster backed by the given NodeManager,
// which is used to look up worker clients by node ID.
func NewCluster(nm NodeManager) Cluster {
	return &cluster{nm: nm}
}
// createTask marshals the request message and submits it, together with the
// task properties, to the worker's unified CreateTask endpoint.
// The call is bounded by the configured datacoord request timeout.
func (c *cluster) createTask(nodeID int64, in proto.Message, properties taskcommon.Properties) error {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return err
	}

	body, err := proto.Marshal(in)
	if err != nil {
		log.Ctx(ctx).Warn("marshal request failed", zap.Error(err))
		return err
	}

	req := &workerpb.CreateTaskRequest{
		Payload:    body,
		Properties: properties,
	}
	status, err := cli.CreateTask(ctx, req)
	return merr.CheckRPCCall(status, err)
}
// queryTask asks the worker node for the state of the task identified by
// properties and returns the raw response. The call is bounded by the
// configured datacoord request timeout.
func (c *cluster) queryTask(nodeID int64, properties taskcommon.Properties) (*workerpb.QueryTaskResponse, error) {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return nil, err
	}

	resp, err := cli.QueryTask(ctx, &workerpb.QueryTaskRequest{Properties: properties})
	if err := merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
		return nil, err
	}
	return resp, nil
}
// dropTask tells the worker node to drop the task identified by properties.
// The call is bounded by the configured datacoord request timeout.
func (c *cluster) dropTask(nodeID int64, properties taskcommon.Properties) error {
	timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cli, err := c.nm.GetClient(nodeID)
	if err != nil {
		log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
		return err
	}

	status, err := cli.DropTask(ctx, &workerpb.DropTaskRequest{Properties: properties})
	return merr.CheckRPCCall(status, err)
}
// QuerySlot collects the available slot count from every registered worker
// node in parallel and returns the results keyed by node ID. Nodes whose
// client lookup or RPC fails are logged and omitted from the result.
func (c *cluster) QuerySlot() map[int64]*WorkerSlots {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		slots = make(map[int64]*WorkerSlots)
	)
	for _, id := range c.nm.GetClientIDs() {
		wg.Add(1)
		id := id // capture per-iteration value for the goroutine
		go func() {
			defer wg.Done()
			timeout := paramtable.Get().DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()

			cli, err := c.nm.GetClient(id)
			if err != nil {
				log.Ctx(ctx).Warn("failed to get client", zap.Error(err))
				return
			}
			resp, err := cli.QuerySlot(ctx, &datapb.QuerySlotRequest{})
			if err = merr.CheckRPCCall(resp.GetStatus(), err); err != nil {
				log.Ctx(ctx).Warn("failed to get node slot", zap.Int64("nodeID", id), zap.Error(err))
				return
			}

			mu.Lock()
			slots[id] = &WorkerSlots{
				NodeID:         id,
				AvailableSlots: resp.GetAvailableSlots(),
			}
			mu.Unlock()
		}()
	}
	wg.Wait()
	log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", slots))
	return slots
}
// CreateCompaction creates a new compaction task on the specified node,
// keyed by the plan ID and occupying the plan's declared slot usage.
func (c *cluster) CreateCompaction(nodeID int64, in *datapb.CompactionPlan) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetPlanID())
	props.AppendType(taskcommon.Compaction)
	props.AppendTaskSlot(in.GetSlotUsage())
	return c.createTask(nodeID, in, props)
}
// QueryCompaction queries the status of a compaction task on the given node.
// For non-terminal states without a payload it returns a minimal result built
// from the task properties; when the worker attached a payload it decodes the
// full plan result from it. A Finished/Failed response without a payload is a
// worker contract violation and panics (matching the sibling Query* methods).
// Note: when the payload contains no entry matching in.GetPlanID(), the
// returned result is nil with a nil error.
func (c *cluster) QueryCompaction(nodeID int64, in *datapb.CompactionStateRequest) (*datapb.CompactionPlanResult, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetPlanID())
	reqProperties.AppendType(taskcommon.Compaction)
	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	resProperties := taskcommon.NewProperties(resp.GetProperties())
	state, err := resProperties.GetTaskState()
	if err != nil {
		return nil, err
	}
	// Minimal result derived from properties only; used when no payload is attached.
	defaultResult := &datapb.CompactionPlanResult{State: taskcommon.ToCompactionState(state)}
	// payloadResultF decodes the payload and extracts the entry for this plan,
	// compressing the segments' binlogs before returning.
	payloadResultF := func() (*datapb.CompactionPlanResult, error) {
		result := &datapb.CompactionStateResponse{}
		if err := proto.Unmarshal(resp.GetPayload(), result); err != nil {
			return nil, err
		}
		for _, rst := range result.GetResults() {
			if rst.GetPlanID() != in.GetPlanID() {
				continue
			}
			if err := binlog.CompressCompactionBinlogs(rst.GetSegments()); err != nil {
				return nil, err
			}
			return rst, nil
		}
		return nil, nil
	}
	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return defaultResult, nil
	case taskcommon.InProgress:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		return defaultResult, nil
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() != nil {
			return payloadResultF()
		}
		// Log before panicking so the offending task is identifiable,
		// consistent with the other Query* methods.
		log.Warn("the compaction result payload must not be empty",
			zap.Int64("planID", in.GetPlanID()), zap.String("state", state.String()))
		panic("the compaction result payload must not be empty with Finished/Failed state")
	default:
		panic("should not happen")
	}
}
// DropCompaction drops the compaction task identified by planID on the node.
func (c *cluster) DropCompaction(nodeID int64, planID int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(planID)
	props.AppendType(taskcommon.Compaction)
	return c.dropTask(nodeID, props)
}
// CreatePreImport creates a pre-import task on the specified node,
// occupying taskSlot slots.
func (c *cluster) CreatePreImport(nodeID int64, in *datapb.PreImportRequest, taskSlot int64) error {
	// TODO: sheep, use taskSlot in request
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetTaskID())
	props.AppendType(taskcommon.PreImport)
	props.AppendTaskSlot(taskSlot)
	return c.createTask(nodeID, in, props)
}
// CreateImport creates an import task on the specified node,
// occupying taskSlot slots.
func (c *cluster) CreateImport(nodeID int64, in *datapb.ImportRequest, taskSlot int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetTaskID())
	props.AppendType(taskcommon.Import)
	props.AppendTaskSlot(taskSlot)
	return c.createTask(nodeID, in, props)
}
// QueryPreImport queries the status of a pre-import task on the given node.
// Without a payload it returns a response built from the task properties;
// when the worker attached a payload it decodes the full response from it.
// A Finished/Failed response without a payload violates the worker contract
// and panics.
func (c *cluster) QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskID())
	reqProperties.AppendType(taskcommon.PreImport)

	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	respProps := taskcommon.NewProperties(resp.GetProperties())
	state, err := respProps.GetTaskState()
	if err != nil {
		return nil, err
	}

	// Fallback built purely from the task properties.
	fallback := &datapb.QueryPreImportResponse{
		State:  taskcommon.ToImportState(state),
		Reason: respProps.GetTaskReason(),
	}
	// fromPayload decodes the worker-provided payload into a full response.
	fromPayload := func() (*datapb.QueryPreImportResponse, error) {
		out := &datapb.QueryPreImportResponse{}
		if uerr := proto.Unmarshal(resp.GetPayload(), out); uerr != nil {
			return nil, uerr
		}
		return out, nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return fallback, nil
	case taskcommon.InProgress:
		if resp.GetPayload() == nil {
			return fallback, nil
		}
		return fromPayload()
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() == nil {
			log.Warn("the preImport result payload must not be empty",
				zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
			panic("the preImport result payload must not be empty with Finished/Failed state")
		}
		return fromPayload()
	default:
		panic("should not happen")
	}
}
// QueryImport queries the status of an import task on the given node.
// Without a payload it returns a response built from the task properties;
// when the worker attached a payload it decodes the full response from it.
// A Finished/Failed response without a payload violates the worker contract
// and panics.
func (c *cluster) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) {
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(in.GetTaskID())
	reqProperties.AppendType(taskcommon.Import)

	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	respProps := taskcommon.NewProperties(resp.GetProperties())
	state, err := respProps.GetTaskState()
	if err != nil {
		return nil, err
	}

	// Fallback built purely from the task properties.
	fallback := &datapb.QueryImportResponse{
		State:  taskcommon.ToImportState(state),
		Reason: respProps.GetTaskReason(),
	}
	// fromPayload decodes the worker-provided payload into a full response.
	fromPayload := func() (*datapb.QueryImportResponse, error) {
		out := &datapb.QueryImportResponse{}
		if uerr := proto.Unmarshal(resp.GetPayload(), out); uerr != nil {
			return nil, uerr
		}
		return out, nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return fallback, nil
	case taskcommon.InProgress:
		if resp.GetPayload() == nil {
			return fallback, nil
		}
		return fromPayload()
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() == nil {
			log.Warn("the import result payload must not be empty",
				zap.Int64("taskID", in.GetTaskID()), zap.String("state", state.String()))
			panic("the import result payload must not be empty with Finished/Failed state")
		}
		return fromPayload()
	default:
		panic("should not happen")
	}
}
// DropImport drops the import task identified by taskID on the node.
func (c *cluster) DropImport(nodeID int64, taskID int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(taskID)
	props.AppendType(taskcommon.Import)
	return c.dropTask(nodeID, props)
}
// CreateIndex creates an index building task on the specified node, keyed by
// the build ID and carrying the slot usage, row count, and index version.
func (c *cluster) CreateIndex(nodeID int64, in *workerpb.CreateJobRequest) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetBuildID())
	props.AppendType(taskcommon.Index)
	props.AppendTaskSlot(in.GetTaskSlot())
	props.AppendNumRows(in.GetNumRows())
	props.AppendTaskVersion(in.GetIndexVersion())
	return c.createTask(nodeID, in, props)
}
// QueryIndex queries the status of the index building task identified by the
// first entry of in.GetTaskIDs(). Without a payload it returns a single-entry
// result built from the task properties; when the worker attached a payload
// it decodes the full job results from it. A Finished/Failed response without
// a payload violates the worker contract and panics.
func (c *cluster) QueryIndex(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.IndexJobResults, error) {
	taskID := in.GetTaskIDs()[0]
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(taskID)
	reqProperties.AppendType(taskcommon.Index)

	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	respProps := taskcommon.NewProperties(resp.GetProperties())
	state, err := respProps.GetTaskState()
	if err != nil {
		return nil, err
	}

	// Fallback: a single-entry result derived from the task properties.
	fallback := &workerpb.IndexJobResults{
		Results: []*workerpb.IndexTaskInfo{{
			BuildID:    taskID,
			State:      commonpb.IndexState(state),
			FailReason: respProps.GetTaskReason(),
		}},
	}
	// fromPayload decodes the worker-provided payload and extracts the
	// index job results.
	fromPayload := func() (*workerpb.IndexJobResults, error) {
		out := &workerpb.QueryJobsV2Response{}
		if uerr := proto.Unmarshal(resp.GetPayload(), out); uerr != nil {
			return nil, uerr
		}
		return out.GetIndexJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return fallback, nil
	case taskcommon.InProgress:
		if resp.GetPayload() == nil {
			return fallback, nil
		}
		return fromPayload()
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() == nil {
			log.Warn("the index result payload must not be empty",
				zap.Int64("taskID", taskID), zap.String("state", state.String()))
			panic("the index result payload must not be empty with Finished/Failed state")
		}
		return fromPayload()
	default:
		panic("should not happen")
	}
}
// DropIndex drops the index building task identified by taskID on the node.
func (c *cluster) DropIndex(nodeID int64, taskID int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(taskID)
	props.AppendType(taskcommon.Index)
	return c.dropTask(nodeID, props)
}
// CreateStats creates a statistics collection task on the specified node,
// carrying the sub-job type, slot usage, row count, and task version.
func (c *cluster) CreateStats(nodeID int64, in *workerpb.CreateStatsRequest) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetTaskID())
	props.AppendType(taskcommon.Stats)
	props.AppendSubType(in.GetSubJobType().String())
	props.AppendTaskSlot(in.GetTaskSlot())
	props.AppendNumRows(in.GetNumRows())
	props.AppendTaskVersion(in.GetTaskVersion())
	return c.createTask(nodeID, in, props)
}
// QueryStats queries the status of the statistics task identified by the
// first entry of in.GetTaskIDs(). Without a payload it returns a single-entry
// result built from the task properties; when the worker attached a payload
// it decodes the full stats results from it. A Finished/Failed response
// without a payload violates the worker contract and panics.
func (c *cluster) QueryStats(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.StatsResults, error) {
	taskID := in.GetTaskIDs()[0]
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(taskID)
	reqProperties.AppendType(taskcommon.Stats)

	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	respProps := taskcommon.NewProperties(resp.GetProperties())
	state, err := respProps.GetTaskState()
	if err != nil {
		return nil, err
	}

	// Fallback: a single-entry result derived from the task properties.
	fallback := &workerpb.StatsResults{
		Results: []*workerpb.StatsResult{{
			TaskID:     taskID,
			State:      state,
			FailReason: respProps.GetTaskReason(),
		}},
	}
	// fromPayload decodes the worker-provided payload and extracts the
	// stats job results.
	fromPayload := func() (*workerpb.StatsResults, error) {
		out := &workerpb.QueryJobsV2Response{}
		if uerr := proto.Unmarshal(resp.GetPayload(), out); uerr != nil {
			return nil, uerr
		}
		return out.GetStatsJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return fallback, nil
	case taskcommon.InProgress:
		if resp.GetPayload() == nil {
			return fallback, nil
		}
		return fromPayload()
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() == nil {
			log.Warn("the stats result payload must not be empty",
				zap.Int64("taskID", taskID), zap.String("state", state.String()))
			panic("the stats result payload must not be empty with Finished/Failed state")
		}
		return fromPayload()
	default:
		panic("should not happen")
	}
}
// DropStats drops the statistics collection task identified by taskID on the node.
func (c *cluster) DropStats(nodeID int64, taskID int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(taskID)
	props.AppendType(taskcommon.Stats)
	return c.dropTask(nodeID, props)
}
// CreateAnalyze creates an analysis task on the specified node,
// carrying the slot usage and task version.
func (c *cluster) CreateAnalyze(nodeID int64, in *workerpb.AnalyzeRequest) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(in.GetTaskID())
	props.AppendType(taskcommon.Analyze)
	props.AppendTaskSlot(in.GetTaskSlot())
	props.AppendTaskVersion(in.GetVersion())
	return c.createTask(nodeID, in, props)
}
// QueryAnalyze queries the status of the analysis task identified by the
// first entry of in.GetTaskIDs(). Without a payload it returns a single-entry
// result built from the task properties; when the worker attached a payload
// it decodes the full analyze results from it. A Finished/Failed response
// without a payload violates the worker contract and panics.
func (c *cluster) QueryAnalyze(nodeID int64, in *workerpb.QueryJobsRequest) (*workerpb.AnalyzeResults, error) {
	taskID := in.GetTaskIDs()[0]
	reqProperties := taskcommon.NewProperties(nil)
	reqProperties.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	reqProperties.AppendTaskID(taskID)
	reqProperties.AppendType(taskcommon.Analyze)

	resp, err := c.queryTask(nodeID, reqProperties)
	if err != nil {
		return nil, err
	}
	respProps := taskcommon.NewProperties(resp.GetProperties())
	state, err := respProps.GetTaskState()
	if err != nil {
		return nil, err
	}

	// Fallback: a single-entry result derived from the task properties.
	fallback := &workerpb.AnalyzeResults{
		Results: []*workerpb.AnalyzeResult{{
			TaskID:     taskID,
			State:      state,
			FailReason: respProps.GetTaskReason(),
		}},
	}
	// fromPayload decodes the worker-provided payload and extracts the
	// analyze job results.
	fromPayload := func() (*workerpb.AnalyzeResults, error) {
		out := &workerpb.QueryJobsV2Response{}
		if uerr := proto.Unmarshal(resp.GetPayload(), out); uerr != nil {
			return nil, uerr
		}
		return out.GetAnalyzeJobResults(), nil
	}

	switch state {
	case taskcommon.None, taskcommon.Init, taskcommon.Retry:
		return fallback, nil
	case taskcommon.InProgress:
		if resp.GetPayload() == nil {
			return fallback, nil
		}
		return fromPayload()
	case taskcommon.Finished, taskcommon.Failed:
		if resp.GetPayload() == nil {
			log.Warn("the analyze result payload must not be empty",
				zap.Int64("taskID", taskID), zap.String("state", state.String()))
			panic("the analyze result payload must not be empty with Finished/Failed state")
		}
		return fromPayload()
	default:
		panic("should not happen")
	}
}
// DropAnalyze drops the analysis task identified by taskID on the node.
func (c *cluster) DropAnalyze(nodeID int64, taskID int64) error {
	props := taskcommon.NewProperties(nil)
	props.AppendClusterID(paramtable.Get().CommonCfg.ClusterPrefix.GetValue())
	props.AppendTaskID(taskID)
	props.AppendType(taskcommon.Analyze)
	return c.dropTask(nodeID, props)
}