diff --git a/internal/streamingnode/server/wal/recovery/recovery_background_task.go b/internal/streamingnode/server/wal/recovery/recovery_background_task.go index 9e7fcdbe87..c91728a687 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_background_task.go +++ b/internal/streamingnode/server/wal/recovery/recovery_background_task.go @@ -143,9 +143,23 @@ func (rs *recoveryStorageImpl) persistDirtySnapshot(ctx context.Context, lvl zap // sample the checkpoint for truncator to make wal truncation. rs.metrics.ObServePersistedMetrics(snapshot.Checkpoint.TimeTick) + rs.simpleTruncateCheckpoint(ctx, snapshot.Checkpoint) return } +func (rs *recoveryStorageImpl) simpleTruncateCheckpoint(ctx context.Context, checkpoint *WALCheckpoint) { + flusherCP := rs.getFlusherCheckpoint() + if flusherCP == nil { + return + } + // use the smaller one to truncate the wal. + if flusherCP.MessageID.LTE(checkpoint.MessageID) { + _ = rs.truncator.Truncate(ctx, flusherCP.MessageID) + } else { + _ = rs.truncator.Truncate(ctx, checkpoint.MessageID) + } +} + // dropAllVirtualChannel drops all virtual channels that are in the dropped state. // TODO: DropVirtualChannel will be called twice here, // call it in recovery storage is used to promise the drop virtual channel must be called after recovery.