From c507abdeaac56512c4c58bd60c5f1366a8a6188f Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 26 Nov 2020 16:51:44 +0800 Subject: [PATCH] Fix start time sync Signed-off-by: neza2017 --- internal/master/master.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/master/master.go b/internal/master/master.go index 9f5064080a..d88aba2327 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -107,6 +107,9 @@ func CreateServer(ctx context.Context) (*Master, error) { pulsarProxyStream.Start() var proxyStream ms.MsgStream = pulsarProxyStream proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval) + if err := proxyTimeTickBarrier.Start(); err != nil { + return nil, err + } tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier) pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream @@ -115,6 +118,9 @@ func CreateServer(ctx context.Context) (*Master, error) { pulsarWriteStream.Start() var writeStream ms.MsgStream = pulsarWriteStream writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList) + if err := writeTimeTickBarrier.Start(); err != nil { + return nil, err + } tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier) pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream