diff --git a/internal/master/master.go b/internal/master/master.go index 9f5064080a..d88aba2327 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -107,6 +107,9 @@ func CreateServer(ctx context.Context) (*Master, error) { pulsarProxyStream.Start() var proxyStream ms.MsgStream = pulsarProxyStream proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval) + if err := proxyTimeTickBarrier.Start(); err != nil { + return nil, err + } tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier) pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream @@ -115,6 +118,9 @@ func CreateServer(ctx context.Context) (*Master, error) { pulsarWriteStream.Start() var writeStream ms.MsgStream = pulsarWriteStream writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList) + if err := writeTimeTickBarrier.Start(); err != nil { + return nil, err + } tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier) pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream