enhance: Export request timeout interval in config (#40119)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-02-23 15:15:54 +08:00 committed by GitHub
parent cb7f2fa6fd
commit 9f5b488f9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 21 additions and 13 deletions

View File

@ -228,7 +228,7 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
}
func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
ClusterID: at.req.GetClusterID(),
@ -257,7 +257,7 @@ func (at *analyzeTask) setResult(result *workerpb.AnalyzeResult) {
}
func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeClient) {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
@ -297,7 +297,7 @@ func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeCl
}
func (at *analyzeTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),

View File

@ -267,7 +267,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient, meta *meta) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
ClusterID: it.req.GetClusterID(),
@ -296,7 +296,7 @@ func (it *indexBuildTask) setResult(info *workerpb.IndexTaskInfo) {
}
func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeClient) {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := node.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
@ -334,7 +334,7 @@ func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeC
}
func (it *indexBuildTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool {
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),

View File

@ -35,10 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/lock"
)
const (
reqTimeoutInterval = time.Second * 10
)
type taskScheduler struct {
sync.RWMutex

View File

@ -234,7 +234,7 @@ func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClien
st.req.InsertLogs = segment.GetBinlogs()
st.req.DeltaLogs = segment.GetDeltalogs()
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
ClusterID: st.req.GetClusterID(),
@ -257,7 +257,7 @@ func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClien
}
func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClient) {
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
ClusterID: st.req.GetClusterID(),
@ -293,7 +293,7 @@ func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClie
}
func (st *statsTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool {
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.RequestTimeoutSeconds.GetAsDuration(time.Second))
defer cancel()
resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
ClusterID: st.req.GetClusterID(),

View File

@ -3484,6 +3484,8 @@ type dataCoordConfig struct {
EnableStatsTask ParamItem `refreshable:"true"`
TaskCheckInterval ParamItem `refreshable:"true"`
RequestTimeoutSeconds ParamItem `refreshable:"true"`
}
func (p *dataCoordConfig) init(base *BaseTable) {
@ -4367,6 +4369,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Export: false,
}
p.TaskCheckInterval.Init(base.mgr)
p.RequestTimeoutSeconds = ParamItem{
Key: "dataCoord.requestTimeoutSeconds",
Version: "2.5.5",
Doc: "request timeout interval",
DefaultValue: "600",
PanicIfEmpty: false,
Export: false,
}
p.RequestTimeoutSeconds.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////