enhance: remove support of embeded nats mq (#41565)

issue: #41564

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-04-28 23:06:49 +08:00 committed by GitHub
parent bb7df40fc1
commit 9cb5271027
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 35 additions and 2145 deletions

View File

@ -46,7 +46,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq"
"github.com/milvus-io/milvus/pkg/v2/tracer"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/expr"
@ -334,10 +333,8 @@ func (mr *MilvusRoles) Run() {
params := paramtable.Get()
if paramtable.Get().RocksmqEnable() {
defer stopRocksmq()
} else if paramtable.Get().NatsmqEnable() {
defer nmq.CloseNatsMQ()
} else {
panic("only support Rocksmq and Natsmq in standalone mode")
panic("only support Rocksmq in standalone mode")
}
if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
// Start etcd server.

View File

@ -233,11 +233,11 @@ func WriteYaml(w io.Writer) {
{
name: "mq",
header: `
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)`,
# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)`,
},
{
name: "woodpecker",
@ -257,12 +257,6 @@ func WriteYaml(w io.Writer) {
{
name: "rocksmq",
},
{
name: "natsmq",
header: `
# natsmq configuration.
# more detail: https://docs.nats.io/running-a-nats-service/configuration`,
},
{
name: "mixCoord",
header: "\n# Related configuration of mixCoord",

View File

@ -158,14 +158,14 @@ minio:
# 0 means using oss client by default, decrease these configration if ListObjects timeout
listObjectsMaxKeys: 0
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)
mq:
# Default value: "default"
# Valid values: [default, pulsar, kafka, rocksmq, natsmq, woodpecker]
# Valid values: [default, pulsar, kafka, rocksmq, woodpecker]
type: default
enablePursuitMode: true # Default value: "true"
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
@ -256,27 +256,6 @@ rocksmq:
compactionInterval: 86400 # Time interval to trigger rocksdb compaction to remove deleted data. Unit: Second
compressionTypes: 0,0,7,7,7 # compaction compression type, only support use 0,7. 0 means not compress, 7 will use zstd. Length of types means num of rocksdb level.
# natsmq configuration.
# more detail: https://docs.nats.io/running-a-nats-service/configuration
natsmq:
server:
port: 4222 # Listening port of the NATS server.
storeDir: /var/lib/milvus/nats # Directory to use for JetStream storage of nats
maxFileStore: 17179869184 # Maximum size of the 'file' storage
maxPayload: 8388608 # Maximum number of bytes in a message payload
maxPending: 67108864 # Maximum number of bytes buffered for a connection Applies to client connections
initializeTimeout: 4000 # waiting for initialization of natsmq finished
monitor:
trace: false # If true enable protocol trace log messages
debug: false # If true enable debug log messages
logTime: true # If set to false, log without timestamps.
logFile: /tmp/milvus/logs/nats.log # Log file path relative to .. of milvus binary if use relative path
logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one
retention:
maxAge: 4320 # Maximum age of any message in the P-channel
maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size
maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit
# Related configuration of mixCoord
mixCoord:
enableActiveStandby: false

6
go.mod
View File

@ -189,17 +189,11 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/nats-server/v2 v2.10.12 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/panjf2000/ants/v2 v2.7.2 // indirect

16
go.sum
View File

@ -738,20 +738,14 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971 h1:CKKrOtri+dbTUkMJehDuSM489OIqJab1t0pUq+PV73E=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a h1:W+9nVXKcI9FdiyrFbrs9BIFUqRW0pLY+Fn0fsmmuLyw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8 h1:/oUdiYtwVlqiEMSzME7vDvir49Lt23nMpaZC9u22bIo=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=
@ -781,17 +775,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE=
github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
@ -1338,7 +1323,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -17,7 +17,6 @@ import (
const (
mqTypeDefault = "default"
mqTypeNatsmq = "natsmq"
mqTypeRocksmq = "rocksmq"
mqTypeKafka = "kafka"
mqTypePulsar = "pulsar"
@ -26,7 +25,6 @@ const (
type mqEnable struct {
Rocksmq bool
Natsmq bool
Pulsar bool
Kafka bool
Woodpecker bool
@ -67,8 +65,8 @@ func NewFactory(standAlone bool) *DefaultFactory {
// Init create a msg factory(TODO only support one mq at the same time.)
// In order to guarantee backward compatibility of config file, we still support multiple mq configs.
// The initialization of MQ follows the following rules, if the mq.type is default.
// 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
// 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
// 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
// 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)
func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
// skip if using default factory
if f.msgStreamFactory != nil {
@ -84,13 +82,11 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
}
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()})
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()})
metrics.RegisterMQType(mqType)
log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))
switch mqType {
case mqTypeNatsmq:
f.msgStreamFactory = msgstream.NewNatsmqFactory()
case mqTypeRocksmq:
f.msgStreamFactory = msgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
case mqTypePulsar:
@ -135,10 +131,10 @@ func mustSelectMQType(standalone bool, mqType string, enable mqEnable) string {
// Validate mq type.
func validateMQType(standalone bool, mqType string) error {
if mqType != mqTypeNatsmq && mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker {
if mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker {
return errors.Newf("mq type %s is invalid", mqType)
}
if !standalone && (mqType == mqTypeRocksmq || mqType == mqTypeNatsmq) {
if !standalone && mqType == mqTypeRocksmq {
return errors.Newf("mq %s is only valid in standalone mode")
}
return nil
@ -169,9 +165,6 @@ type Factory interface {
func HealthCheck(mqType string) *common.MQClusterStatus {
clusterStatus := &common.MQClusterStatus{MqType: mqType}
switch mqType {
case mqTypeNatsmq:
// TODO: implement health check for nats mq
clusterStatus.Health = true
case mqTypeRocksmq:
// TODO: implement health checker for rocks mq
clusterStatus.Health = true

View File

@ -11,35 +11,32 @@ import (
func TestValidateMQType(t *testing.T) {
assert.Error(t, validateMQType(true, mqTypeDefault))
assert.Error(t, validateMQType(false, mqTypeDefault))
assert.Error(t, validateMQType(false, mqTypeNatsmq))
assert.Error(t, validateMQType(false, mqTypeRocksmq))
assert.NoError(t, validateMQType(true, mqTypeWoodpecker))
assert.NoError(t, validateMQType(false, mqTypeWoodpecker))
}
func TestSelectMQType(t *testing.T) {
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypeRocksmq)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, false}) })
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, false}) })
assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true, true}), mqTypeRocksmq)
assert.Equal(t, mustSelectMQType(true, mqTypeNatsmq, mqEnable{true, true, true, true, true}), mqTypeNatsmq)
assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true, true}) })
assert.Panics(t, func() { mustSelectMQType(false, mqTypeNatsmq, mqEnable{true, true, true, true, true}) })
assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true}), mqTypeRocksmq)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false}) })
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false}) })
assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true}), mqTypeRocksmq)
assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker)
assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true}) })
assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar)
assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka)
assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker)
}
func TestHealthCheck(t *testing.T) {
@ -53,7 +50,6 @@ func TestHealthCheck(t *testing.T) {
mqType string
health bool
}{
{mqTypeNatsmq, true},
{mqTypeRocksmq, true},
{mqTypePulsar, false},
{mqTypeKafka, false},

View File

@ -22,8 +22,6 @@ require (
github.com/klauspost/compress v1.17.9
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971
github.com/minio/minio-go/v7 v7.0.73
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
github.com/prometheus/client_golang v1.14.0
github.com/quasilyte/go-ruleguard/dsl v0.3.22
@ -146,14 +144,10 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.8 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect

View File

@ -562,8 +562,6 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=
@ -589,17 +587,8 @@ github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ib
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE=
github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
@ -1060,7 +1049,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -18,54 +18,16 @@ package msgstream
import (
"context"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
type positionGenerator func(channelName string, timestamp uint64, msgGroup string, targetMsgIDs []uint64) []*msgpb.MsgPosition
func TestNmq(t *testing.T) {
storeDir, err := os.MkdirTemp("", "milvus_mq_nmq")
assert.NoError(t, err)
defer os.RemoveAll(storeDir)
paramtable.Init()
cfg := nmq.ParseServerOption(paramtable.Get())
cfg.Opts.StoreDir = storeDir
nmq.MustInitNatsMQ(cfg)
defer nmq.CloseNatsMQ()
f1 := NewNatsmqFactory()
f2 := NewNatsmqFactory()
client, err := nmq.NewClientWithDefaultOptions(context.Background())
if err != nil {
panic(err)
}
testMQ(t, client, []Factory{f1, f2}, func(channelName string, timestamp uint64, msgGroup string, targetMsgIDs []uint64) []*msgpb.MsgPosition {
result := make([]*msgpb.MsgPosition, 0, len(targetMsgIDs))
for _, targetMsgID := range targetMsgIDs {
msgID := nmq.NewNmqID(targetMsgID).Serialize()
result = append(result, &msgpb.MsgPosition{
ChannelName: channelName,
Timestamp: timestamp,
MsgGroup: msgGroup,
MsgID: msgID,
})
}
return result
})
}
func testMQ(t *testing.T, client mqwrapper.Client, factories []Factory, pg positionGenerator) {
testStreamOperation(t, client)
testFactoryCommonOperation(t, factories[0])

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server"
kafkawrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/kafka"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq"
pulsarmqwrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/pulsar"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/rmq"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -223,19 +222,6 @@ func NewKmsFactory(config *paramtable.ServiceParam) Factory {
return f
}
// NewNatsmqFactory create a new nats-mq factory.
func NewNatsmqFactory() Factory {
paramtable.Init()
paramtable := paramtable.Get()
nmq.MustInitNatsMQ(nmq.ParseServerOption(paramtable))
return &CommonFactory{
Newer: nmq.NewClientWithDefaultOptions,
DispatcherFactory: ProtoUDFactory{},
ReceiveBufSize: paramtable.MQCfg.ReceiveBufSize.GetAsInt64(),
MQBufSize: paramtable.MQCfg.MQBufSize.GetAsInt64(),
}
}
// NewRocksmqFactory creates a new message stream factory based on rocksmq.
func NewRocksmqFactory(path string, cfg *paramtable.ServiceParam) Factory {
if err := server.InitRocksMQ(path); err != nil {

View File

@ -1,201 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"fmt"
"net"
"strconv"
"time"
"github.com/cockroachdb/errors"
"github.com/nats-io/nats.go"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
// nmqClient implements mqwrapper.Client.
var _ mqwrapper.Client = &nmqClient{}
// nmqClient contains a natsmq client
type nmqClient struct {
conn *nats.Conn
}
type nmqDialer struct {
ctx func() context.Context
}
func (d *nmqDialer) Dial(network, address string) (net.Conn, error) {
ctx := d.ctx()
dial := &net.Dialer{}
// keep default 2s timeout
if _, ok := ctx.Deadline(); !ok {
dial.Timeout = 2 * time.Second
}
return dial.DialContext(ctx, network, address)
}
// NewClientWithDefaultOptions returns a new NMQ client with default options.
// It retrieves the NMQ client URL from the server configuration.
func NewClientWithDefaultOptions(ctx context.Context) (mqwrapper.Client, error) {
url := Nmq.ClientURL()
opt := nats.SetCustomDialer(&nmqDialer{
ctx: func() context.Context { return ctx },
})
return NewClient(url, opt)
}
// NewClient returns a new nmqClient object
func NewClient(url string, options ...nats.Option) (*nmqClient, error) {
c, err := nats.Connect(url, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to set nmq client")
}
return &nmqClient{conn: c}, nil
}
// CreateProducer creates a producer for natsmq client
func (nc *nmqClient) CreateProducer(ctx context.Context, options common.ProducerOptions) (mqwrapper.Producer, error) {
start := timerecord.NewTimeRecorder("create producer")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc()
// TODO: inject jetstream options.
js, err := nc.conn.JetStream()
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc()
return nil, errors.Wrap(err, "failed to create jetstream context")
}
// TODO: (1) investigate on performance of multiple streams vs multiple topics.
// (2) investigate if we should have topics under the same stream.
_, err = js.AddStream(&nats.StreamConfig{
Name: options.Topic,
Subjects: []string{options.Topic},
MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute),
MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(),
MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(),
})
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc()
return nil, errors.Wrap(err, "failed to add/connect to jetstream for producer")
}
rp := nmqProducer{js: js, topic: options.Topic}
elapsed := start.ElapseSpan()
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc()
return &rp, nil
}
func (nc *nmqClient) Subscribe(ctx context.Context, options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
start := timerecord.NewTimeRecorder("create consumer")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc()
if options.Topic == "" {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
return nil, errors.New("invalid consumer config: empty topic")
}
if options.SubscriptionName == "" {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
return nil, errors.New("invalid consumer config: empty subscription name")
}
// TODO: inject jetstream options.
js, err := nc.conn.JetStream()
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
return nil, errors.Wrap(err, "failed to create jetstream context")
}
// TODO: do we allow passing in an existing natsChan from options?
// also, revisit the size or make it a user param
natsChan := make(chan *nats.Msg, options.BufSize)
// TODO: should we allow subscribe to a topic that doesn't exist yet? Current logic allows it.
_, err = js.AddStream(&nats.StreamConfig{
Name: options.Topic,
Subjects: []string{options.Topic},
MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute),
MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(),
MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(),
})
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
return nil, errors.Wrap(err, "failed to add/connect to jetstream for consumer")
}
closeChan := make(chan struct{})
var sub *nats.Subscription
position := options.SubscriptionInitialPosition
// TODO: should we only allow exclusive subscribe? Current logic allows double subscribe.
switch position {
case common.SubscriptionPositionLatest:
sub, err = js.ChanSubscribe(options.Topic, natsChan, nats.DeliverNew())
case common.SubscriptionPositionEarliest:
sub, err = js.ChanSubscribe(options.Topic, natsChan, nats.DeliverAll())
}
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
return nil, errors.Wrap(err, fmt.Sprintf("failed to get consumer info, subscribe position: %d", position))
}
elapsed := start.ElapseSpan()
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc()
return &Consumer{
js: js,
sub: sub,
topic: options.Topic,
groupName: options.SubscriptionName,
options: options,
natsChan: natsChan,
closeChan: closeChan,
}, nil
}
// EarliestMessageID returns the earliest message ID for nmq client
func (nc *nmqClient) EarliestMessageID() common.MessageID {
return &nmqID{messageID: 1}
}
// StringToMsgID converts string id to MessageID
func (nc *nmqClient) StringToMsgID(id string) (common.MessageID, error) {
rID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return nil, errors.Wrap(err, "failed to parse string to MessageID")
}
return &nmqID{messageID: rID}, nil
}
// BytesToMsgID converts a byte array to messageID
func (nc *nmqClient) BytesToMsgID(id []byte) (common.MessageID, error) {
rID := DeserializeNmqID(id)
return &nmqID{messageID: rID}, nil
}
func (nc *nmqClient) Close() {
nc.conn.Close()
}

View File

@ -1,275 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
)
func createNmqClient() (*nmqClient, error) {
return NewClient(natsServerAddress)
}
func Test_NewNmqClient(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
assert.NotNil(t, client)
client.Close()
tests := []struct {
description string
withTimeout bool
ctxTimeouted bool
expectErr bool
}{
{"without context", false, false, false},
{"without timeout context, no timeout", true, false, false},
{"without timeout context, timeout", true, true, true},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
ctx := context.Background()
var cancel context.CancelFunc
if test.withTimeout {
ctx, cancel = context.WithTimeout(ctx, time.Second)
if test.ctxTimeouted {
cancel()
} else {
defer cancel()
}
}
client, err := NewClientWithDefaultOptions(ctx)
if test.expectErr {
assert.Error(t, err)
assert.Nil(t, client)
} else {
assert.NoError(t, err)
assert.NotNil(t, client)
client.Close()
}
})
}
}
func TestNmqClient_CreateProducer(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
topic := "TestNmqClient_CreateProducer"
proOpts := common.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(context.TODO(), proOpts)
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
nmqProducer := producer.(*nmqProducer)
assert.Equal(t, nmqProducer.Topic(), topic)
msg := &common.ProducerMessage{
Payload: []byte{},
Properties: nil,
}
_, err = nmqProducer.Send(context.TODO(), msg)
assert.NoError(t, err)
invalidOpts := common.ProducerOptions{Topic: ""}
producer, e := client.CreateProducer(context.TODO(), invalidOpts)
assert.Nil(t, producer)
assert.Error(t, e)
}
func TestNmqClient_GetLatestMsg(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := fmt.Sprintf("t2GetLatestMsg-%d", rand.Int())
proOpts := common.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(context.TODO(), proOpts)
assert.NoError(t, err)
defer producer.Close()
for i := 0; i < 10; i++ {
msg := &common.ProducerMessage{
Payload: []byte{byte(i)},
Properties: nil,
}
_, err = producer.Send(context.TODO(), msg)
assert.NoError(t, err)
}
subName := "subName"
consumerOpts := mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
}
consumer, err := client.Subscribe(context.TODO(), consumerOpts)
assert.NoError(t, err)
expectLastMsg, err := consumer.GetLatestMsgID()
assert.NoError(t, err)
var actualLastMsg common.Message
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
fmt.Println(i)
assert.FailNow(t, "consumer failed to yield message in 100 milliseconds")
case msg := <-consumer.Chan():
consumer.Ack(msg)
actualLastMsg = msg
}
}
require.NotNil(t, actualLastMsg)
ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize())
assert.NoError(t, err)
assert.True(t, ret)
}
func TestNmqClient_IllegalSubscribe(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
sub, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: "",
})
assert.Nil(t, sub)
assert.Error(t, err)
sub, err = client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: "123",
SubscriptionName: "",
})
assert.Nil(t, sub)
assert.Error(t, err)
}
func TestNmqClient_Subscribe(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
topic := "TestNmqClient_Subscribe"
proOpts := common.ProducerOptions{Topic: topic}
producer, err := client.CreateProducer(context.TODO(), proOpts)
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
subName := "subName"
consumerOpts := mqwrapper.ConsumerOptions{
Topic: "",
SubscriptionName: subName,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
}
consumer, err := client.Subscribe(context.TODO(), consumerOpts)
assert.Error(t, err)
assert.Nil(t, consumer)
consumerOpts.Topic = topic
consumer, err = client.Subscribe(context.TODO(), consumerOpts)
assert.NoError(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
assert.Equal(t, consumer.Subscription(), subName)
msg := &common.ProducerMessage{
Payload: []byte{1},
Properties: nil,
}
_, err = producer.Send(context.TODO(), msg)
assert.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
select {
case <-ctx.Done():
assert.FailNow(t, "consumer failed to yield message in 100 milliseconds")
case msg := <-consumer.Chan():
consumer.Ack(msg)
nmqmsg := msg.(*nmqMessage)
msgPayload := nmqmsg.Payload()
assert.NotEmpty(t, msgPayload)
msgTopic := nmqmsg.Topic()
assert.Equal(t, msgTopic, topic)
msgProp := nmqmsg.Properties()
assert.Empty(t, msgProp)
msgID := nmqmsg.ID()
rID := msgID.(*nmqID)
assert.Equal(t, rID.messageID, MessageIDType(1))
}
}
func TestNmqClient_EarliestMessageID(t *testing.T) {
client, _ := createNmqClient()
defer client.Close()
mid := client.EarliestMessageID()
assert.NotNil(t, mid)
nmqmsg := mid.(*nmqID)
assert.Equal(t, nmqmsg.messageID, MessageIDType(1))
}
func TestNmqClient_StringToMsgID(t *testing.T) {
client, _ := createNmqClient()
defer client.Close()
str := "5"
res, err := client.StringToMsgID(str)
assert.NoError(t, err)
assert.NotNil(t, res)
str = "X"
res, err = client.StringToMsgID(str)
assert.Nil(t, res)
assert.Error(t, err)
}
func TestNmqClient_BytesToMsgID(t *testing.T) {
client, _ := createNmqClient()
defer client.Close()
mid := client.EarliestMessageID()
res, err := client.BytesToMsgID(mid.Serialize())
assert.NoError(t, err)
assert.NotNil(t, res)
}

View File

@ -1,183 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"fmt"
"sync"
"github.com/cockroachdb/errors"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
// Consumer is a client that used to consume messages from natsmq
type Consumer struct {
options mqwrapper.ConsumerOptions
js nats.JetStreamContext
sub *nats.Subscription
topic string
groupName string
natsChan chan *nats.Msg
msgChan chan common.Message
closeChan chan struct{}
once sync.Once
closeOnce sync.Once
skip bool
wg sync.WaitGroup
}
// Subscription returns the subscription name of this consumer
func (nc *Consumer) Subscription() string {
return nc.groupName
}
// Chan returns a channel to read messages from natsmq
func (nc *Consumer) Chan() <-chan common.Message {
if err := nc.closed(); err != nil {
panic(err)
}
if nc.sub == nil {
log.Error("accessing Chan of an uninitialized subscription.", zap.String("topic", nc.topic), zap.String("groupName", nc.groupName))
panic("failed to chan a consumer without assign")
}
if nc.msgChan == nil {
nc.once.Do(func() {
nc.msgChan = make(chan common.Message, 256)
nc.wg.Add(1)
go func() {
defer nc.wg.Done()
for {
select {
case msg := <-nc.natsChan:
if nc.skip {
nc.skip = false
continue
}
nc.msgChan <- &nmqMessage{
raw: msg,
}
case <-nc.closeChan:
log.Info("close nmq consumer ", zap.String("topic", nc.topic), zap.String("groupName", nc.groupName))
close(nc.msgChan)
return
}
}
}()
})
}
return nc.msgChan
}
// Seek is used to seek the position in natsmq topic
func (nc *Consumer) Seek(id common.MessageID, inclusive bool) error {
if err := nc.closed(); err != nil {
return err
}
if nc.sub != nil {
return errors.New("can not seek() on an initilized consumer")
}
if nc.msgChan != nil {
return errors.New("Seek should be called before Chan")
}
log.Info("Seek is called", zap.String("topic", nc.topic), zap.Any("id", id))
msgID := id.(*nmqID).messageID
// skip the first message when consume
nc.skip = !inclusive
var err error
nc.sub, err = nc.js.ChanSubscribe(nc.topic, nc.natsChan, nats.StartSequence(msgID))
if err != nil {
log.Warn("fail to Seek", zap.Error(err))
}
return err
}
// Ack is used to ask a natsmq message
func (nc *Consumer) Ack(message common.Message) {
if err := message.(*nmqMessage).raw.Ack(); err != nil {
log.Warn("failed to ack message of nmq", zap.String("topic", message.Topic()), zap.Reflect("msgID", message.ID()))
}
}
// Close is used to free the resources of this consumer
func (nc *Consumer) Close() {
nc.closeOnce.Do(func() {
if nc.sub == nil {
return
}
if err := nc.sub.Unsubscribe(); err != nil {
log.Warn("failed to unsubscribe subscription of nmq", zap.String("topic", nc.topic), zap.Error(err))
}
close(nc.closeChan)
nc.wg.Wait()
})
}
// GetLatestMsgID returns the ID of the most recent message processed by the consumer.
func (nc *Consumer) GetLatestMsgID() (common.MessageID, error) {
if err := nc.closed(); err != nil {
return nil, err
}
info, err := nc.js.StreamInfo(nc.topic)
if err != nil {
log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err))
return nil, errors.Wrap(err, "failed to get stream info of nats jetstream")
}
msgID := info.State.LastSeq
return &nmqID{messageID: msgID}, nil
}
// CheckTopicValid verifies if the given topic is valid for this consumer.
// 1. topic should exist.
func (nc *Consumer) CheckTopicValid(topic string) error {
if err := nc.closed(); err != nil {
return err
}
// A consumer is tied to a topic. In a multi-tenant situation,
// a consumer is not supposed to check on other topics.
if topic != nc.topic {
return fmt.Errorf("consumer of topic %s checking validness of topic %s", nc.topic, topic)
}
// check if topic valid or exist.
_, err := nc.js.StreamInfo(topic)
if errors.Is(err, nats.ErrStreamNotFound) {
return merr.WrapErrMqTopicNotFound(topic, err.Error())
} else if err != nil {
log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err))
return errors.Wrap(err, "failed to get stream info of nats jetstream")
}
return nil
}
// Closed check if Consumer is closed.
func (nc *Consumer) closed() error {
select {
case <-nc.closeChan:
return errors.Newf("Closed Nmq Consumer, topic: %s, subscription name:", nc.topic, nc.groupName)
default:
return nil
}
}

View File

@ -1,452 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"reflect"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
)
func TestNatsConsumer_Subscription(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
proOpts := common.ProducerOptions{Topic: topic}
_, err = client.CreateProducer(context.TODO(), proOpts)
assert.NoError(t, err)
consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
str := consumer.Subscription()
assert.NotNil(t, str)
}
func Test_GetEarliestMessageID(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
mid := client.EarliestMessageID()
assert.NotNil(t, mid)
assert.Equal(t, mid.(*nmqID).messageID, MessageIDType(1))
}
func Test_BadLatestMessageID(t *testing.T) {
topic := t.Name()
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
consumer.Close()
id, err := consumer.GetLatestMsgID()
assert.Nil(t, id)
assert.Error(t, err)
}
func TestComsumeMessage(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic})
assert.NoError(t, err)
c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
defer c.Close()
msg := []byte("test the first message")
prop := map[string]string{"k1": "v1", "k2": "v2"}
_, err = p.Send(context.Background(), &common.ProducerMessage{
Payload: msg,
Properties: prop,
})
assert.NoError(t, err)
recvMsg, err := c.(*Consumer).sub.NextMsg(1 * time.Second)
assert.NoError(t, err)
recvMsg.Ack()
assert.NoError(t, err)
assert.Equal(t, msg, recvMsg.Data)
properties := make(map[string]string)
for k, vs := range recvMsg.Header {
if len(vs) > 0 {
properties[k] = vs[0]
}
}
assert.True(t, reflect.DeepEqual(prop, properties))
msg2 := []byte("test the second message")
prop2 := map[string]string{"k1": "v3", "k4": "v4"}
_, err = p.Send(context.Background(), &common.ProducerMessage{
Payload: msg2,
Properties: prop2,
})
assert.NoError(t, err)
recvMsg, err = c.(*Consumer).sub.NextMsg(1 * time.Second)
assert.NoError(t, err)
recvMsg.Ack()
assert.Equal(t, msg2, recvMsg.Data)
properties = make(map[string]string)
for k, vs := range recvMsg.Header {
if len(vs) > 0 {
properties[k] = vs[0]
}
}
assert.True(t, reflect.DeepEqual(prop2, properties))
assert.NoError(t, err)
assert.NotNil(t, c)
}
func TestNatsConsumer_Close(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
assert.NotNil(t, c)
str := c.Subscription()
assert.NotNil(t, str)
c.Close()
// Disallow double close.
assert.Panics(t, func() { c.Chan() })
assert.Error(t, c.Seek(NewNmqID(1), false))
assert.Error(t, func() error { _, err := c.GetLatestMsgID(); return err }())
c.Close() // Allow double close, nothing happened.
}
func TestNatsClientErrorOnUnsubscribeTwice(t *testing.T) {
topic := t.Name()
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
defer consumer.Close()
err = consumer.(*Consumer).sub.Unsubscribe()
assert.NoError(t, err)
err = consumer.(*Consumer).sub.Unsubscribe()
assert.True(t, strings.Contains(err.Error(), "invalid subscription"))
t.Log(err)
}
func TestCheckTopicValid(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
assert.NotNil(t, consumer)
str := consumer.Subscription()
assert.NotNil(t, str)
// empty topic should pass
err = consumer.CheckTopicValid(topic)
assert.NoError(t, err)
// Not allowed to check other topic's validness.
err = consumer.CheckTopicValid("BadTopic")
assert.Error(t, err)
// not empty topic can pass
pub, err := client.CreateProducer(context.TODO(), common.ProducerOptions{
Topic: topic,
})
assert.NoError(t, err)
_, err = pub.Send(context.TODO(), &common.ProducerMessage{
Payload: []byte("123123123"),
})
assert.NoError(t, err)
err = consumer.CheckTopicValid(topic)
assert.NoError(t, err)
consumer.Close()
err = consumer.CheckTopicValid(topic)
assert.Error(t, err)
}
func newTestConsumer(t *testing.T, topic string, position common.SubscriptionInitialPosition) (mqwrapper.Consumer, error) {
client, err := createNmqClient()
assert.NoError(t, err)
return client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: position,
BufSize: 1024,
})
}
func newProducer(t *testing.T, topic string) (*nmqClient, mqwrapper.Producer) {
client, err := createNmqClient()
assert.NoError(t, err)
producer, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic})
assert.NoError(t, err)
return client, producer
}
func process(t *testing.T, msgs []string, p mqwrapper.Producer) {
for _, msg := range msgs {
_, err := p.Send(context.Background(), &common.ProducerMessage{
Payload: []byte(msg),
Properties: map[string]string{},
})
assert.NoError(t, err)
}
}
func TestNmqConsumer_GetLatestMsgID(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic})
assert.NoError(t, err)
c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
defer c.Close()
latestMsgID, err := c.GetLatestMsgID()
assert.NoError(t, err)
msgs := []string{"111", "222", "333", "444", "555"}
newMessageID := latestMsgID.(*nmqID).messageID + uint64(len(msgs))
process(t, msgs, p)
latestMsgID, err = c.GetLatestMsgID()
assert.NoError(t, err)
assert.Equal(t, newMessageID, latestMsgID.(*nmqID).messageID)
}
func TestNmqConsumer_ConsumeFromLatest(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic})
assert.NoError(t, err)
msgs := []string{"111", "222", "333"}
process(t, msgs, p)
c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionLatest,
BufSize: 1024,
})
assert.NoError(t, err)
defer c.Close()
msgs = []string{"444", "555"}
process(t, msgs, p)
msg := <-c.Chan()
assert.Equal(t, "444", string(msg.Payload()))
msg = <-c.Chan()
assert.Equal(t, "555", string(msg.Payload()))
}
func TestNmqConsumer_ConsumeFromEarliest(t *testing.T) {
client, err := createNmqClient()
assert.NoError(t, err)
defer client.Close()
topic := t.Name()
p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic})
assert.NoError(t, err)
msgs := []string{"111", "222"}
process(t, msgs, p)
c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
defer c.Close()
msgs = []string{"333", "444", "555"}
process(t, msgs, p)
msg := <-c.Chan()
assert.Equal(t, "111", string(msg.Payload()))
msg = <-c.Chan()
assert.Equal(t, "222", string(msg.Payload()))
c2, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: topic,
SubscriptionInitialPosition: common.SubscriptionPositionEarliest,
BufSize: 1024,
})
assert.NoError(t, err)
defer c2.Close()
msgs = []string{"777"}
process(t, msgs, p)
msg = <-c2.Chan()
assert.Equal(t, "111", string(msg.Payload()))
msg = <-c2.Chan()
assert.Equal(t, "222", string(msg.Payload()))
}
func TestNatsConsumer_SeekExclusive(t *testing.T) {
topic := t.Name()
c, p := newProducer(t, topic)
defer c.Close()
defer p.Close()
msgs := []string{"111", "222", "333", "444", "555"}
process(t, msgs, p)
msgID := &nmqID{messageID: 2}
consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
err = consumer.Seek(msgID, false)
assert.NoError(t, err)
msg := <-consumer.Chan()
assert.Equal(t, "333", string(msg.Payload()))
msg = <-consumer.Chan()
assert.Equal(t, "444", string(msg.Payload()))
}
func TestNatsConsumer_SeekInclusive(t *testing.T) {
topic := t.Name()
c, p := newProducer(t, topic)
defer c.Close()
defer p.Close()
msgs := []string{"111", "222", "333", "444", "555"}
process(t, msgs, p)
msgID := &nmqID{messageID: 2}
consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
err = consumer.Seek(msgID, true)
assert.NoError(t, err)
msg := <-consumer.Chan()
assert.Equal(t, "222", string(msg.Payload()))
msg = <-consumer.Chan()
assert.Equal(t, "333", string(msg.Payload()))
}
func TestNatsConsumer_NoDoubleSeek(t *testing.T) {
topic := t.Name()
c, p := newProducer(t, topic)
defer c.Close()
defer p.Close()
msgID := &nmqID{messageID: 2}
consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
err = consumer.Seek(msgID, true)
assert.NoError(t, err)
err = consumer.Seek(msgID, true)
assert.Error(t, err)
}
func TestNatsConsumer_ChanWithNoAssign(t *testing.T) {
topic := t.Name()
c, p := newProducer(t, topic)
defer c.Close()
defer p.Close()
msgs := []string{"111", "222", "333", "444", "555"}
process(t, msgs, p)
consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
defer consumer.Close()
assert.Panics(t, func() {
<-consumer.Chan()
})
}

View File

@ -1,69 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"github.com/milvus-io/milvus/pkg/v2/common"
mqcommon "github.com/milvus-io/milvus/pkg/v2/mq/common"
)
// MessageIDType is a type alias for server.UniqueID that represents the ID of a Nmq message.
type MessageIDType = uint64
// nmqID wraps message ID for natsmq
type nmqID struct {
messageID MessageIDType
}
// Check if nmqID implements MessageID interface
var _ mqcommon.MessageID = &nmqID{}
// NewNmqID creates and returns a new instance of the nmqID struct with the given MessageID.
func NewNmqID(id MessageIDType) mqcommon.MessageID {
return &nmqID{
messageID: id,
}
}
// Serialize convert nmq message id to []byte
func (nid *nmqID) Serialize() []byte {
return SerializeNmqID(nid.messageID)
}
func (nid *nmqID) AtEarliestPosition() bool {
return nid.messageID <= 1
}
func (nid *nmqID) LessOrEqualThan(msgID []byte) (bool, error) {
return nid.messageID <= DeserializeNmqID(msgID), nil
}
func (nid *nmqID) Equal(msgID []byte) (bool, error) {
return nid.messageID == DeserializeNmqID(msgID), nil
}
// SerializeNmqID is used to serialize a message ID to byte array
func SerializeNmqID(messageID MessageIDType) []byte {
b := make([]byte, 8)
common.Endian.PutUint64(b, messageID)
return b
}
// DeserializeNmqID is used to deserialize a message ID from byte array
func DeserializeNmqID(messageID []byte) MessageIDType {
return common.Endian.Uint64(messageID)
}

View File

@ -1,101 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"math"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNmqID_Serialize(t *testing.T) {
rid := &nmqID{
messageID: 8,
}
bin := rid.Serialize()
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
}
func Test_AtEarliestPosition(t *testing.T) {
rid := &nmqID{
messageID: 0,
}
assert.True(t, rid.AtEarliestPosition())
rid = &nmqID{
messageID: math.MaxInt64,
}
assert.False(t, rid.AtEarliestPosition())
}
func TestLessOrEqualThan(t *testing.T) {
rid1 := &nmqID{
messageID: 0,
}
rid2 := &nmqID{
messageID: math.MaxInt64,
}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
assert.NoError(t, err)
assert.True(t, ret)
ret, err = rid2.LessOrEqualThan(rid1.Serialize())
assert.NoError(t, err)
assert.False(t, ret)
ret, err = rid1.LessOrEqualThan(rid1.Serialize())
assert.NoError(t, err)
assert.True(t, ret)
}
func Test_Equal(t *testing.T) {
rid1 := &nmqID{
messageID: 0,
}
rid2 := &nmqID{
messageID: math.MaxInt64,
}
{
ret, err := rid1.Equal(rid1.Serialize())
assert.NoError(t, err)
assert.True(t, ret)
}
{
ret, err := rid1.Equal(rid2.Serialize())
assert.NoError(t, err)
assert.False(t, ret)
}
}
func Test_SerializeNmqID(t *testing.T) {
bin := SerializeNmqID(10)
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
}
func Test_DeserializeNmqID(t *testing.T) {
bin := SerializeNmqID(5)
id := DeserializeNmqID(bin)
assert.Equal(t, id, MessageIDType(5))
}

View File

@ -1,76 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"log"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
)
// Check nmqMessage implements ConsumerMessage
var (
_ common.Message = (*nmqMessage)(nil)
)
// nmqMessage wraps the message for natsmq
type nmqMessage struct {
raw *nats.Msg
// lazy initialized field.
meta *nats.MsgMetadata
}
// Topic returns the topic name of natsmq message
func (nm *nmqMessage) Topic() string {
// TODO: Dependency: implement of subscription logic of nmq.
// 1:1 Subject:Topic model is appied on this implementation.
// M:N model should be a optimize option in future.
return nm.raw.Subject
}
// Properties returns the properties of natsmq message
func (nm *nmqMessage) Properties() map[string]string {
properties := make(map[string]string, len(nm.raw.Header))
for k, vs := range nm.raw.Header {
if len(vs) > 0 {
properties[k] = vs[0]
}
}
return properties
}
// Payload returns the payload of natsmq message
func (nm *nmqMessage) Payload() []byte {
return nm.raw.Data
}
// ID returns the id of natsmq message
func (nm *nmqMessage) ID() common.MessageID {
if nm.meta == nil {
var err error
// raw is always a jetstream message, should never fail.
nm.meta, err = nm.raw.Metadata()
if err != nil {
log.Fatal("raw is always a jetstream message, should never fail", zap.Error(err))
}
}
return &nmqID{messageID: nm.meta.Sequence.Stream}
}

View File

@ -1,44 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"testing"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
)
func TestNmqMessage_All(t *testing.T) {
topic := t.Name()
raw := nats.NewMsg(topic)
raw.Data = []byte(`test payload`)
raw.Header.Add("test", "test")
nm := nmqMessage{
meta: &nats.MsgMetadata{
Sequence: nats.SequencePair{
Stream: 12,
},
},
raw: raw,
}
payload := []byte("test payload")
assert.Equal(t, topic, nm.Topic())
assert.Equal(t, MessageIDType(12), nm.ID().(*nmqID).messageID)
assert.Equal(t, payload, nm.Payload())
assert.Equal(t, nm.Properties()["test"], "test")
}

View File

@ -1,78 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file exceapt in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
var _ mqwrapper.Producer = (*nmqProducer)(nil)
// nmqProducer contains a natsmq producer
type nmqProducer struct {
js nats.JetStreamContext
topic string
}
// Topic returns the topic of nmq producer
func (np *nmqProducer) Topic() string {
return np.topic
}
// Send send the producer messages to natsmq
func (np *nmqProducer) Send(ctx context.Context, message *common.ProducerMessage) (common.MessageID, error) {
start := timerecord.NewTimeRecorder("send msg to stream")
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc()
// Encode message
msg := &nats.Msg{
Subject: np.topic,
Header: make(nats.Header, len(message.Properties)),
Data: message.Payload,
}
for k, v := range message.Properties {
msg.Header.Add(k, v)
}
// publish to nats-server
pa, err := np.js.PublishMsg(msg, nats.Context(ctx))
if err != nil {
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc()
log.Warn("failed to publish message by nmq", zap.String("topic", np.topic), zap.Error(err), zap.Int("payload_size", len(message.Payload)))
return nil, err
}
elapsed := start.ElapseSpan()
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc()
return &nmqID{messageID: pa.Sequence}, err
}
// Close does nothing currently
func (np *nmqProducer) Close() {
// No specific producer to be closed.
// stream doesn't close here.
}

View File

@ -1,47 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/mq/common"
)
func TestNatsMQProducer(t *testing.T) {
c, err := createNmqClient()
assert.NoError(t, err)
defer c.Close()
topic := t.Name()
pOpts := common.ProducerOptions{Topic: topic}
// Check Topic()
p, err := c.CreateProducer(context.TODO(), pOpts)
assert.NoError(t, err)
assert.Equal(t, p.(*nmqProducer).Topic(), topic)
// Check Send()
msg := &common.ProducerMessage{
Payload: []byte{},
Properties: map[string]string{},
}
_, err = p.Send(context.TODO(), msg)
assert.NoError(t, err)
}

View File

@ -1,115 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"context"
"os"
"path"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/nats-io/nats-server/v2/server"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// Nmq is global natsmq instance that will be initialized only once
var Nmq *server.Server
// once is used to init global natsmq
var once sync.Once
// NatsMQConfig is used to initialize NatsMQ.
type NatsMQConfig struct {
Opts server.Options
InitializeTimeout time.Duration
}
// MustInitNatsMQ init global local natsmq instance.
// Panic if initailizing operation failed.
func MustInitNatsMQ(cfg *NatsMQConfig) {
once.Do(func() {
var err error
Nmq, err = initNatsMQ(cfg)
if err != nil {
log.Fatal("initialize nmq failed", zap.Error(err))
}
})
}
func initNatsMQ(cfg *NatsMQConfig) (*server.Server, error) {
log.Info("try to initialize global nmq", zap.Any("config", cfg))
natsServer, err := server.NewServer(&cfg.Opts)
if err != nil {
return nil, errors.Wrap(err, "fail to initailize nmq")
}
// Config log if log file set.
if cfg.Opts.LogFile != "" {
if err := os.MkdirAll(path.Dir(cfg.Opts.LogFile), 0o744); err != nil {
return nil, errors.Wrap(err, "fail to create directory for nats log file")
}
// make directory for the file
natsServer.ConfigureLogger()
}
// Start Nmq in background and wait until it's ready for connection.
if err := server.Run(natsServer); err != nil {
return nil, errors.Wrap(err, "start nmq failed")
}
// Wait for server to be ready for connections
if !natsServer.ReadyForConnections(cfg.InitializeTimeout) {
return nil, errors.New("nmq is not ready within timeout")
}
log.Info("initialize nmq finished", zap.String("client-url", natsServer.ClientURL()), zap.Error(err))
return natsServer, nil
}
// ParseServerOption get nats server option from paramstable.
func ParseServerOption(params *paramtable.ComponentParam) *NatsMQConfig {
return &NatsMQConfig{
Opts: server.Options{
Host: "127.0.0.1", // Force to use loopback address.
Port: params.NatsmqCfg.ServerPort.GetAsInt(),
MaxPayload: params.NatsmqCfg.ServerMaxPayload.GetAsInt32(),
MaxPending: params.NatsmqCfg.ServerMaxPending.GetAsInt64(),
JetStream: true,
JetStreamMaxStore: params.NatsmqCfg.ServerMaxFileStore.GetAsInt64(),
StoreDir: params.NatsmqCfg.ServerStoreDir.GetValue(),
Trace: params.NatsmqCfg.ServerMonitorTrace.GetAsBool(),
Debug: params.NatsmqCfg.ServerMonitorDebug.GetAsBool(),
Logtime: params.NatsmqCfg.ServerMonitorLogTime.GetAsBool(),
LogFile: params.NatsmqCfg.ServerMonitorLogFile.GetValue(),
LogSizeLimit: params.NatsmqCfg.ServerMonitorLogSizeLimit.GetAsInt64(),
},
InitializeTimeout: time.Duration(params.NatsmqCfg.ServerInitializeTimeout.GetAsInt()) * time.Millisecond,
}
}
// CloseNatsMQ is used to close global natsmq
func CloseNatsMQ() {
log.Ctx(context.TODO()).Debug("Closing Natsmq!")
if Nmq != nil {
// Shut down the server.
Nmq.Shutdown()
// Wait for server shutdown.
Nmq.WaitForShutdown()
Nmq = nil
}
}

View File

@ -1,123 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nmq
import (
"fmt"
"os"
"testing"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
var natsServerAddress string
func TestMain(m *testing.M) {
exitCode := func() int {
paramtable.Init()
storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq")
defer os.RemoveAll(storeDir)
cfg := ParseServerOption(paramtable.Get())
cfg.Opts.Port = server.RANDOM_PORT
cfg.Opts.StoreDir = storeDir
MustInitNatsMQ(cfg)
defer CloseNatsMQ()
natsServerAddress = Nmq.ClientURL()
return m.Run()
}()
os.Exit(exitCode)
}
func TestInitNatsMQ(t *testing.T) {
func() {
cfg := ParseServerOption(paramtable.Get())
storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq")
defer os.RemoveAll(storeDir)
cfg.Opts.StoreDir = storeDir
cfg.Opts.Port = server.RANDOM_PORT
cfg.Opts.LogFile = ""
mq, err := initNatsMQ(cfg)
assert.NoError(t, err)
assert.NotNil(t, mq)
mq.Shutdown()
mq.WaitForShutdown()
}()
func() {
cfg := ParseServerOption(paramtable.Get())
storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq")
defer os.RemoveAll(storeDir)
cfg.Opts.StoreDir = storeDir
cfg.Opts.Port = server.RANDOM_PORT
cfg.Opts.LogFile = ""
mq, err := initNatsMQ(cfg)
assert.NoError(t, err)
assert.NotNil(t, mq)
mq.Shutdown()
mq.WaitForShutdown()
}()
func() {
cfg := ParseServerOption(paramtable.Get())
storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq")
defer os.RemoveAll(storeDir)
cfg.Opts.StoreDir = storeDir
cfg.Opts.Port = server.RANDOM_PORT
cfg.Opts.MaxPending = -1
mq, err := initNatsMQ(cfg)
assert.Error(t, err)
assert.Nil(t, mq)
}()
func() {
ex, err := os.Executable()
assert.NoError(t, err)
cfg := ParseServerOption(paramtable.Get())
storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq")
defer os.RemoveAll(storeDir)
cfg.Opts.StoreDir = storeDir
cfg.Opts.Port = server.RANDOM_PORT
cfg.Opts.LogFile = fmt.Sprintf("%s/test", ex)
mq, err := initNatsMQ(cfg)
assert.Error(t, err)
assert.Nil(t, mq)
}()
}
func TestGetServerOptionDefault(t *testing.T) {
cfg := ParseServerOption(paramtable.Get())
assert.Equal(t, "127.0.0.1", cfg.Opts.Host)
assert.Equal(t, 4222, cfg.Opts.Port)
assert.Equal(t, true, cfg.Opts.JetStream)
assert.Equal(t, "/var/lib/milvus/nats", cfg.Opts.StoreDir)
assert.Equal(t, int64(17179869184), cfg.Opts.JetStreamMaxStore)
assert.Equal(t, int32(8388608), cfg.Opts.MaxPayload)
assert.Equal(t, int64(67108864), cfg.Opts.MaxPending)
assert.Equal(t, 4000*time.Millisecond, cfg.InitializeTimeout)
assert.Equal(t, false, cfg.Opts.Debug)
assert.Equal(t, false, cfg.Opts.Trace)
assert.Equal(t, true, cfg.Opts.Logtime)
assert.Equal(t, "/tmp/milvus/logs/nats.log", cfg.Opts.LogFile)
assert.Equal(t, int64(536870912), cfg.Opts.LogSizeLimit)
}

View File

@ -32,7 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
// nmqClient implements mqwrapper.Client.
// rmqClient implements mqwrapper.Client.
var _ mqwrapper.Client = &rmqClient{}
// var InitRocksMQ = server.InitRocksMQ

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"os"
"sync"
"testing"
@ -12,33 +11,9 @@ import (
"github.com/milvus-io/milvus/pkg/v2/mq/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func BenchmarkProduceAndConsumeNatsMQ(b *testing.B) {
storeDir, err := os.MkdirTemp("", "milvus_mq_nmq")
assert.NoError(b, err)
defer os.RemoveAll(storeDir)
paramtable.Init()
cfg := nmq.ParseServerOption(paramtable.Get())
cfg.Opts.StoreDir = storeDir
nmq.MustInitNatsMQ(cfg)
client, err := nmq.NewClientWithDefaultOptions(context.Background())
if err != nil {
panic(err)
}
cases := generateRandBytes(64*1024, 10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
benchmarkProduceAndConsume(b, client, cases)
}
}
func benchmarkProduceAndConsume(b *testing.B, mqClient mqwrapper.Client, cases [][]byte) {
topic := fmt.Sprintf("test_produce_and_consume_topic_%d", rand.Int31n(100000))
wg := sync.WaitGroup{}

View File

@ -50,7 +50,6 @@ type ServiceParam struct {
PulsarCfg PulsarConfig
KafkaCfg KafkaConfig
RocksmqCfg RocksmqConfig
NatsmqCfg NatsmqConfig
MinioCfg MinioConfig
ProfileCfg ProfileConfig
}
@ -65,7 +64,6 @@ func (p *ServiceParam) init(bt *BaseTable) {
p.PulsarCfg.Init(bt)
p.KafkaCfg.Init(bt)
p.RocksmqCfg.Init(bt)
p.NatsmqCfg.Init(bt)
p.MinioCfg.Init(bt)
p.ProfileCfg.Init(bt)
}
@ -74,11 +72,6 @@ func (p *ServiceParam) RocksmqEnable() bool {
return p.RocksmqCfg.Path.GetValue() != ""
}
// NatsmqEnable checks if NATS messaging queue is enabled.
func (p *ServiceParam) NatsmqEnable() bool {
return p.NatsmqCfg.ServerStoreDir.GetValue() != ""
}
func (p *ServiceParam) PulsarEnable() bool {
return p.PulsarCfg.Address.GetValue() != ""
}
@ -548,7 +541,7 @@ func (p *MQConfig) Init(base *BaseTable) {
Version: "2.3.0",
DefaultValue: "default",
Doc: `Default value: "default"
Valid values: [default, pulsar, kafka, rocksmq, natsmq, woodpecker]`,
Valid values: [default, pulsar, kafka, rocksmq, woodpecker]`,
Export: true,
}
p.Type.Init(base.mgr)
@ -1216,141 +1209,6 @@ Set an easy-to-identify root key prefix for Milvus if etcd service already exist
r.CompressionTypes.Init(base.mgr)
}
// NatsmqConfig describes the configuration options for the Nats message queue
type NatsmqConfig struct {
ServerPort ParamItem `refreshable:"false"`
ServerStoreDir ParamItem `refreshable:"false"`
ServerMaxFileStore ParamItem `refreshable:"false"`
ServerMaxPayload ParamItem `refreshable:"false"`
ServerMaxPending ParamItem `refreshable:"false"`
ServerInitializeTimeout ParamItem `refreshable:"false"`
ServerMonitorTrace ParamItem `refreshable:"false"`
ServerMonitorDebug ParamItem `refreshable:"false"`
ServerMonitorLogTime ParamItem `refreshable:"false"`
ServerMonitorLogFile ParamItem `refreshable:"false"`
ServerMonitorLogSizeLimit ParamItem `refreshable:"false"`
ServerRetentionMaxAge ParamItem `refreshable:"true"`
ServerRetentionMaxBytes ParamItem `refreshable:"true"`
ServerRetentionMaxMsgs ParamItem `refreshable:"true"`
}
// Init sets up a new NatsmqConfig instance using the provided BaseTable
func (r *NatsmqConfig) Init(base *BaseTable) {
r.ServerPort = ParamItem{
Key: "natsmq.server.port",
Version: "2.3.0",
DefaultValue: "4222",
Doc: "Listening port of the NATS server.",
Export: true,
}
r.ServerPort.Init(base.mgr)
r.ServerStoreDir = ParamItem{
Key: "natsmq.server.storeDir",
DefaultValue: "/var/lib/milvus/nats",
Version: "2.3.0",
Doc: `Directory to use for JetStream storage of nats`,
Export: true,
}
r.ServerStoreDir.Init(base.mgr)
r.ServerMaxFileStore = ParamItem{
Key: "natsmq.server.maxFileStore",
Version: "2.3.0",
DefaultValue: "17179869184",
Doc: `Maximum size of the 'file' storage`,
Export: true,
}
r.ServerMaxFileStore.Init(base.mgr)
r.ServerMaxPayload = ParamItem{
Key: "natsmq.server.maxPayload",
Version: "2.3.0",
DefaultValue: "8388608",
Doc: `Maximum number of bytes in a message payload`,
Export: true,
}
r.ServerMaxPayload.Init(base.mgr)
r.ServerMaxPending = ParamItem{
Key: "natsmq.server.maxPending",
Version: "2.3.0",
DefaultValue: "67108864",
Doc: `Maximum number of bytes buffered for a connection Applies to client connections`,
Export: true,
}
r.ServerMaxPending.Init(base.mgr)
r.ServerInitializeTimeout = ParamItem{
Key: "natsmq.server.initializeTimeout",
Version: "2.3.0",
DefaultValue: "4000",
Doc: `waiting for initialization of natsmq finished`,
Export: true,
}
r.ServerInitializeTimeout.Init(base.mgr)
r.ServerMonitorTrace = ParamItem{
Key: "natsmq.server.monitor.trace",
Version: "2.3.0",
DefaultValue: "false",
Doc: `If true enable protocol trace log messages`,
Export: true,
}
r.ServerMonitorTrace.Init(base.mgr)
r.ServerMonitorDebug = ParamItem{
Key: "natsmq.server.monitor.debug",
Version: "2.3.0",
DefaultValue: "false",
Doc: `If true enable debug log messages`,
Export: true,
}
r.ServerMonitorDebug.Init(base.mgr)
r.ServerMonitorLogTime = ParamItem{
Key: "natsmq.server.monitor.logTime",
Version: "2.3.0",
DefaultValue: "true",
Doc: `If set to false, log without timestamps.`,
Export: true,
}
r.ServerMonitorLogTime.Init(base.mgr)
r.ServerMonitorLogFile = ParamItem{
Key: "natsmq.server.monitor.logFile",
Version: "2.3.0",
DefaultValue: "/tmp/milvus/logs/nats.log",
Doc: `Log file path relative to .. of milvus binary if use relative path`,
Export: true,
}
r.ServerMonitorLogFile.Init(base.mgr)
r.ServerMonitorLogSizeLimit = ParamItem{
Key: "natsmq.server.monitor.logSizeLimit",
Version: "2.3.0",
DefaultValue: "536870912",
Doc: `Size in bytes after the log file rolls over to a new one`,
Export: true,
}
r.ServerMonitorLogSizeLimit.Init(base.mgr)
r.ServerRetentionMaxAge = ParamItem{
Key: "natsmq.server.retention.maxAge",
Version: "2.3.0",
DefaultValue: "4320",
Doc: `Maximum age of any message in the P-channel`,
Export: true,
}
r.ServerRetentionMaxAge.Init(base.mgr)
r.ServerRetentionMaxBytes = ParamItem{
Key: "natsmq.server.retention.maxBytes",
Version: "2.3.0",
DefaultValue: "",
Doc: `How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size`,
Export: true,
}
r.ServerRetentionMaxBytes.Init(base.mgr)
r.ServerRetentionMaxMsgs = ParamItem{
Key: "natsmq.server.retention.maxMsgs",
Version: "2.3.0",
DefaultValue: "",
Doc: `How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit`,
Export: true,
}
r.ServerRetentionMaxMsgs.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
type MinioConfig struct {

View File

@ -1,46 +0,0 @@
cluster:
enabled: false
log:
level: debug
image:
all:
repository: milvusdb/milvus
tag: master-20230614-893c3c04
pullPolicy: IfNotPresent
standalone:
resources:
limits:
cpu: 8
memory: 16Gi
requests:
cpu: 4
memory: 8Gi
kafka:
enabled: false
name: kafka
replicaCount: 3
defaultReplicationFactor: 2
etcd:
replicaCount: 3
image:
repository: milvusdb/etcd
tag: 3.5.5-r2
minio:
mode: standalone
pulsar:
enabled: false
extraConfigFiles:
user.yaml: |+
mq:
type: natsmq
dataCoord:
compaction:
indexBasedCompaction: false
indexCoord:
scheduler:
interval: 100