diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go index b9e41449ad..8d588b47fa 100644 --- a/internal/datacoord/task_analyze.go +++ b/internal/datacoord/task_analyze.go @@ -228,7 +228,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) } func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ ClusterID: at.req.GetClusterID(), @@ -257,7 +257,7 @@ func (at *analyzeTask) setResult(result *workerpb.AnalyzeResult) { } func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeClient) { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), @@ -297,7 +297,7 @@ func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeCl } func (at *analyzeTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index d5e0737ba0..d00028a964 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -267,7 +267,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule } func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ ClusterID: it.req.GetClusterID(), @@ -296,7 +296,7 @@ func (it *indexBuildTask) setResult(info *workerpb.IndexTaskInfo) { } func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeClient) { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := node.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), @@ -334,7 +334,7 @@ func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeC } func (it *indexBuildTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool { - ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{ ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index f4cf2f0085..ec9c11ded6 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -35,10 +35,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/lock" ) -const ( - reqTimeoutInterval = time.Second * 10 -) - type taskScheduler struct { sync.RWMutex diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 33ac2d77ee..e87a13babf 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -234,7 +234,7 @@ func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClien st.req.InsertLogs = segment.GetBinlogs() st.req.DeltaLogs = segment.GetDeltalogs() - ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{ ClusterID: st.req.GetClusterID(), @@ -257,7 +257,7 @@ func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClien } func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClient) { - ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{ ClusterID: st.req.GetClusterID(), @@ -293,7 +293,7 @@ func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClie } func (st *statsTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool { - ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) + ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{ ClusterID: st.req.GetClusterID(), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d866a1dc80..2e958686eb 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3484,6 +3484,8 @@ type dataCoordConfig struct { EnableStatsTask ParamItem `refreshable:"true"` TaskCheckInterval ParamItem `refreshable:"true"` + + RequestTimeoutSeconds ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -4367,6 +4369,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: false, } p.TaskCheckInterval.Init(base.mgr) + + p.RequestTimeoutSeconds = ParamItem{ + Key: "dataCoord.requestTimeoutSeconds", + Version: "2.5.5", + Doc: "request timeout interval", + DefaultValue: "600", + PanicIfEmpty: false, + Export: false, + } + p.RequestTimeoutSeconds.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////