diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go index 38da1798eb..45c8239129 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -74,13 +74,13 @@ func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) { func (mt *mergedTimeTickerSender) tick() { defer mt.wg.Done() // this duration might be configuable in the future - t := time.NewTicker(time.Millisecond * 100) // 100 millisecond, 1/2 of rootcoord timetick duration + t := time.NewTicker(Params.DataNodeCfg.DataNodeTimeTickInterval.GetAsDuration(time.Millisecond)) // 500 millisecond defer t.Stop() for { select { case <-t.C: mt.cond.L.Lock() - mt.cond.Signal() // allow worker to check every 0.1s + mt.cond.Signal() mt.cond.L.Unlock() case <-mt.closeCh: return diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 2a7e13c024..4ca94eebf3 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -2015,6 +2015,9 @@ type dataNodeConfig struct { MemoryForceSyncEnable ParamItem `refreshable:"true"` MemoryForceSyncSegmentNum ParamItem `refreshable:"true"` MemoryWatermark ParamItem `refreshable:"true"` + + // DataNode send timetick interval per collection + DataNodeTimeTickInterval ParamItem `refreshable:"false"` } func (p *dataNodeConfig) init(base *BaseTable) { @@ -2122,6 +2125,13 @@ func (p *dataNodeConfig) init(base *BaseTable) { } p.IOConcurrency.Init(base.mgr) + p.DataNodeTimeTickInterval = ParamItem{ + Key: "datanode.timetick.interval", + Version: "2.2.5", + PanicIfEmpty: false, + DefaultValue: "500", + } + p.DataNodeTimeTickInterval.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////