fix: Lock before reading flusher cp sampling truncate cp (#42019)

Related to #42018

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-05-22 21:38:28 +08:00 committed by GitHub
parent c9b0748ff9
commit 244aa30076
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 3 deletions

View File

@ -128,12 +128,13 @@ func (rs *recoveryStorageImpl) persistDirtySnapshot(ctx context.Context, snapsho
} }
func (rs *recoveryStorageImpl) sampleTruncateCheckpoint(checkpoint *WALCheckpoint) { func (rs *recoveryStorageImpl) sampleTruncateCheckpoint(checkpoint *WALCheckpoint) {
if rs.flusherCheckpoint == nil { flusherCP := rs.getFlusherCheckpoint()
if flusherCP == nil {
return return
} }
// use the smaller one to truncate the wal. // use the smaller one to truncate the wal.
if rs.flusherCheckpoint.MessageID.LTE(checkpoint.MessageID) { if flusherCP.MessageID.LTE(checkpoint.MessageID) {
rs.truncator.SampleCheckpoint(rs.flusherCheckpoint) rs.truncator.SampleCheckpoint(flusherCP)
} else { } else {
rs.truncator.SampleCheckpoint(checkpoint) rs.truncator.SampleCheckpoint(checkpoint)
} }

View File

@ -408,3 +408,11 @@ func (r *recoveryStorageImpl) detectInconsistency(msg message.ImmutableMessage,
r.Logger().Warn("inconsistency detected", fields...) r.Logger().Warn("inconsistency detected", fields...)
r.metrics.ObserveInconsitentEvent() r.metrics.ObserveInconsitentEvent()
} }
// getFlusherCheckpoint returns flusher checkpoint concurrent-safe
// NOTE: shall not be called with r.mu.Lock()!
func (r *recoveryStorageImpl) getFlusherCheckpoint() *WALCheckpoint {
r.mu.Lock()
defer r.mu.Unlock()
return r.flusherCheckpoint
}