From 9f31fc9a319adee8909e66acf5a4f8ec30180451 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Mon, 24 Jul 2023 14:07:00 +0800 Subject: [PATCH] Add broker timeout config item. (#25855) Signed-off-by: sunby --- configs/milvus.yaml | 1 + .../querycoordv2/meta/coordinator_broker.go | 19 ++++++++----------- pkg/util/paramtable/component_param.go | 11 +++++++++++ 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 979f474b9f..db2f4fae58 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -219,6 +219,7 @@ queryCoord: taskMergeCap: 1 taskExecutionCap: 256 enableActiveStandby: false # Enable active-standby + brokerTimeout: 5000 # broker rpc timeout in milliseconds # Related configuration of queryNode, used to run hybrid search between vector and scalar data. queryNode: diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index abd454e537..99d3af396a 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -35,13 +35,10 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" . "github.com/milvus-io/milvus/pkg/util/typeutil" ) -const ( - brokerRPCTimeout = 5 * time.Second -) - type Broker interface { GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) @@ -67,7 +64,7 @@ func NewCoordinatorBroker( } func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() req := &milvuspb.DescribeCollectionRequest{ @@ -96,7 +93,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec } func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() req := &milvuspb.ShowPartitionsRequest{ Base: commonpbutil.NewMsgBase( @@ -126,7 +123,7 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID } func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{ @@ -152,7 +149,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection } func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequestV2{ @@ -178,7 +175,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti } func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() req := &datapb.GetSegmentInfoRequest{ @@ -203,7 +200,7 @@ func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...Uniq } func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() resp, err := broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{ @@ -243,7 +240,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID } func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) { - ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() resp, err := broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c4bc812283..94f8f5b3a7 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1177,6 +1177,7 @@ type queryCoordConfig struct { EnableRGAutoRecover ParamItem `refreshable:"true"` CheckHealthInterval ParamItem `refreshable:"false"` CheckHealthRPCTimeout ParamItem `refreshable:"true"` + BrokerTimeout ParamItem `refreshable:"false"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1468,6 +1469,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.CheckHealthRPCTimeout.Init(base.mgr) + + p.BrokerTimeout = ParamItem{ + Key: "queryCoord.brokerTimeout", + Version: "2.3.0", + DefaultValue: "5000", + PanicIfEmpty: true, + Doc: "5000ms, querycoord broker rpc timeout", + Export: true, + } + p.BrokerTimeout.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////