diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 832458c83f..e4cd06052f 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1417,7 +1417,7 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat } for _, ttMsg := range req.GetMsgs() { - sub := tsoutil.SubByNow(req.GetBase().GetTimestamp()) + sub := tsoutil.SubByNow(ttMsg.GetTimestamp()) metrics.DataCoordConsumeDataNodeTimeTickLag. WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), ttMsg.GetChannelName()). Set(float64(sub)) diff --git a/internal/datanode/timetick_sender.go b/internal/datanode/timetick_sender.go index 16c51ef08d..84dc293d0a 100644 --- a/internal/datanode/timetick_sender.go +++ b/internal/datanode/timetick_sender.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/tsoutil" ) // timeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically @@ -65,8 +66,8 @@ func (m *timeTickSender) start(ctx context.Context) { case <-ctx.Done(): log.Info("timeTickSender context done") return - case t := <-ticker.C: - m.sendReport(ctx, uint64(t.UnixMilli())) + case <-ticker.C: + m.sendReport(ctx) } } } @@ -154,10 +155,11 @@ func (m *timeTickSender) cleanStatesCache(sendedLastTss map[string]uint64) { log.Debug("timeTickSender channelStatesCaches", zap.Int("sizeAfterClean", len(m.channelStatesCaches))) } -func (m *timeTickSender) sendReport(ctx context.Context, submitTs Timestamp) error { +func (m *timeTickSender) sendReport(ctx context.Context) error { toSendMsgs, sendLastTss := m.mergeDatanodeTtMsg() log.Debug("timeTickSender send datanode timetick message", zap.Any("toSendMsgs", toSendMsgs), zap.Any("sendLastTss", sendLastTss)) err := retry.Do(ctx, func() error { + submitTs := tsoutil.ComposeTSByTime(time.Now(), 0) statusResp, err := m.dataCoord.ReportDataNodeTtMsgs(ctx, &datapb.ReportDataNodeTtMsgsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt), diff --git a/internal/datanode/timetick_sender_test.go b/internal/datanode/timetick_sender_test.go index def2da794d..03cd786871 100644 --- a/internal/datanode/timetick_sender_test.go +++ b/internal/datanode/timetick_sender_test.go @@ -90,7 +90,7 @@ func TestTimetickManagerNormal(t *testing.T) { } manager.update(channelName2, ts3, segmentStats3) - err := manager.sendReport(ctx, 100) + err := manager.sendReport(ctx) assert.NoError(t, err) _, channelExistAfterSubmit := manager.channelStatesCaches[channelName1] @@ -115,7 +115,7 @@ func TestTimetickManagerNormal(t *testing.T) { } manager.update(channelName3, ts4, segmentStats4) - err = manager.sendReport(ctx, 100) + err = manager.sendReport(ctx) assert.NoError(t, err) _, channelExistAfterSubmit2 := manager.channelStatesCaches[channelName1] @@ -140,7 +140,7 @@ func TestTimetickManagerSendErr(t *testing.T) { } // update first time manager.update(channelName1, ts, segmentStats) - err := manager.sendReport(ctx, 100) + err := manager.sendReport(ctx) assert.Error(t, err) } @@ -159,7 +159,7 @@ func TestTimetickManagerSendNotSuccess(t *testing.T) { } // update first time manager.update(channelName1, ts, segmentStats) - err := manager.sendReport(ctx, 100) + err := manager.sendReport(ctx) assert.Error(t, err) }