From 8e5d6193f9890d20daa8fc1aa2d04bb025dde2b9 Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 24 Jul 2023 14:23:00 +0800 Subject: [PATCH] Add a timeout config for bulkinsert request (#25789) Signed-off-by: jaime --- internal/datanode/services.go | 3 ++- pkg/util/paramtable/component_param.go | 11 +++++++++++ pkg/util/paramtable/component_param_test.go | 4 ++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index b1a0e7d6ee..610764093d 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -24,6 +24,7 @@ import ( "fmt" "path" "strconv" + "time" "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -436,7 +437,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.ProgressPercent, Value: "0"}) // Spawn a new context to ignore cancellation from parental context. - newCtx, cancel := context.WithTimeout(context.TODO(), ImportCallTimeout) + newCtx, cancel := context.WithTimeout(context.TODO(), paramtable.Get().DataNodeCfg.BulkInsertTimeoutSeconds.GetAsDuration(time.Second)) defer cancel() // function to report import state to RootCoord. diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 15d835d986..68b5e90707 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2339,6 +2339,9 @@ type dataNodeConfig struct { // DataNode send timetick interval per collection DataNodeTimeTickInterval ParamItem `refreshable:"false"` + // timeout for bulkinsert + BulkInsertTimeoutSeconds ParamItem `refreshable:"true"` + // Skip BF SkipBFStatsLoad ParamItem `refreshable:"true"` } @@ -2498,6 +2501,14 @@ func (p *dataNodeConfig) init(base *BaseTable) { DefaultValue: "false", } p.SkipBFStatsLoad.Init(base.mgr) + + p.BulkInsertTimeoutSeconds = ParamItem{ + Key: "datanode.bulkinsert.timeout.seconds", + Version: "2.3.0", + PanicIfEmpty: false, + DefaultValue: "18000", + } + p.BulkInsertTimeoutSeconds.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index eaf7f0eef8..a75f2f93da 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -389,6 +389,10 @@ func TestComponentParam(t *testing.T) { period := Params.SyncPeriod t.Logf("SyncPeriod: %v", period) assert.Equal(t, 10*time.Minute, Params.SyncPeriod.GetAsDuration(time.Second)) + + bulkinsertTimeout := Params.BulkInsertTimeoutSeconds + t.Logf("BulkInsertTimeoutSeconds: %v", bulkinsertTimeout) + assert.Equal(t, "18000", Params.BulkInsertTimeoutSeconds.GetValue()) }) t.Run("test indexNodeConfig", func(t *testing.T) {