From 53438c751d013cdb93aae2e67e21d47dc4a46c71 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 25 Mar 2025 15:22:23 +0800 Subject: [PATCH] fix: skip consuming from streaming service message (#40879) issue: #40532 pr: #40877 Signed-off-by: chyezh --- pkg/mq/msgstream/mq_msgstream.go | 11 +++++++++++ pkg/streaming/util/message/message_test.go | 9 +++++++++ pkg/streaming/util/message/properties.go | 11 +++++++++++ 3 files changed, 31 insertions(+) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index ddf906ab59..f1956ec294 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/retry" @@ -465,6 +466,11 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { log.Ctx(ms.ctx).Warn("MqMsgStream get msg whose payload is nil") continue } + if message.CheckIfMessageFromStreaming(msg.Properties()) { + log.Ctx(ms.ctx).Warn("MqMsgStream can not consume the message from streaming service") + continue + } + // not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295 // if the message not belong to the topic, will skip it tsMsg, err := ms.getTsMsgFromConsumerMsg(msg) @@ -841,6 +847,11 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { log.Warn("MqTtMsgStream get msg whose payload is nil") continue } + if message.CheckIfMessageFromStreaming(msg.Properties()) { + log.Warn("MqTtMsgStream can not consume the message from streaming service") + continue + } + // not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295 // if the message not belong to the topic, will skip it tsMsg, err := ms.getTsMsgFromConsumerMsg(msg) diff --git a/pkg/streaming/util/message/message_test.go b/pkg/streaming/util/message/message_test.go index 5276d8f830..14706ac1ca 100644 --- a/pkg/streaming/util/message/message_test.go +++ b/pkg/streaming/util/message/message_test.go @@ -34,3 +34,12 @@ func TestVersion(t *testing.T) { assert.True(t, VersionV1.GT(VersionOld)) assert.True(t, VersionV2.GT(VersionV1)) } + +// TestCheckIfMessageFromStreaming tests CheckIfMessageFromStreaming function. +func TestCheckIfMessageFromStreaming(t *testing.T) { + assert.False(t, CheckIfMessageFromStreaming(nil)) + assert.False(t, CheckIfMessageFromStreaming(map[string]string{})) + assert.True(t, CheckIfMessageFromStreaming(map[string]string{ + messageVersion: "1", + })) +} diff --git a/pkg/streaming/util/message/properties.go b/pkg/streaming/util/message/properties.go index de2268bed6..aaf8d70144 100644 --- a/pkg/streaming/util/message/properties.go +++ b/pkg/streaming/util/message/properties.go @@ -73,3 +73,14 @@ func (prop propertiesImpl) EstimateSize() int { } return size } + +// CheckIfMessageFromStreaming checks if the message is from streaming. +func CheckIfMessageFromStreaming(props map[string]string) bool { + if props == nil { + return false + } + if props[messageVersion] != "" { + return true + } + return false +}