diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 3e816bfb98..14a1d62121 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -743,7 +743,7 @@ func newInsertBufferNode( wTtMsgStream := wTt wTtMsgStream.EnableProduce(true) - mt := newMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error { + mt := getOrCreateMergedTimeTickerSender(func(ts Timestamp, segmentIDs []int64) error { stats := make([]*commonpb.SegmentStats, 0, len(segmentIDs)) for _, sid := range segmentIDs { stat, err := config.channel.getSegmentStatisticsUpdates(sid) diff --git a/internal/datanode/flow_graph_time_ticker.go b/internal/datanode/flow_graph_time_ticker.go index 50db66bfb6..7c4302b67f 100644 --- a/internal/datanode/flow_graph_time_ticker.go +++ b/internal/datanode/flow_graph_time_ticker.go @@ -46,19 +46,29 @@ type mergedTimeTickerSender struct { closeOnce sync.Once } -func newMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { - mt := &mergedTimeTickerSender{ +var ( + uniqueMergedTimeTickerSender *mergedTimeTickerSender + getUniqueMergedTimeTickerSender sync.Once +) + +func newUniqueMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { + return &mergedTimeTickerSender{ ts: 0, // 0 for not tt send segmentIDs: make(map[int64]struct{}), cond: sync.NewCond(&sync.Mutex{}), send: send, closeCh: make(chan struct{}), } - mt.wg.Add(2) - go mt.tick() - go mt.work() +} - return mt +func getOrCreateMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender { + getUniqueMergedTimeTickerSender.Do(func() { + uniqueMergedTimeTickerSender = newUniqueMergedTimeTickerSender(send) + }) + uniqueMergedTimeTickerSender.wg.Add(2) + go uniqueMergedTimeTickerSender.tick() + go uniqueMergedTimeTickerSender.work() + return uniqueMergedTimeTickerSender } func (mt *mergedTimeTickerSender) bufferTs(ts Timestamp, segmentIDs []int64) {