From 5365748338df067585e511bb56e9781fa45bb461 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Mon, 13 Nov 2023 10:18:17 +0800 Subject: [PATCH] Use single instance for mergedTimeTickerSender (#27730) Signed-off-by: lixinguo Co-authored-by: lixinguo --- .../datanode/flow_graph_insert_buffer_node.go | 2 +- internal/datanode/flow_graph_time_ticker.go | 22 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 3e816bfb98..14a1d62121 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -743,7 +743,7 @@ func newInsertBufferNode( wTtMsgStream := wTt wTtMsgStream.EnableProduce(true) - mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error { + mt := getOrCreateMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error { stats := make([]*commonpb.SegmentStats, 0, len(segmentIDs)) for _, sid := range segmentIDs { stat, err := config.channel.getSegmentStatisticsUpdates(sid) diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go index 50db66bfb6..7c4302b67f 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -46,19 +46,29 @@ type mergedTimeTickerSender struct { closeOnce sync.Once } -func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { - mt := &mergedTimeTickerSender{ +var ( + uniqueMergedTimeTickerSender *mergedTimeTickerSender + getUniqueMergedTimeTickerSender sync.Once +) + +func newUniqueMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { + return &mergedTimeTickerSender{ ts: 0, // 0 for not tt send segmentIDs: make(map[int64]struct{}), cond: sync.NewCond(&sync.Mutex{}), send: send, closeCh: make(chan struct{}), } - mt.wg.Add(2) - go mt.tick() - go mt.work() +} - return mt +func getOrCreateMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { + getUniqueMergedTimeTickerSender.Do(func() { + uniqueMergedTimeTickerSender = newUniqueMergedTimeTickerSender(send) + }) + uniqueMergedTimeTickerSender.wg.Add(2) + go uniqueMergedTimeTickerSender.tick() + go uniqueMergedTimeTickerSender.work() + return uniqueMergedTimeTickerSender } func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) {