From e4794675829e624658bb995d4dfe2ee668c5b85e Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Sat, 31 May 2025 22:56:29 +0800 Subject: [PATCH] fix: panic when upgrading from old arch (#42422) issue: #42405 - add delete rows into header when upsert. Signed-off-by: chyezh --- internal/proxy/task_upsert_streaming.go | 1 + .../flusher/flusherimpl/flusher_components.go | 19 ++++++++++--------- .../interceptors/shard/shard_interceptor.go | 9 ++++++++- .../shard/shard_interceptor_test.go | 5 +++++ .../server/wal/utility/context.go | 12 ++++++++++++ .../server/wal/utility/context_test.go | 7 +++++++ tests/integration/datanode/datanode_test.go | 3 +++ 7 files changed, 46 insertions(+), 10 deletions(-) diff --git a/internal/proxy/task_upsert_streaming.go b/internal/proxy/task_upsert_streaming.go index f2ad87aed8..b4423ea81a 100644 --- a/internal/proxy/task_upsert_streaming.go +++ b/internal/proxy/task_upsert_streaming.go @@ -126,6 +126,7 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) ( msg, err := message.NewDeleteMessageBuilderV1(). WithHeader(&message.DeleteMessageHeader{ CollectionId: it.upsertMsg.DeleteMsg.CollectionID, + Rows: uint64(it.upsertMsg.DeleteMsg.NumRows), }). WithBody(deleteMsg.DeleteRequest). WithVChannel(vchannel). diff --git a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go index 3be3e9284d..fdde3d44cf 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go +++ b/internal/streamingnode/server/flusher/flusherimpl/flusher_components.go @@ -16,6 +16,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/shard/stats" + "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/internal/util/idalloc" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" @@ -206,16 +207,16 @@ func (impl *flusherComponents) buildDataSyncServiceWithRetry(ctx context.Context zap.Int64("partitionID", segment.PartitionId), zap.Int64("segmentID", segment.SegmentId), ) - msg := message.NewFlushMessageBuilderV2(). - WithVChannel(recoverInfo.GetInfo().GetChannelName()). - WithHeader(&message.FlushMessageHeader{ - CollectionId: segment.CollectionId, - PartitionId: segment.PartitionId, - SegmentId: segment.SegmentId, - }). - WithBody(&message.FlushMessageBody{}).MustBuildMutable() if err := retry.Do(ctx, func() error { - appendResult, err := impl.wal.Append(ctx, msg) + msg := message.NewFlushMessageBuilderV2(). + WithVChannel(recoverInfo.GetInfo().GetChannelName()). + WithHeader(&message.FlushMessageHeader{ + CollectionId: segment.CollectionId, + PartitionId: segment.PartitionId, + SegmentId: segment.SegmentId, + }). + WithBody(&message.FlushMessageBody{}).MustBuildMutable() + appendResult, err := impl.wal.Append(utility.WithFlushFromOldArch(ctx), msg) if err != nil { logger.Warn("fail to append flush message for segments that not created by streaming service into wal", zap.Error(err)) return err diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go index e38d5d37b3..737f5ec82a 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/txn" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility" "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" ) @@ -260,11 +261,17 @@ func (impl *shardInterceptor) handleCreateSegment(ctx context.Context, msg messa func (impl *shardInterceptor) handleFlushSegment(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) { flushMsg := message.MustAsMutableFlushMessageV2(msg) h := flushMsg.Header() + if utility.GetFlushFromOldArch(ctx) { + // The flush message come from old arch, so it's not managed by shard manager. + // We need to flush it into wal directly. + impl.shardManager.Logger().Info("flush segment from old arch, skip checking of shard manager", log.FieldMessage(msg)) + return appendOp(ctx, msg) + } + if err := impl.shardManager.CheckIfSegmentCanBeFlushed(h.GetCollectionId(), h.GetPartitionId(), h.GetSegmentId()); err != nil { // The segment can not be flushed at current shard, ignored return nil, status.NewUnrecoverableError(err.Error()) } - msgID, err := appendOp(ctx, msg) if err != nil { return nil, err diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor_test.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor_test.go index 11dfd9df8d..b80f204561 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor_test.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor_test.go @@ -163,6 +163,11 @@ func TestShardInterceptor(t *testing.T) { assert.Error(t, err) assert.Nil(t, msgID) + // Flush from old arch should always be allowed. + msgID, err = i.DoAppend(utility.WithFlushFromOldArch(ctx), msg, appender) + assert.NoError(t, err) + assert.NotNil(t, msgID) + ctx = utility.WithExtraAppendResult(ctx, &utility.ExtraAppendResult{}) msg = message.NewManualFlushMessageBuilderV2(). WithVChannel(vchannel). diff --git a/internal/streamingnode/server/wal/utility/context.go b/internal/streamingnode/server/wal/utility/context.go index 135c44396c..76db912548 100644 --- a/internal/streamingnode/server/wal/utility/context.go +++ b/internal/streamingnode/server/wal/utility/context.go @@ -17,6 +17,7 @@ var ( extraAppendResultValue walCtxKey = 1 notPersistedValue walCtxKey = 2 metricsValue walCtxKey = 3 + flushFromOldArchValue walCtxKey = 4 ) // ExtraAppendResult is the extra append result. @@ -86,3 +87,14 @@ func WithAppendMetricsContext(ctx context.Context, m *metricsutil.AppendMetrics) func MustGetAppendMetrics(ctx context.Context) *metricsutil.AppendMetrics { return ctx.Value(metricsValue).(*metricsutil.AppendMetrics) } + +// WithFlushFromOldArch set flush from old arch to context +func WithFlushFromOldArch(ctx context.Context) context.Context { + return context.WithValue(ctx, flushFromOldArchValue, struct{}{}) +} + +// GetFlushFromOldArch get flush from old arch from context +func GetFlushFromOldArch(ctx context.Context) bool { + result := ctx.Value(flushFromOldArchValue) + return result != nil +} diff --git a/internal/streamingnode/server/wal/utility/context_test.go b/internal/streamingnode/server/wal/utility/context_test.go index 9ebc943a6c..f518b05d4a 100644 --- a/internal/streamingnode/server/wal/utility/context_test.go +++ b/internal/streamingnode/server/wal/utility/context_test.go @@ -82,3 +82,10 @@ func TestReplaceAppendResultTxnContext(t *testing.T) { retrievedResult := ctx.Value(extraAppendResultValue).(*ExtraAppendResult) assert.Equal(t, retrievedResult.TxnCtx.TxnID, newTxnCtx.TxnID) } + +func TestWithFlushFromOldArch(t *testing.T) { + ctx := context.Background() + assert.False(t, GetFlushFromOldArch(ctx)) + ctx = WithFlushFromOldArch(ctx) + assert.True(t, GetFlushFromOldArch(ctx)) +} diff --git a/tests/integration/datanode/datanode_test.go b/tests/integration/datanode/datanode_test.go index cb37d9202b..c2fad9ff1d 100644 --- a/tests/integration/datanode/datanode_test.go +++ b/tests/integration/datanode/datanode_test.go @@ -305,5 +305,8 @@ func (s *DataNodeSuite) TestSwapQN() { } func TestDataNodeUtil(t *testing.T) { + // node id conflict when running multiple node + g := integration.WithoutStreamingService() + defer g() suite.Run(t, new(DataNodeSuite)) }