diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index ddf906ab59..f1956ec294 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/retry" @@ -465,6 +466,11 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { log.Ctx(ms.ctx).Warn("MqMsgStream get msg whose payload is nil") continue } + if message.CheckIfMessageFromStreaming(msg.Properties()) { + log.Ctx(ms.ctx).Warn("MqMsgStream can not consume the message from streaming service") + continue + } + // not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295 // if the message not belong to the topic, will skip it tsMsg, err := ms.getTsMsgFromConsumerMsg(msg) @@ -841,6 +847,11 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { log.Warn("MqTtMsgStream get msg whose payload is nil") continue } + if message.CheckIfMessageFromStreaming(msg.Properties()) { + log.Warn("MqTtMsgStream can not consume the message from streaming service") + continue + } + // not need to check the preCreatedTopic is empty, related issue: https://github.com/milvus-io/milvus/issues/27295 // if the message not belong to the topic, will skip it tsMsg, err := ms.getTsMsgFromConsumerMsg(msg) diff --git a/pkg/streaming/util/message/message_test.go b/pkg/streaming/util/message/message_test.go index 5276d8f830..14706ac1ca 100644 --- a/pkg/streaming/util/message/message_test.go +++ b/pkg/streaming/util/message/message_test.go @@ -34,3 +34,12 @@ func TestVersion(t *testing.T) { assert.True(t, VersionV1.GT(VersionOld)) assert.True(t, VersionV2.GT(VersionV1)) } + +// TestCheckIfMessageFromStreaming tests CheckIfMessageFromStreaming function. +func TestCheckIfMessageFromStreaming(t *testing.T) { + assert.False(t, CheckIfMessageFromStreaming(nil)) + assert.False(t, CheckIfMessageFromStreaming(map[string]string{})) + assert.True(t, CheckIfMessageFromStreaming(map[string]string{ + messageVersion: "1", + })) +} diff --git a/pkg/streaming/util/message/properties.go b/pkg/streaming/util/message/properties.go index de2268bed6..aaf8d70144 100644 --- a/pkg/streaming/util/message/properties.go +++ b/pkg/streaming/util/message/properties.go @@ -73,3 +73,14 @@ func (prop propertiesImpl) EstimateSize() int { } return size } + +// CheckIfMessageFromStreaming checks if the message is from streaming. +func CheckIfMessageFromStreaming(props map[string]string) bool { + if props == nil { + return false + } + if props[messageVersion] != "" { + return true + } + return false +}