fix: Fix missing handling of FlushAllMsg in recovery storage (#46802)

issue: https://github.com/milvus-io/milvus/issues/46799

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
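For context, the bug is a missing case in the recovery storage's message-dispatch switch: a FlushAll message replayed during recovery matched no case and was dropped. A minimal, self-contained Go sketch of that pitfall — the types here are stand-ins, not the Milvus ones, and whether the real switch has a default branch is not shown in this diff:

package main

import "fmt"

type MessageType int

const (
	MessageTypeManualFlush MessageType = iota
	MessageTypeFlushAll
)

func handleMessage(t MessageType) {
	switch t {
	case MessageTypeManualFlush:
		fmt.Println("flush only the requested segments")
	case MessageTypeFlushAll: // the case this commit adds
		fmt.Println("flush every segment tracked by recovery storage")
	default:
		// Before the fix, FlushAll ended up unmatched and was silently ignored.
		fmt.Println("ignored")
	}
}

func main() {
	handleMessage(MessageTypeFlushAll)
}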
Author: yihao.dai, 2026-01-06 22:45:26 +08:00 (committed by GitHub)
commit 746c8653cc (parent cc7652327d)
2 changed files with 41 additions and 3 deletions

@@ -314,6 +314,9 @@ func (r *recoveryStorageImpl) handleMessage(msg message.ImmutableMessage) {
 	case message.MessageTypeManualFlush:
 		immutableMsg := message.MustAsImmutableManualFlushMessageV2(msg)
 		r.handleManualFlush(immutableMsg)
+	case message.MessageTypeFlushAll:
+		immutableMsg := message.MustAsImmutableFlushAllMessageV2(msg)
+		r.handleFlushAll(immutableMsg)
 	case message.MessageTypeCreateCollection:
 		immutableMsg := message.MustAsImmutableCreateCollectionMessageV1(msg)
 		r.handleCreateCollection(immutableMsg)
@@ -386,13 +389,21 @@ func (r *recoveryStorageImpl) handleManualFlush(msg message.ImmutableManualFlush
 	r.flushSegments(msg, segments)
 }
 
+// handleFlushAll handles the flush all message.
+func (r *recoveryStorageImpl) handleFlushAll(msg message.ImmutableFlushAllMessageV2) {
+	segments := lo.MapValues(r.segments, func(segment *segmentRecoveryInfo, _ int64) struct{} {
+		return struct{}{}
+	})
+	r.flushSegments(msg, segments)
+}
+
 // flushSegments flushes the segments in the recovery storage.
 func (r *recoveryStorageImpl) flushSegments(msg message.ImmutableMessage, sealSegmentIDs map[int64]struct{}) {
 	segmentIDs := make([]int64, 0)
 	rows := make([]uint64, 0)
 	binarySize := make([]uint64, 0)
-	for _, segment := range r.segments {
-		if _, ok := sealSegmentIDs[segment.meta.SegmentId]; ok {
+	for segmentID := range sealSegmentIDs {
+		if segment, ok := r.segments[segmentID]; ok {
 			segment.ObserveFlush(msg.TimeTick())
 			segmentIDs = append(segmentIDs, segment.meta.SegmentId)
 			rows = append(rows, segment.Rows())
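The new handleFlushAll builds its seal set with samber/lo's MapValues, which preserves every key of r.segments and maps each value to struct{}{}, yielding a set of all tracked segment IDs. A standalone sketch of that idiom, with segmentInfo as a stand-in type:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type segmentInfo struct{ rows uint64 }

func main() {
	segments := map[int64]*segmentInfo{
		100: {rows: 10},
		101: {rows: 20},
	}
	// MapValues keeps the key set intact, so every tracked segment ID
	// ends up in the seal set handed to flushSegments.
	sealSet := lo.MapValues(segments, func(_ *segmentInfo, _ int64) struct{} {
		return struct{}{}
	})
	fmt.Println(len(sealSet)) // 2
}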
@@ -402,7 +413,11 @@ func (r *recoveryStorageImpl) flushSegments(msg message.ImmutableMessage, sealSe
 	if len(segmentIDs) != len(sealSegmentIDs) {
 		r.detectInconsistency(msg, "flush segments not exist", zap.Int64s("wanted", lo.Keys(sealSegmentIDs)), zap.Int64s("actually", segmentIDs))
 	}
-	r.Logger().Info("flush all segments of collection by manual flush", log.FieldMessage(msg), zap.Uint64s("rows", rows), zap.Uint64s("binarySize", binarySize))
+	r.Logger().Info("flush segments of collection by flush", log.FieldMessage(msg),
+		zap.Uint64s("rows", rows),
+		zap.Uint64s("binarySize", binarySize),
+		zap.Int("flushedSegmentCount", len(segmentIDs)),
+	)
 }
 
 // handleCreateCollection handles the create collection message.
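Note that the loop rewrite in flushSegments also inverts the lookup direction: instead of scanning all of r.segments and testing membership in the seal set, it ranges over the requested seal set and looks each ID up, so a requested-but-missing segment surfaces as a length mismatch and trips the inconsistency check above. A reduced sketch of that pattern, with illustrative names:

package main

import "fmt"

func main() {
	segments := map[int64]string{100: "seg-100"} // segment 101 is missing
	sealSegmentIDs := map[int64]struct{}{100: {}, 101: {}}

	flushed := make([]int64, 0, len(sealSegmentIDs))
	for segmentID := range sealSegmentIDs {
		if _, ok := segments[segmentID]; ok {
			flushed = append(flushed, segmentID)
		}
	}
	if len(flushed) != len(sealSegmentIDs) {
		fmt.Println("inconsistency: requested segments do not all exist")
	}
}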


@@ -275,6 +275,7 @@ func (b *streamBuilder) generateStreamMessage() []message.ImmutableMessage {
 		{op: b.createDelete, rate: 5},
 		{op: b.createTxn, rate: 5},
 		{op: b.createManualFlush, rate: 2},
+		{op: b.createFlushAll, rate: 2},
 		{op: b.createSchemaChange, rate: 1},
 		{op: b.createTruncateCollection, rate: 1},
 	}
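The test stream builder registers createFlushAll in a rate table alongside the other message generators; the selection logic itself is outside this hunk. One plausible reading of such a table is rate-weighted random choice, sketched below with hypothetical names (weightedOp, pick) that do not appear in the diff:

package main

import (
	"fmt"
	"math/rand"
)

type weightedOp struct {
	op   func() string
	rate int
}

// pick draws one op with probability proportional to its rate.
func pick(ops []weightedOp) func() string {
	total := 0
	for _, o := range ops {
		total += o.rate
	}
	n := rand.Intn(total)
	for _, o := range ops {
		if n < o.rate {
			return o.op
		}
		n -= o.rate
	}
	return ops[len(ops)-1].op // unreachable for a non-empty table
}

func main() {
	ops := []weightedOp{
		{op: func() string { return "manualFlush" }, rate: 2},
		{op: func() string { return "flushAll" }, rate: 2},
		{op: func() string { return "schemaChange" }, rate: 1},
	}
	fmt.Println(pick(ops)())
}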
@@ -566,6 +567,28 @@ func (b *streamBuilder) createManualFlush() message.ImmutableMessage {
 	return nil
 }
 
+func (b *streamBuilder) createFlushAll() message.ImmutableMessage {
+	if rand.Int31n(3) < 1 {
+		return nil
+	}
+	for _, collection := range b.collectionIDs {
+		for partitionID := range collection {
+			for segmentID := range collection[partitionID] {
+				delete(collection[partitionID], segmentID)
+			}
+		}
+	}
+	b.nextMessage()
+	return message.NewFlushAllMessageBuilderV2().
+		WithVChannel(b.channel.Name).
+		WithHeader(&message.FlushAllMessageHeader{}).
+		WithBody(&message.FlushAllMessageBody{}).
+		MustBuildMutable().
+		WithTimeTick(b.timetick).
+		WithLastConfirmed(rmq.NewRmqID(b.lastConfirmedMessageID)).
+		IntoImmutableMessage(rmq.NewRmqID(b.messageID))
+}
+
 func (b *streamBuilder) createSchemaChange() message.ImmutableMessage {
 	for collectionID, collection := range b.collectionIDs {
 		if rand.Int31n(3) < 1 {
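createFlushAll clears the builder's in-memory segment model by deleting map entries while ranging over the same maps, mirroring what FlushAll does to the real recovery storage. That idiom is safe: the Go spec explicitly permits deleting entries from the map being iterated. A standalone check:

package main

import "fmt"

func main() {
	segments := map[int64]struct{}{1: {}, 2: {}, 3: {}}
	for id := range segments {
		delete(segments, id) // permitted by the Go spec during range
	}
	fmt.Println(len(segments)) // 0
}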