From cb11abe684a2e90bba8baa35a2d5d6b1ca5c975e Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Fri, 24 Mar 2023 22:24:03 +0800 Subject: [PATCH] Make datanode tt interval configurable (#22957) Signed-off-by: xiaofan-luan --- internal/datanode/flow_graph_time_ticker.go | 4 ++-- internal/util/paramtable/component_param.go | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go index 255db2915a..60e458dd7d 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -72,13 +72,13 @@ func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) { func (mt *mergedTimeTickerSender) tick() { defer mt.wg.Done() // this duration might be configuable in the future - t := time.NewTicker(time.Millisecond * 100) // 100 millisecond, 1/2 of rootcoord timetick duration + t := time.NewTicker(time.Duration(Params.DataNodeCfg.DataNodeTimeTickInterval) * time.Millisecond) // 500 millisecond defer t.Stop() for { select { case <-t.C: mt.cond.L.Lock() - mt.cond.Signal() // allow worker to check every 0.1s + mt.cond.Signal() mt.cond.L.Unlock() case <-mt.closeCh: return diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 6348611113..d527c41501 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -1564,6 +1564,9 @@ type dataNodeConfig struct { // io concurrency to fetch stats logs IOConcurrency int + // datanote send timetick interval per channel + DataNodeTimeTickInterval int + CreatedTime time.Time UpdatedTime time.Time @@ -1584,6 +1587,7 @@ func (p *dataNodeConfig) init(base *BaseTable) { p.initBinlogMaxSize() p.initSyncPeriod() p.initIOConcurrency() + p.initDataNodeTimeTickInterval() p.initChannelWatchPath() p.initMemoryForceSyncEnable() @@ -1645,6 +1649,10 @@ func (p *dataNodeConfig) initIOConcurrency() { p.IOConcurrency = p.Base.ParseIntWithDefault("dataNode.dataSync.ioConcurrency", 10) } +func (p *dataNodeConfig) initDataNodeTimeTickInterval() { + p.DataNodeTimeTickInterval = p.Base.ParseIntWithDefault("datanode.timetick.interval", 500) +} + func (p *dataNodeConfig) SetNodeID(id UniqueID) { p.NodeID.Store(id) }