From 490c5d5088ca992e61197e4f5052af841c842bb9 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 10 Jul 2025 10:36:48 +0800 Subject: [PATCH] fix: lost message version after compatible message modification (#43217) issue: #43018 Signed-off-by: chyezh --- .../server/wal/adaptor/old_version_message.go | 5 +- .../wal/adaptor/old_version_message_test.go | 1 + .../util/mock_message/mock_MutableMessage.go | 47 +++++++++++++++++++ pkg/streaming/util/message/adaptor/message.go | 2 +- .../util/message/adaptor/message_test.go | 2 +- pkg/streaming/util/message/message.go | 5 ++ .../util/message/message_builder_test.go | 2 + pkg/streaming/util/message/message_impl.go | 6 +++ 8 files changed, 67 insertions(+), 3 deletions(-) diff --git a/internal/streamingnode/server/wal/adaptor/old_version_message.go b/internal/streamingnode/server/wal/adaptor/old_version_message.go index de648c0384..4e79cba423 100644 --- a/internal/streamingnode/server/wal/adaptor/old_version_message.go +++ b/internal/streamingnode/server/wal/adaptor/old_version_message.go @@ -57,7 +57,10 @@ func newOldVersionImmutableMessage( if err != nil { return nil, err } - return mutableMessage.WithLastConfirmed(lastConfirmedMessageID).IntoImmutableMessage(msg.MessageID()), nil + return mutableMessage. + WithLastConfirmed(lastConfirmedMessageID). + WithOldVersion(). + IntoImmutableMessage(msg.MessageID()), nil } // newV1CreateCollectionMsgFromV0 creates a new create collection message from the old version create collection message. diff --git a/internal/streamingnode/server/wal/adaptor/old_version_message_test.go b/internal/streamingnode/server/wal/adaptor/old_version_message_test.go index 3c219e7ef1..9cdb12eb1b 100644 --- a/internal/streamingnode/server/wal/adaptor/old_version_message_test.go +++ b/internal/streamingnode/server/wal/adaptor/old_version_message_test.go @@ -53,6 +53,7 @@ func TestNewOldVersionImmutableMessage(t *testing.T) { msg, err := newOldVersionImmutableMessage(ctx, pchannel, lastConfirmedMessageID, message.NewImmutableMesasge(messageID, payload, map[string]string{})) assert.NoError(t, err) + assert.Equal(t, message.VersionOld, msg.Version()) assert.NotNil(t, msg.LastConfirmedMessageID()) assert.Equal(t, msg.VChannel(), "test1-v0") assert.Equal(t, msg.TimeTick(), tt) diff --git a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go index 2b3420583e..4a128e649c 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go @@ -762,6 +762,53 @@ func (_c *MockMutableMessage_WithLastConfirmedUseMessageID_Call) RunAndReturn(ru return _c } +// WithOldVersion provides a mock function with no fields +func (_m *MockMutableMessage) WithOldVersion() message.MutableMessage { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for WithOldVersion") + } + + var r0 message.MutableMessage + if rf, ok := ret.Get(0).(func() message.MutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(message.MutableMessage) + } + } + + return r0 +} + +// MockMutableMessage_WithOldVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WithOldVersion' +type MockMutableMessage_WithOldVersion_Call struct { + *mock.Call +} + +// WithOldVersion is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) WithOldVersion() *MockMutableMessage_WithOldVersion_Call { + return &MockMutableMessage_WithOldVersion_Call{Call: _e.mock.On("WithOldVersion")} +} + +func (_c *MockMutableMessage_WithOldVersion_Call) Run(run func()) *MockMutableMessage_WithOldVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_WithOldVersion_Call) Return(_a0 message.MutableMessage) *MockMutableMessage_WithOldVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_WithOldVersion_Call) RunAndReturn(run func() message.MutableMessage) *MockMutableMessage_WithOldVersion_Call { + _c.Call.Return(run) + return _c +} + // WithTimeTick provides a mock function with given fields: tt func (_m *MockMutableMessage) WithTimeTick(tt uint64) message.MutableMessage { ret := _m.Called(tt) diff --git a/pkg/streaming/util/message/adaptor/message.go b/pkg/streaming/util/message/adaptor/message.go index 2e21d65ec5..123c4be31b 100644 --- a/pkg/streaming/util/message/adaptor/message.go +++ b/pkg/streaming/util/message/adaptor/message.go @@ -97,7 +97,7 @@ func parseTxnMsg(msg message.ImmutableMessage) ([]msgstream.TsMsg, error) { // parseSingleMsg converts message to ts message. func parseSingleMsg(msg message.ImmutableMessage) (msgstream.TsMsg, error) { switch msg.Version() { - case message.VersionV1: + case message.VersionV1, message.VersionOld: return fromMessageToTsMsgV1(msg) case message.VersionV2: return fromMessageToTsMsgV2(msg) diff --git a/pkg/streaming/util/message/adaptor/message_test.go b/pkg/streaming/util/message/adaptor/message_test.go index 0caef7c559..f617e75091 100644 --- a/pkg/streaming/util/message/adaptor/message_test.go +++ b/pkg/streaming/util/message/adaptor/message_test.go @@ -24,7 +24,7 @@ func TestNewMsgPackFromInsertMessage(t *testing.T) { immutableMessages := make([]message.ImmutableMessage, 0, len(fieldCount)) for segmentID, rowNum := range fieldCount { insertMsg := message.CreateTestInsertMessage(t, segmentID, rowNum, tt, id) - immutableMessage := insertMsg.IntoImmutableMessage(id) + immutableMessage := insertMsg.WithOldVersion().IntoImmutableMessage(id) immutableMessages = append(immutableMessages, immutableMessage) } diff --git a/pkg/streaming/util/message/message.go b/pkg/streaming/util/message/message.go index a2bf796d89..9f464ee858 100644 --- a/pkg/streaming/util/message/message.go +++ b/pkg/streaming/util/message/message.go @@ -79,6 +79,11 @@ type MutableMessage interface { // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithLastConfirmed(id MessageID) MutableMessage + // WithOldVersion sets the version of current message to be old version. + // !!! preserved for streaming system internal usage, don't call it outside of streaming system. + // TODO: used for old version message compatibility, will be removed in the future. + WithOldVersion() MutableMessage + // WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. // !!! preserved for streaming system internal usage, don't call it outside of streaming system. WithLastConfirmedUseMessageID() MutableMessage diff --git a/pkg/streaming/util/message/message_builder_test.go b/pkg/streaming/util/message/message_builder_test.go index 1464c65a89..8e3f5773f9 100644 --- a/pkg/streaming/util/message/message_builder_test.go +++ b/pkg/streaming/util/message/message_builder_test.go @@ -45,6 +45,7 @@ func TestMessage(t *testing.T) { lcMsgID := walimplstest.NewTestMessageID(1) mutableMessage.WithLastConfirmed(lcMsgID) + mutableMessage.WithOldVersion() v, ok = mutableMessage.Properties().Get("_lc") assert.True(t, ok) assert.Equal(t, v, "1") @@ -53,6 +54,7 @@ func TestMessage(t *testing.T) { assert.True(t, ok) assert.Equal(t, "v1", v) assert.Equal(t, "v1", mutableMessage.VChannel()) + assert.Equal(t, message.VersionOld, mutableMessage.Version()) msgID := walimplstest.NewTestMessageID(1) immutableMessage := message.NewImmutableMesasge(msgID, diff --git a/pkg/streaming/util/message/message_impl.go b/pkg/streaming/util/message/message_impl.go index c3c4b2b346..30e0d6fa26 100644 --- a/pkg/streaming/util/message/message_impl.go +++ b/pkg/streaming/util/message/message_impl.go @@ -97,6 +97,12 @@ func (m *messageImpl) WithLastConfirmed(id MessageID) MutableMessage { return m } +// WithOldVersion sets the version of current message to be old version. +func (m *messageImpl) WithOldVersion() MutableMessage { + m.properties.Set(messageVersion, VersionOld.String()) + return m +} + // WithLastConfirmedUseMessageID sets the last confirmed message id of current message to be the same as message id. func (m *messageImpl) WithLastConfirmedUseMessageID() MutableMessage { m.properties.Delete(messageLastConfirmed)