From 9345caa135495177b87c41c7951224f607e51f25 Mon Sep 17 00:00:00 2001 From: tinswzy Date: Sun, 21 Dec 2025 19:01:17 +0800 Subject: [PATCH] fix: call truncate when checkpoint is persisted (#46382) issue: #44434 Signed-off-by: tinswzy --- .../wal/recovery/recovery_background_task.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/streamingnode/server/wal/recovery/recovery_background_task.go b/internal/streamingnode/server/wal/recovery/recovery_background_task.go index 9e7fcdbe87..c91728a687 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_background_task.go +++ b/internal/streamingnode/server/wal/recovery/recovery_background_task.go @@ -143,9 +143,23 @@ func (rs *recoveryStorageImpl) persistDirtySnapshot(ctx context.Context, lvl zap // sample the checkpoint for truncator to make wal truncation. rs.metrics.ObServePersistedMetrics(snapshot.Checkpoint.TimeTick) + rs.simpleTruncateCheckpoint(ctx, snapshot.Checkpoint) return } +func (rs *recoveryStorageImpl) simpleTruncateCheckpoint(ctx context.Context, checkpoint *WALCheckpoint) { + flusherCP := rs.getFlusherCheckpoint() + if flusherCP == nil { + return + } + // use the smaller one to truncate the wal. + if flusherCP.MessageID.LTE(checkpoint.MessageID) { + _ = rs.truncator.Truncate(ctx, flusherCP.MessageID) + } else { + _ = rs.truncator.Truncate(ctx, checkpoint.MessageID) + } +} + // dropAllVirtualChannel drops all virtual channels that are in the dropped state. // TODO: DropVirtualChannel will be called twice here, // call it in recovery storage is used to promise the drop virtual channel must be called after recovery.