diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 82961813f8..256bd3c397 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -355,8 +355,9 @@ func (node *DataNode) Start() error { go node.compactionExecutor.start(node.ctx) if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { - node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID) - go node.timeTickSender.start(node.ctx) + node.timeTickSender = newTimeTickSender(node.broker, node.session.ServerID, + retry.Attempts(20), retry.Sleep(time.Millisecond*100)) + node.timeTickSender.start() } node.stopWaiter.Add(1) @@ -420,6 +421,10 @@ func (node *DataNode) Stop() error { node.session.Stop() } + if node.timeTickSender != nil { + node.timeTickSender.Stop() + } + node.stopWaiter.Wait() }) return nil diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 05d8bc4013..4c31a62715 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -71,6 +71,7 @@ func TestWatchChannel(t *testing.T) { node.broker = broker + node.timeTickSender.Stop() node.timeTickSender = newTimeTickSender(node.broker, 0) t.Run("test watch channel", func(t *testing.T) { diff --git a/internal/datanode/timetick_sender.go b/internal/datanode/timetick_sender.go index f1957409e2..a73406e54c 100644 --- a/internal/datanode/timetick_sender.go +++ b/internal/datanode/timetick_sender.go @@ -38,6 +38,11 @@ type timeTickSender struct { nodeID int64 broker broker.Broker + wg sync.WaitGroup + cancelFunc context.CancelFunc + + options []retry.Option + mu sync.Mutex channelStatesCaches map[string]*segmentStatesSequence // string -> *segmentStatesSequence } @@ -47,15 +52,33 @@ type segmentStatesSequence struct { data map[uint64][]*commonpb.SegmentStats // ts -> segmentStats } -func newTimeTickSender(broker broker.Broker, nodeID int64) *timeTickSender { +func newTimeTickSender(broker broker.Broker, nodeID int64, opts ...retry.Option) *timeTickSender { return &timeTickSender{ nodeID: nodeID, broker: broker, channelStatesCaches: make(map[string]*segmentStatesSequence, 0), + options: opts, } } -func (m *timeTickSender) start(ctx context.Context) { +func (m *timeTickSender) start() { + m.wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + m.cancelFunc = cancel + go func() { + defer m.wg.Done() + m.work(ctx) + }() +} + +func (m *timeTickSender) Stop() { + if m.cancelFunc != nil { + m.cancelFunc() + m.wg.Wait() + } +} + +func (m *timeTickSender) work(ctx context.Context) { ticker := time.NewTicker(Params.DataNodeCfg.DataNodeTimeTickInterval.GetAsDuration(time.Millisecond)) defer ticker.Stop() for { @@ -157,7 +180,7 @@ func (m *timeTickSender) sendReport(ctx context.Context) error { log.RatedDebug(30, "timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss)) err := retry.Do(ctx, func() error { return m.broker.ReportTimeTick(ctx, toSendMsgs) - }, retry.Attempts(20), retry.Sleep(time.Millisecond*100)) + }, m.options...) if err != nil { log.Error("ReportDataNodeTtMsgs fail after retry", zap.Error(err)) return err diff --git a/internal/datanode/timetick_sender_test.go b/internal/datanode/timetick_sender_test.go index 1f96168643..445e8a0a9c 100644 --- a/internal/datanode/timetick_sender_test.go +++ b/internal/datanode/timetick_sender_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/retry" ) func TestTimetickManagerNormal(t *testing.T) { @@ -138,7 +139,7 @@ func TestTimetickManagerSendErr(t *testing.T) { broker := broker.NewMockBroker(t) broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(errors.New("mock")).Maybe() - manager := newTimeTickSender(broker, 0) + manager := newTimeTickSender(broker, 0, retry.Attempts(1)) channelName1 := "channel1" ts := uint64(time.Now().Unix()) @@ -156,8 +157,6 @@ func TestTimetickManagerSendErr(t *testing.T) { } func TestTimetickManagerSendReport(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() mockDataCoord := mocks.NewMockDataCoordClient(t) called := atomic.NewBool(false) @@ -170,9 +169,11 @@ func TestTimetickManagerSendReport(t *testing.T) { Return(nil) mockDataCoord.EXPECT().ReportDataNodeTtMsgs(mock.Anything, mock.Anything).Return(merr.Status(nil), nil).Maybe() manager := newTimeTickSender(broker, 0) - go manager.start(ctx) + manager.start() assert.Eventually(t, func() bool { return called.Load() }, 2*time.Second, 500*time.Millisecond) + + manager.Stop() }