From 9ce5f08cc7c6f09d0ebf0b1470fc7cdd1dbee445 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 15 Dec 2025 13:59:15 +0800 Subject: [PATCH] fix: lost broadcasting persisted before making message broadcast (#46328) issue: #43897 Signed-off-by: chyezh --- internal/streamingcoord/server/broadcaster/broadcast_task.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/streamingcoord/server/broadcaster/broadcast_task.go b/internal/streamingcoord/server/broadcaster/broadcast_task.go index d7dee9e699..20cfbf5207 100644 --- a/internal/streamingcoord/server/broadcaster/broadcast_task.go +++ b/internal/streamingcoord/server/broadcaster/broadcast_task.go @@ -28,7 +28,7 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc taskMetricsGuard: m, msg: msg, task: proto, - dirty: true, // the task is recovered from the recovery info, so it's persisted. + dirty: false, // the task is recovered from the recovery info, so it's persisted. ackCallbackScheduler: ackCallbackScheduler, done: make(chan struct{}), allAcked: make(chan struct{}), @@ -76,7 +76,7 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m AckedVchannelBitmap: make([]byte, len(header.VChannels)), AckedCheckpoints: make([]*streamingpb.AckedCheckpoint, len(header.VChannels)), }, - dirty: false, + dirty: true, ackCallbackScheduler: ackCallbackScheduler, done: make(chan struct{}), allAcked: make(chan struct{}),