mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add broker timeout config item. (#25855)
Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
efcaa07fe9
commit
9f31fc9a31
@ -219,6 +219,7 @@ queryCoord:
|
||||
taskMergeCap: 1
|
||||
taskExecutionCap: 256
|
||||
enableActiveStandby: false # Enable active-standby
|
||||
brokerTimeout: 5000 # broker rpc timeout in milliseconds
|
||||
|
||||
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
|
||||
queryNode:
|
||||
|
||||
@ -35,13 +35,10 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
. "github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
brokerRPCTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
type Broker interface {
|
||||
GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error)
|
||||
GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
|
||||
@ -67,7 +64,7 @@ func NewCoordinatorBroker(
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
req := &milvuspb.DescribeCollectionRequest{
|
||||
@ -96,7 +93,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
req := &milvuspb.ShowPartitionsRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
@ -126,7 +123,7 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
|
||||
@ -152,7 +149,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequestV2{
|
||||
@ -178,7 +175,7 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
req := &datapb.GetSegmentInfoRequest{
|
||||
@ -203,7 +200,7 @@ func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...Uniq
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
resp, err := broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
|
||||
@ -243,7 +240,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
|
||||
}
|
||||
|
||||
func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
|
||||
defer cancel()
|
||||
|
||||
resp, err := broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
|
||||
|
||||
@ -1177,6 +1177,7 @@ type queryCoordConfig struct {
|
||||
EnableRGAutoRecover ParamItem `refreshable:"true"`
|
||||
CheckHealthInterval ParamItem `refreshable:"false"`
|
||||
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
|
||||
BrokerTimeout ParamItem `refreshable:"false"`
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) init(base *BaseTable) {
|
||||
@ -1468,6 +1469,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
|
||||
Export: true,
|
||||
}
|
||||
p.CheckHealthRPCTimeout.Init(base.mgr)
|
||||
|
||||
p.BrokerTimeout = ParamItem{
|
||||
Key: "queryCoord.brokerTimeout",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "5000",
|
||||
PanicIfEmpty: true,
|
||||
Doc: "5000ms, querycoord broker rpc timeout",
|
||||
Export: true,
|
||||
}
|
||||
p.BrokerTimeout.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user