From 746c8653cca6e71feeb16885ea461d5f39e187d3 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Tue, 6 Jan 2026 22:45:26 +0800 Subject: [PATCH] fix: Fix missing handling of FlushAllMsg in recovery storage (#46802) issue: https://github.com/milvus-io/milvus/issues/46799 --------- Signed-off-by: bigsheeper --- .../wal/recovery/recovery_storage_impl.go | 21 ++++++++++++++--- .../wal/recovery/recovery_storage_test.go | 23 +++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go index 370e69c54f..dd3256139e 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go @@ -314,6 +314,9 @@ func (r *recoveryStorageImpl) handleMessage(msg message.ImmutableMessage) { case message.MessageTypeManualFlush: immutableMsg := message.MustAsImmutableManualFlushMessageV2(msg) r.handleManualFlush(immutableMsg) + case message.MessageTypeFlushAll: + immutableMsg := message.MustAsImmutableFlushAllMessageV2(msg) + r.handleFlushAll(immutableMsg) case message.MessageTypeCreateCollection: immutableMsg := message.MustAsImmutableCreateCollectionMessageV1(msg) r.handleCreateCollection(immutableMsg) @@ -386,13 +389,21 @@ func (r *recoveryStorageImpl) handleManualFlush(msg message.ImmutableManualFlush r.flushSegments(msg, segments) } +// handleFlushAll handles the flush all message. +func (r *recoveryStorageImpl) handleFlushAll(msg message.ImmutableFlushAllMessageV2) { + segments := lo.MapValues(r.segments, func(segment *segmentRecoveryInfo, _ int64) struct{} { + return struct{}{} + }) + r.flushSegments(msg, segments) +} + // flushSegments flushes the segments in the recovery storage. func (r *recoveryStorageImpl) flushSegments(msg message.ImmutableMessage, sealSegmentIDs map[int64]struct{}) { segmentIDs := make([]int64, 0) rows := make([]uint64, 0) binarySize := make([]uint64, 0) - for _, segment := range r.segments { - if _, ok := sealSegmentIDs[segment.meta.SegmentId]; ok { + for segmentID := range sealSegmentIDs { + if segment, ok := r.segments[segmentID]; ok { segment.ObserveFlush(msg.TimeTick()) segmentIDs = append(segmentIDs, segment.meta.SegmentId) rows = append(rows, segment.Rows()) @@ -402,7 +413,11 @@ func (r *recoveryStorageImpl) flushSegments(msg message.ImmutableMessage, sealSe if len(segmentIDs) != len(sealSegmentIDs) { r.detectInconsistency(msg, "flush segments not exist", zap.Int64s("wanted", lo.Keys(sealSegmentIDs)), zap.Int64s("actually", segmentIDs)) } - r.Logger().Info("flush all segments of collection by manual flush", log.FieldMessage(msg), zap.Uint64s("rows", rows), zap.Uint64s("binarySize", binarySize)) + r.Logger().Info("flush segments of collection by flush", log.FieldMessage(msg), + zap.Uint64s("rows", rows), + zap.Uint64s("binarySize", binarySize), + zap.Int("flushedSegmentCount", len(segmentIDs)), + ) } // handleCreateCollection handles the create collection message. diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go index dfabbc51db..edeb7892e6 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go @@ -275,6 +275,7 @@ func (b *streamBuilder) generateStreamMessage() []message.ImmutableMessage { {op: b.createDelete, rate: 5}, {op: b.createTxn, rate: 5}, {op: b.createManualFlush, rate: 2}, + {op: b.createFlushAll, rate: 2}, {op: b.createSchemaChange, rate: 1}, {op: b.createTruncateCollection, rate: 1}, } @@ -566,6 +567,28 @@ func (b *streamBuilder) createManualFlush() message.ImmutableMessage { return nil } +func (b *streamBuilder) createFlushAll() message.ImmutableMessage { + if rand.Int31n(3) < 1 { + return nil + } + for _, collection := range b.collectionIDs { + for partitionID := range collection { + for segmentID := range collection[partitionID] { + delete(collection[partitionID], segmentID) + } + } + } + b.nextMessage() + return message.NewFlushAllMessageBuilderV2(). + WithVChannel(b.channel.Name). + WithHeader(&message.FlushAllMessageHeader{}). + WithBody(&message.FlushAllMessageBody{}). + MustBuildMutable(). + WithTimeTick(b.timetick). + WithLastConfirmed(rmq.NewRmqID(b.lastConfirmedMessageID)). + IntoImmutableMessage(rmq.NewRmqID(b.messageID)) +} + func (b *streamBuilder) createSchemaChange() message.ImmutableMessage { for collectionID, collection := range b.collectionIDs { if rand.Int31n(3) < 1 {