diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 8192c7274e..fc3784c5af 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -33,11 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -const ( - interval = 1 * time.Second - RPCTimeout = 3 * time.Second -) - // LeaderObserver is to sync the distribution with leader type LeaderObserver struct { wg sync.WaitGroup @@ -79,7 +74,7 @@ func (o *LeaderObserver) Stop() { } func (o *LeaderObserver) schedule(ctx context.Context) { - ticker := time.NewTicker(interval) + ticker := time.NewTicker(paramtable.Get().QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 1a2852d7dc..2a70dd6211 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ -32,12 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" -) - -const ( - updateTickerDuration = 1 * time.Minute - segmentBufferSize = 16 - bufferFlushPeriod = 500 * time.Millisecond + "github.com/milvus-io/milvus/pkg/util/paramtable" ) var ErrNodeNotFound = errors.New("NodeNotFound") @@ -100,7 +95,7 @@ func (c *QueryCluster) Stop() { func (c *QueryCluster) updateLoop() { defer c.wg.Done() - ticker := time.NewTicker(updateTickerDuration) + ticker := time.NewTicker(paramtable.Get().QueryCoordCfg.CheckNodeSessionInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 432c30f0c5..a2b3078722 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1247,6 +1247,7 @@ type queryCoordConfig struct { UpdateNextTargetInterval ParamItem `refreshable:"false"` CheckNodeInReplicaInterval ParamItem `refreshable:"false"` CheckResourceGroupInterval ParamItem `refreshable:"false"` + LeaderViewUpdateInterval ParamItem `refreshable:"false"` EnableRGAutoRecover ParamItem `refreshable:"true"` CheckHealthInterval ParamItem `refreshable:"false"` CheckHealthRPCTimeout ParamItem `refreshable:"true"` @@ -1254,6 +1255,7 @@ type queryCoordConfig struct { CollectionRecoverTimesLimit ParamItem `refreshable:"true"` ObserverTaskParallel ParamItem `refreshable:"false"` CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"` + CheckNodeSessionInterval ParamItem `refreshable:"false"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1518,6 +1520,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { } p.CheckResourceGroupInterval.Init(base.mgr) + p.LeaderViewUpdateInterval = ParamItem{ + Key: "queryCoord.leaderViewUpdateInterval", + Doc: "the interval duration(in seconds) for LeaderObserver to fetch LeaderView from querynodes", + Version: "2.3.4", + DefaultValue: "1", + PanicIfEmpty: true, + } + p.LeaderViewUpdateInterval.Init(base.mgr) + p.EnableRGAutoRecover = ParamItem{ Key: "queryCoord.enableRGAutoRecover", Version: "2.2.3", @@ -1585,6 +1596,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: true, } p.CheckAutoBalanceConfigInterval.Init(base.mgr) + + p.CheckNodeSessionInterval = ParamItem{ + Key: "queryCoord.checkNodeSessionInterval", + Version: "2.3.4", + DefaultValue: "60", + PanicIfEmpty: true, + Doc: "the interval(in seconds) of check querynode cluster session", + Export: true, + } + p.CheckNodeSessionInterval.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////