diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 36797f71a6..cc6d8b12ad 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -46,7 +46,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq" "github.com/milvus-io/milvus/pkg/v2/tracer" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/expr" @@ -334,10 +333,8 @@ func (mr *MilvusRoles) Run() { params := paramtable.Get() if paramtable.Get().RocksmqEnable() { defer stopRocksmq() - } else if paramtable.Get().NatsmqEnable() { - defer nmq.CloseNatsMQ() } else { - panic("only support Rocksmq and Natsmq in standalone mode") + panic("only support Rocksmq in standalone mode") } if params.EtcdCfg.UseEmbedEtcd.GetAsBool() { // Start etcd server. diff --git a/cmd/tools/config/generate.go b/cmd/tools/config/generate.go index 58f309ae12..52ac7e5231 100644 --- a/cmd/tools/config/generate.go +++ b/cmd/tools/config/generate.go @@ -233,11 +233,11 @@ func WriteYaml(w io.Writer) { { name: "mq", header: ` -# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka. +# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka. # You can change your mq by setting mq.type field. # If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file. -# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka -# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)`, +# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka +# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)`, }, { name: "woodpecker", @@ -257,12 +257,6 @@ func WriteYaml(w io.Writer) { { name: "rocksmq", }, - { - name: "natsmq", - header: ` -# natsmq configuration. -# more detail: https://docs.nats.io/running-a-nats-service/configuration`, - }, { name: "mixCoord", header: "\n# Related configuration of mixCoord", diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 0fcc64cf35..0967f87aa6 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -158,14 +158,14 @@ minio: # 0 means using oss client by default, decrease these configration if ListObjects timeout listObjectsMaxKeys: 0 -# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka. +# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka. # You can change your mq by setting mq.type field. # If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file. -# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka -# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode) +# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka +# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode) mq: # Default value: "default" - # Valid values: [default, pulsar, kafka, rocksmq, natsmq, woodpecker] + # Valid values: [default, pulsar, kafka, rocksmq, woodpecker] type: default enablePursuitMode: true # Default value: "true" pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds @@ -256,27 +256,6 @@ rocksmq: compactionInterval: 86400 # Time interval to trigger rocksdb compaction to remove deleted data. Unit: Second compressionTypes: 0,0,7,7,7 # compaction compression type, only support use 0,7. 0 means not compress, 7 will use zstd. Length of types means num of rocksdb level. -# natsmq configuration. -# more detail: https://docs.nats.io/running-a-nats-service/configuration -natsmq: - server: - port: 4222 # Listening port of the NATS server. - storeDir: /var/lib/milvus/nats # Directory to use for JetStream storage of nats - maxFileStore: 17179869184 # Maximum size of the 'file' storage - maxPayload: 8388608 # Maximum number of bytes in a message payload - maxPending: 67108864 # Maximum number of bytes buffered for a connection Applies to client connections - initializeTimeout: 4000 # waiting for initialization of natsmq finished - monitor: - trace: false # If true enable protocol trace log messages - debug: false # If true enable debug log messages - logTime: true # If set to false, log without timestamps. - logFile: /tmp/milvus/logs/nats.log # Log file path relative to .. of milvus binary if use relative path - logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one - retention: - maxAge: 4320 # Maximum age of any message in the P-channel - maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size - maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit - # Related configuration of mixCoord mixCoord: enableActiveStandby: false diff --git a/go.mod b/go.mod index aa8aaa8e8a..0a4c49b033 100644 --- a/go.mod +++ b/go.mod @@ -189,17 +189,11 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect - github.com/minio/highwayhash v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect - github.com/nats-io/jwt/v2 v2.5.5 // indirect - github.com/nats-io/nats-server/v2 v2.10.12 // indirect - github.com/nats-io/nats.go v1.34.1 // indirect - github.com/nats-io/nkeys v0.4.7 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/panjf2000/ants/v2 v2.7.2 // indirect diff --git a/go.sum b/go.sum index 172a267cb0..29c7f8e762 100644 --- a/go.sum +++ b/go.sum @@ -738,20 +738,14 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971 h1:CKKrOtri+dbTUkMJehDuSM489OIqJab1t0pUq+PV73E= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a h1:W+9nVXKcI9FdiyrFbrs9BIFUqRW0pLY+Fn0fsmmuLyw= github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8 h1:/oUdiYtwVlqiEMSzME7vDvir49Lt23nMpaZC9u22bIo= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= @@ -781,17 +775,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4= -github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE= -github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= -github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= @@ -1338,7 +1323,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/util/dependency/factory.go b/internal/util/dependency/factory.go index 602c63b75d..8de4079783 100644 --- a/internal/util/dependency/factory.go +++ b/internal/util/dependency/factory.go @@ -17,7 +17,6 @@ import ( const ( mqTypeDefault = "default" - mqTypeNatsmq = "natsmq" mqTypeRocksmq = "rocksmq" mqTypeKafka = "kafka" mqTypePulsar = "pulsar" @@ -26,7 +25,6 @@ const ( type mqEnable struct { Rocksmq bool - Natsmq bool Pulsar bool Kafka bool Woodpecker bool @@ -67,8 +65,8 @@ func NewFactory(standAlone bool) *DefaultFactory { // Init create a msg factory(TODO only support one mq at the same time.) // In order to guarantee backward compatibility of config file, we still support multiple mq configs. // The initialization of MQ follows the following rules, if the mq.type is default. -// 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka -// 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode) +// 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka +// 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode) func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { // skip if using default factory if f.msgStreamFactory != nil { @@ -84,13 +82,11 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { } func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error { - mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()}) + mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()}) metrics.RegisterMQType(mqType) log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType)) switch mqType { - case mqTypeNatsmq: - f.msgStreamFactory = msgstream.NewNatsmqFactory() case mqTypeRocksmq: f.msgStreamFactory = msgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), ¶ms.ServiceParam) case mqTypePulsar: @@ -135,10 +131,10 @@ func mustSelectMQType(standalone bool, mqType string, enable mqEnable) string { // Validate mq type. func validateMQType(standalone bool, mqType string) error { - if mqType != mqTypeNatsmq && mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker { + if mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker { return errors.Newf("mq type %s is invalid", mqType) } - if !standalone && (mqType == mqTypeRocksmq || mqType == mqTypeNatsmq) { + if !standalone && mqType == mqTypeRocksmq { return errors.Newf("mq %s is only valid in standalone mode") } return nil @@ -169,9 +165,6 @@ type Factory interface { func HealthCheck(mqType string) *common.MQClusterStatus { clusterStatus := &common.MQClusterStatus{MqType: mqType} switch mqType { - case mqTypeNatsmq: - // TODO: implement health check for nats mq - clusterStatus.Health = true case mqTypeRocksmq: // TODO: implement health checker for rocks mq clusterStatus.Health = true diff --git a/internal/util/dependency/factory_test.go b/internal/util/dependency/factory_test.go index bc0dd5053c..b8ec7b8546 100644 --- a/internal/util/dependency/factory_test.go +++ b/internal/util/dependency/factory_test.go @@ -11,35 +11,32 @@ import ( func TestValidateMQType(t *testing.T) { assert.Error(t, validateMQType(true, mqTypeDefault)) assert.Error(t, validateMQType(false, mqTypeDefault)) - assert.Error(t, validateMQType(false, mqTypeNatsmq)) assert.Error(t, validateMQType(false, mqTypeRocksmq)) assert.NoError(t, validateMQType(true, mqTypeWoodpecker)) assert.NoError(t, validateMQType(false, mqTypeWoodpecker)) } func TestSelectMQType(t *testing.T) { - assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypeRocksmq) - assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka) - assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker) - assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, false}) }) - assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka) - assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker) - assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, false}) }) - assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true, true}), mqTypeRocksmq) - assert.Equal(t, mustSelectMQType(true, mqTypeNatsmq, mqEnable{true, true, true, true, true}), mqTypeNatsmq) - assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka) - assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker) - assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true, true}) }) - assert.Panics(t, func() { mustSelectMQType(false, mqTypeNatsmq, mqEnable{true, true, true, true, true}) }) - assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar) - assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka) - assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker) + assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true}), mqTypeRocksmq) + assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka) + assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker) + assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false}) }) + assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka) + assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker) + assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false}) }) + assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true}), mqTypeRocksmq) + assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka) + assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker) + assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true}) }) + assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar) + assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka) + assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker) } func TestHealthCheck(t *testing.T) { @@ -53,7 +50,6 @@ func TestHealthCheck(t *testing.T) { mqType string health bool }{ - {mqTypeNatsmq, true}, {mqTypeRocksmq, true}, {mqTypePulsar, false}, {mqTypeKafka, false}, diff --git a/pkg/go.mod b/pkg/go.mod index 7a5b204b7e..92eefe7b96 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -22,8 +22,6 @@ require ( github.com/klauspost/compress v1.17.9 github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971 github.com/minio/minio-go/v7 v7.0.73 - github.com/nats-io/nats-server/v2 v2.10.12 - github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 github.com/prometheus/client_golang v1.14.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 @@ -146,14 +144,10 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.8 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/minio/highwayhash v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect - github.com/nats-io/jwt/v2 v2.5.5 // indirect - github.com/nats-io/nkeys v0.4.7 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pierrec/lz4 v2.5.2+incompatible // indirect diff --git a/pkg/go.sum b/pkg/go.sum index 0e81c39a64..f9e4f17b3c 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -562,8 +562,6 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= @@ -589,17 +587,8 @@ github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ib github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4= -github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE= -github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= -github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= -github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g= @@ -1060,7 +1049,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/mq/msgstream/factory_test.go b/pkg/mq/msgstream/factory_test.go index 72bcee3eb0..01cf40a003 100644 --- a/pkg/mq/msgstream/factory_test.go +++ b/pkg/mq/msgstream/factory_test.go @@ -18,54 +18,16 @@ package msgstream import ( "context" - "os" "testing" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) type positionGenerator func(channelName string, timestamp uint64, msgGroup string, targetMsgIDs []uint64) []*msgpb.MsgPosition -func TestNmq(t *testing.T) { - storeDir, err := os.MkdirTemp("", "milvus_mq_nmq") - assert.NoError(t, err) - defer os.RemoveAll(storeDir) - - paramtable.Init() - cfg := nmq.ParseServerOption(paramtable.Get()) - cfg.Opts.StoreDir = storeDir - nmq.MustInitNatsMQ(cfg) - defer nmq.CloseNatsMQ() - - f1 := NewNatsmqFactory() - f2 := NewNatsmqFactory() - - client, err := nmq.NewClientWithDefaultOptions(context.Background()) - if err != nil { - panic(err) - } - - testMQ(t, client, []Factory{f1, f2}, func(channelName string, timestamp uint64, msgGroup string, targetMsgIDs []uint64) []*msgpb.MsgPosition { - result := make([]*msgpb.MsgPosition, 0, len(targetMsgIDs)) - - for _, targetMsgID := range targetMsgIDs { - msgID := nmq.NewNmqID(targetMsgID).Serialize() - result = append(result, &msgpb.MsgPosition{ - ChannelName: channelName, - Timestamp: timestamp, - MsgGroup: msgGroup, - MsgID: msgID, - }) - } - return result - }) -} - func testMQ(t *testing.T, client mqwrapper.Client, factories []Factory, pg positionGenerator) { testStreamOperation(t, client) testFactoryCommonOperation(t, factories[0]) diff --git a/pkg/mq/msgstream/mq_factory.go b/pkg/mq/msgstream/mq_factory.go index 9fc19c60e8..ec7e19c799 100644 --- a/pkg/mq/msgstream/mq_factory.go +++ b/pkg/mq/msgstream/mq_factory.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server" kafkawrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/kafka" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq" pulsarmqwrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/pulsar" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/rmq" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -223,19 +222,6 @@ func NewKmsFactory(config *paramtable.ServiceParam) Factory { return f } -// NewNatsmqFactory create a new nats-mq factory. -func NewNatsmqFactory() Factory { - paramtable.Init() - paramtable := paramtable.Get() - nmq.MustInitNatsMQ(nmq.ParseServerOption(paramtable)) - return &CommonFactory{ - Newer: nmq.NewClientWithDefaultOptions, - DispatcherFactory: ProtoUDFactory{}, - ReceiveBufSize: paramtable.MQCfg.ReceiveBufSize.GetAsInt64(), - MQBufSize: paramtable.MQCfg.MQBufSize.GetAsInt64(), - } -} - // NewRocksmqFactory creates a new message stream factory based on rocksmq. func NewRocksmqFactory(path string, cfg *paramtable.ServiceParam) Factory { if err := server.InitRocksMQ(path); err != nil { diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go deleted file mode 100644 index 4f81f3566a..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client.go +++ /dev/null @@ -1,201 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - "fmt" - "net" - "strconv" - "time" - - "github.com/cockroachdb/errors" - "github.com/nats-io/nats.go" - - "github.com/milvus-io/milvus/pkg/v2/metrics" - "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/timerecord" -) - -// nmqClient implements mqwrapper.Client. -var _ mqwrapper.Client = &nmqClient{} - -// nmqClient contains a natsmq client -type nmqClient struct { - conn *nats.Conn -} - -type nmqDialer struct { - ctx func() context.Context -} - -func (d *nmqDialer) Dial(network, address string) (net.Conn, error) { - ctx := d.ctx() - - dial := &net.Dialer{} - - // keep default 2s timeout - if _, ok := ctx.Deadline(); !ok { - dial.Timeout = 2 * time.Second - } - - return dial.DialContext(ctx, network, address) -} - -// NewClientWithDefaultOptions returns a new NMQ client with default options. -// It retrieves the NMQ client URL from the server configuration. -func NewClientWithDefaultOptions(ctx context.Context) (mqwrapper.Client, error) { - url := Nmq.ClientURL() - - opt := nats.SetCustomDialer(&nmqDialer{ - ctx: func() context.Context { return ctx }, - }) - - return NewClient(url, opt) -} - -// NewClient returns a new nmqClient object -func NewClient(url string, options ...nats.Option) (*nmqClient, error) { - c, err := nats.Connect(url, options...) - if err != nil { - return nil, errors.Wrap(err, "failed to set nmq client") - } - return &nmqClient{conn: c}, nil -} - -// CreateProducer creates a producer for natsmq client -func (nc *nmqClient) CreateProducer(ctx context.Context, options common.ProducerOptions) (mqwrapper.Producer, error) { - start := timerecord.NewTimeRecorder("create producer") - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.TotalLabel).Inc() - - // TODO: inject jetstream options. - js, err := nc.conn.JetStream() - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() - return nil, errors.Wrap(err, "failed to create jetstream context") - } - // TODO: (1) investigate on performance of multiple streams vs multiple topics. - // (2) investigate if we should have topics under the same stream. - - _, err = js.AddStream(&nats.StreamConfig{ - Name: options.Topic, - Subjects: []string{options.Topic}, - MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute), - MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(), - MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(), - }) - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc() - return nil, errors.Wrap(err, "failed to add/connect to jetstream for producer") - } - rp := nmqProducer{js: js, topic: options.Topic} - - elapsed := start.ElapseSpan() - metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateProducerLabel).Observe(float64(elapsed.Milliseconds())) - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.SuccessLabel).Inc() - return &rp, nil -} - -func (nc *nmqClient) Subscribe(ctx context.Context, options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { - start := timerecord.NewTimeRecorder("create consumer") - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.TotalLabel).Inc() - - if options.Topic == "" { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - return nil, errors.New("invalid consumer config: empty topic") - } - - if options.SubscriptionName == "" { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - return nil, errors.New("invalid consumer config: empty subscription name") - } - // TODO: inject jetstream options. - js, err := nc.conn.JetStream() - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - return nil, errors.Wrap(err, "failed to create jetstream context") - } - // TODO: do we allow passing in an existing natsChan from options? - // also, revisit the size or make it a user param - natsChan := make(chan *nats.Msg, options.BufSize) - // TODO: should we allow subscribe to a topic that doesn't exist yet? Current logic allows it. - _, err = js.AddStream(&nats.StreamConfig{ - Name: options.Topic, - Subjects: []string{options.Topic}, - MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute), - MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(), - MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(), - }) - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - return nil, errors.Wrap(err, "failed to add/connect to jetstream for consumer") - } - closeChan := make(chan struct{}) - - var sub *nats.Subscription - position := options.SubscriptionInitialPosition - // TODO: should we only allow exclusive subscribe? Current logic allows double subscribe. - switch position { - case common.SubscriptionPositionLatest: - sub, err = js.ChanSubscribe(options.Topic, natsChan, nats.DeliverNew()) - case common.SubscriptionPositionEarliest: - sub, err = js.ChanSubscribe(options.Topic, natsChan, nats.DeliverAll()) - } - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() - return nil, errors.Wrap(err, fmt.Sprintf("failed to get consumer info, subscribe position: %d", position)) - } - - elapsed := start.ElapseSpan() - metrics.MsgStreamRequestLatency.WithLabelValues(metrics.CreateConsumerLabel).Observe(float64(elapsed.Milliseconds())) - metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.SuccessLabel).Inc() - return &Consumer{ - js: js, - sub: sub, - topic: options.Topic, - groupName: options.SubscriptionName, - options: options, - natsChan: natsChan, - closeChan: closeChan, - }, nil -} - -// EarliestMessageID returns the earliest message ID for nmq client -func (nc *nmqClient) EarliestMessageID() common.MessageID { - return &nmqID{messageID: 1} -} - -// StringToMsgID converts string id to MessageID -func (nc *nmqClient) StringToMsgID(id string) (common.MessageID, error) { - rID, err := strconv.ParseUint(id, 10, 64) - if err != nil { - return nil, errors.Wrap(err, "failed to parse string to MessageID") - } - return &nmqID{messageID: rID}, nil -} - -// BytesToMsgID converts a byte array to messageID -func (nc *nmqClient) BytesToMsgID(id []byte) (common.MessageID, error) { - rID := DeserializeNmqID(id) - return &nmqID{messageID: rID}, nil -} - -func (nc *nmqClient) Close() { - nc.conn.Close() -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_client_test.go deleted file mode 100644 index ad3e4f23c7..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_client_test.go +++ /dev/null @@ -1,275 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" -) - -func createNmqClient() (*nmqClient, error) { - return NewClient(natsServerAddress) -} - -func Test_NewNmqClient(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - assert.NotNil(t, client) - client.Close() - - tests := []struct { - description string - withTimeout bool - ctxTimeouted bool - expectErr bool - }{ - {"without context", false, false, false}, - {"without timeout context, no timeout", true, false, false}, - {"without timeout context, timeout", true, true, true}, - } - - for _, test := range tests { - t.Run(test.description, func(t *testing.T) { - ctx := context.Background() - var cancel context.CancelFunc - if test.withTimeout { - ctx, cancel = context.WithTimeout(ctx, time.Second) - if test.ctxTimeouted { - cancel() - } else { - defer cancel() - } - } - - client, err := NewClientWithDefaultOptions(ctx) - - if test.expectErr { - assert.Error(t, err) - assert.Nil(t, client) - } else { - assert.NoError(t, err) - assert.NotNil(t, client) - client.Close() - } - }) - } -} - -func TestNmqClient_CreateProducer(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - assert.NotNil(t, client) - defer client.Close() - - topic := "TestNmqClient_CreateProducer" - proOpts := common.ProducerOptions{Topic: topic} - producer, err := client.CreateProducer(context.TODO(), proOpts) - assert.NoError(t, err) - assert.NotNil(t, producer) - defer producer.Close() - - nmqProducer := producer.(*nmqProducer) - assert.Equal(t, nmqProducer.Topic(), topic) - - msg := &common.ProducerMessage{ - Payload: []byte{}, - Properties: nil, - } - _, err = nmqProducer.Send(context.TODO(), msg) - assert.NoError(t, err) - - invalidOpts := common.ProducerOptions{Topic: ""} - producer, e := client.CreateProducer(context.TODO(), invalidOpts) - assert.Nil(t, producer) - assert.Error(t, e) -} - -func TestNmqClient_GetLatestMsg(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := fmt.Sprintf("t2GetLatestMsg-%d", rand.Int()) - proOpts := common.ProducerOptions{Topic: topic} - producer, err := client.CreateProducer(context.TODO(), proOpts) - assert.NoError(t, err) - defer producer.Close() - - for i := 0; i < 10; i++ { - msg := &common.ProducerMessage{ - Payload: []byte{byte(i)}, - Properties: nil, - } - _, err = producer.Send(context.TODO(), msg) - assert.NoError(t, err) - } - - subName := "subName" - consumerOpts := mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - } - - consumer, err := client.Subscribe(context.TODO(), consumerOpts) - assert.NoError(t, err) - - expectLastMsg, err := consumer.GetLatestMsgID() - assert.NoError(t, err) - - var actualLastMsg common.Message - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - for i := 0; i < 10; i++ { - select { - case <-ctx.Done(): - fmt.Println(i) - assert.FailNow(t, "consumer failed to yield message in 100 milliseconds") - case msg := <-consumer.Chan(): - consumer.Ack(msg) - actualLastMsg = msg - } - } - require.NotNil(t, actualLastMsg) - ret, err := expectLastMsg.LessOrEqualThan(actualLastMsg.ID().Serialize()) - assert.NoError(t, err) - assert.True(t, ret) -} - -func TestNmqClient_IllegalSubscribe(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - assert.NotNil(t, client) - defer client.Close() - - sub, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: "", - }) - assert.Nil(t, sub) - assert.Error(t, err) - - sub, err = client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: "123", - SubscriptionName: "", - }) - assert.Nil(t, sub) - assert.Error(t, err) -} - -func TestNmqClient_Subscribe(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - assert.NotNil(t, client) - defer client.Close() - - topic := "TestNmqClient_Subscribe" - proOpts := common.ProducerOptions{Topic: topic} - producer, err := client.CreateProducer(context.TODO(), proOpts) - assert.NoError(t, err) - assert.NotNil(t, producer) - defer producer.Close() - - subName := "subName" - consumerOpts := mqwrapper.ConsumerOptions{ - Topic: "", - SubscriptionName: subName, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - } - - consumer, err := client.Subscribe(context.TODO(), consumerOpts) - assert.Error(t, err) - assert.Nil(t, consumer) - - consumerOpts.Topic = topic - consumer, err = client.Subscribe(context.TODO(), consumerOpts) - assert.NoError(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - assert.Equal(t, consumer.Subscription(), subName) - - msg := &common.ProducerMessage{ - Payload: []byte{1}, - Properties: nil, - } - _, err = producer.Send(context.TODO(), msg) - assert.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - select { - case <-ctx.Done(): - assert.FailNow(t, "consumer failed to yield message in 100 milliseconds") - case msg := <-consumer.Chan(): - consumer.Ack(msg) - nmqmsg := msg.(*nmqMessage) - msgPayload := nmqmsg.Payload() - assert.NotEmpty(t, msgPayload) - msgTopic := nmqmsg.Topic() - assert.Equal(t, msgTopic, topic) - msgProp := nmqmsg.Properties() - assert.Empty(t, msgProp) - msgID := nmqmsg.ID() - rID := msgID.(*nmqID) - assert.Equal(t, rID.messageID, MessageIDType(1)) - } -} - -func TestNmqClient_EarliestMessageID(t *testing.T) { - client, _ := createNmqClient() - defer client.Close() - - mid := client.EarliestMessageID() - assert.NotNil(t, mid) - nmqmsg := mid.(*nmqID) - assert.Equal(t, nmqmsg.messageID, MessageIDType(1)) -} - -func TestNmqClient_StringToMsgID(t *testing.T) { - client, _ := createNmqClient() - defer client.Close() - - str := "5" - res, err := client.StringToMsgID(str) - assert.NoError(t, err) - assert.NotNil(t, res) - - str = "X" - res, err = client.StringToMsgID(str) - assert.Nil(t, res) - assert.Error(t, err) -} - -func TestNmqClient_BytesToMsgID(t *testing.T) { - client, _ := createNmqClient() - defer client.Close() - - mid := client.EarliestMessageID() - res, err := client.BytesToMsgID(mid.Serialize()) - assert.NoError(t, err) - assert.NotNil(t, res) -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go deleted file mode 100644 index 015106af08..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer.go +++ /dev/null @@ -1,183 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "fmt" - "sync" - - "github.com/cockroachdb/errors" - "github.com/nats-io/nats.go" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/v2/util/merr" -) - -// Consumer is a client that used to consume messages from natsmq -type Consumer struct { - options mqwrapper.ConsumerOptions - js nats.JetStreamContext - sub *nats.Subscription - topic string - groupName string - natsChan chan *nats.Msg - msgChan chan common.Message - closeChan chan struct{} - once sync.Once - closeOnce sync.Once - skip bool - wg sync.WaitGroup -} - -// Subscription returns the subscription name of this consumer -func (nc *Consumer) Subscription() string { - return nc.groupName -} - -// Chan returns a channel to read messages from natsmq -func (nc *Consumer) Chan() <-chan common.Message { - if err := nc.closed(); err != nil { - panic(err) - } - - if nc.sub == nil { - log.Error("accessing Chan of an uninitialized subscription.", zap.String("topic", nc.topic), zap.String("groupName", nc.groupName)) - panic("failed to chan a consumer without assign") - } - if nc.msgChan == nil { - nc.once.Do(func() { - nc.msgChan = make(chan common.Message, 256) - nc.wg.Add(1) - go func() { - defer nc.wg.Done() - for { - select { - case msg := <-nc.natsChan: - if nc.skip { - nc.skip = false - continue - } - nc.msgChan <- &nmqMessage{ - raw: msg, - } - case <-nc.closeChan: - log.Info("close nmq consumer ", zap.String("topic", nc.topic), zap.String("groupName", nc.groupName)) - close(nc.msgChan) - return - } - } - }() - }) - } - return nc.msgChan -} - -// Seek is used to seek the position in natsmq topic -func (nc *Consumer) Seek(id common.MessageID, inclusive bool) error { - if err := nc.closed(); err != nil { - return err - } - if nc.sub != nil { - return errors.New("can not seek() on an initilized consumer") - } - if nc.msgChan != nil { - return errors.New("Seek should be called before Chan") - } - log.Info("Seek is called", zap.String("topic", nc.topic), zap.Any("id", id)) - msgID := id.(*nmqID).messageID - // skip the first message when consume - nc.skip = !inclusive - var err error - nc.sub, err = nc.js.ChanSubscribe(nc.topic, nc.natsChan, nats.StartSequence(msgID)) - if err != nil { - log.Warn("fail to Seek", zap.Error(err)) - } - return err -} - -// Ack is used to ask a natsmq message -func (nc *Consumer) Ack(message common.Message) { - if err := message.(*nmqMessage).raw.Ack(); err != nil { - log.Warn("failed to ack message of nmq", zap.String("topic", message.Topic()), zap.Reflect("msgID", message.ID())) - } -} - -// Close is used to free the resources of this consumer -func (nc *Consumer) Close() { - nc.closeOnce.Do(func() { - if nc.sub == nil { - return - } - if err := nc.sub.Unsubscribe(); err != nil { - log.Warn("failed to unsubscribe subscription of nmq", zap.String("topic", nc.topic), zap.Error(err)) - } - close(nc.closeChan) - nc.wg.Wait() - }) -} - -// GetLatestMsgID returns the ID of the most recent message processed by the consumer. -func (nc *Consumer) GetLatestMsgID() (common.MessageID, error) { - if err := nc.closed(); err != nil { - return nil, err - } - - info, err := nc.js.StreamInfo(nc.topic) - if err != nil { - log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err)) - return nil, errors.Wrap(err, "failed to get stream info of nats jetstream") - } - msgID := info.State.LastSeq - return &nmqID{messageID: msgID}, nil -} - -// CheckTopicValid verifies if the given topic is valid for this consumer. -// 1. topic should exist. -func (nc *Consumer) CheckTopicValid(topic string) error { - if err := nc.closed(); err != nil { - return err - } - - // A consumer is tied to a topic. In a multi-tenant situation, - // a consumer is not supposed to check on other topics. - if topic != nc.topic { - return fmt.Errorf("consumer of topic %s checking validness of topic %s", nc.topic, topic) - } - - // check if topic valid or exist. - _, err := nc.js.StreamInfo(topic) - if errors.Is(err, nats.ErrStreamNotFound) { - return merr.WrapErrMqTopicNotFound(topic, err.Error()) - } else if err != nil { - log.Warn("fail to get stream info of nats", zap.String("topic", nc.topic), zap.Error(err)) - return errors.Wrap(err, "failed to get stream info of nats jetstream") - } - return nil -} - -// Closed check if Consumer is closed. -func (nc *Consumer) closed() error { - select { - case <-nc.closeChan: - return errors.Newf("Closed Nmq Consumer, topic: %s, subscription name:", nc.topic, nc.groupName) - default: - return nil - } -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go deleted file mode 100644 index 08cd68545a..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_consumer_test.go +++ /dev/null @@ -1,452 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - "reflect" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" -) - -func TestNatsConsumer_Subscription(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - proOpts := common.ProducerOptions{Topic: topic} - _, err = client.CreateProducer(context.TODO(), proOpts) - assert.NoError(t, err) - - consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - - str := consumer.Subscription() - assert.NotNil(t, str) -} - -func Test_GetEarliestMessageID(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - mid := client.EarliestMessageID() - - assert.NotNil(t, mid) - assert.Equal(t, mid.(*nmqID).messageID, MessageIDType(1)) -} - -func Test_BadLatestMessageID(t *testing.T) { - topic := t.Name() - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - consumer.Close() - id, err := consumer.GetLatestMsgID() - assert.Nil(t, id) - assert.Error(t, err) -} - -func TestComsumeMessage(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - - c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer c.Close() - - msg := []byte("test the first message") - prop := map[string]string{"k1": "v1", "k2": "v2"} - _, err = p.Send(context.Background(), &common.ProducerMessage{ - Payload: msg, - Properties: prop, - }) - assert.NoError(t, err) - recvMsg, err := c.(*Consumer).sub.NextMsg(1 * time.Second) - assert.NoError(t, err) - recvMsg.Ack() - assert.NoError(t, err) - assert.Equal(t, msg, recvMsg.Data) - properties := make(map[string]string) - for k, vs := range recvMsg.Header { - if len(vs) > 0 { - properties[k] = vs[0] - } - } - assert.True(t, reflect.DeepEqual(prop, properties)) - - msg2 := []byte("test the second message") - prop2 := map[string]string{"k1": "v3", "k4": "v4"} - _, err = p.Send(context.Background(), &common.ProducerMessage{ - Payload: msg2, - Properties: prop2, - }) - assert.NoError(t, err) - recvMsg, err = c.(*Consumer).sub.NextMsg(1 * time.Second) - assert.NoError(t, err) - recvMsg.Ack() - assert.Equal(t, msg2, recvMsg.Data) - properties = make(map[string]string) - for k, vs := range recvMsg.Header { - if len(vs) > 0 { - properties[k] = vs[0] - } - } - assert.True(t, reflect.DeepEqual(prop2, properties)) - - assert.NoError(t, err) - assert.NotNil(t, c) -} - -func TestNatsConsumer_Close(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - assert.NotNil(t, c) - - str := c.Subscription() - assert.NotNil(t, str) - - c.Close() - - // Disallow double close. - assert.Panics(t, func() { c.Chan() }) - assert.Error(t, c.Seek(NewNmqID(1), false)) - assert.Error(t, func() error { _, err := c.GetLatestMsgID(); return err }()) - - c.Close() // Allow double close, nothing happened. -} - -func TestNatsClientErrorOnUnsubscribeTwice(t *testing.T) { - topic := t.Name() - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer consumer.Close() - - err = consumer.(*Consumer).sub.Unsubscribe() - assert.NoError(t, err) - err = consumer.(*Consumer).sub.Unsubscribe() - assert.True(t, strings.Contains(err.Error(), "invalid subscription")) - t.Log(err) -} - -func TestCheckTopicValid(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - consumer, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - assert.NotNil(t, consumer) - - str := consumer.Subscription() - assert.NotNil(t, str) - - // empty topic should pass - err = consumer.CheckTopicValid(topic) - assert.NoError(t, err) - - // Not allowed to check other topic's validness. - err = consumer.CheckTopicValid("BadTopic") - assert.Error(t, err) - - // not empty topic can pass - pub, err := client.CreateProducer(context.TODO(), common.ProducerOptions{ - Topic: topic, - }) - assert.NoError(t, err) - _, err = pub.Send(context.TODO(), &common.ProducerMessage{ - Payload: []byte("123123123"), - }) - assert.NoError(t, err) - - err = consumer.CheckTopicValid(topic) - assert.NoError(t, err) - - consumer.Close() - err = consumer.CheckTopicValid(topic) - assert.Error(t, err) -} - -func newTestConsumer(t *testing.T, topic string, position common.SubscriptionInitialPosition) (mqwrapper.Consumer, error) { - client, err := createNmqClient() - assert.NoError(t, err) - return client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: position, - BufSize: 1024, - }) -} - -func newProducer(t *testing.T, topic string) (*nmqClient, mqwrapper.Producer) { - client, err := createNmqClient() - assert.NoError(t, err) - producer, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - return client, producer -} - -func process(t *testing.T, msgs []string, p mqwrapper.Producer) { - for _, msg := range msgs { - _, err := p.Send(context.Background(), &common.ProducerMessage{ - Payload: []byte(msg), - Properties: map[string]string{}, - }) - assert.NoError(t, err) - } -} - -func TestNmqConsumer_GetLatestMsgID(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - - c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer c.Close() - - latestMsgID, err := c.GetLatestMsgID() - assert.NoError(t, err) - - msgs := []string{"111", "222", "333", "444", "555"} - newMessageID := latestMsgID.(*nmqID).messageID + uint64(len(msgs)) - process(t, msgs, p) - latestMsgID, err = c.GetLatestMsgID() - assert.NoError(t, err) - assert.Equal(t, newMessageID, latestMsgID.(*nmqID).messageID) -} - -func TestNmqConsumer_ConsumeFromLatest(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - - msgs := []string{"111", "222", "333"} - process(t, msgs, p) - - c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionLatest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer c.Close() - - msgs = []string{"444", "555"} - process(t, msgs, p) - - msg := <-c.Chan() - assert.Equal(t, "444", string(msg.Payload())) - msg = <-c.Chan() - assert.Equal(t, "555", string(msg.Payload())) -} - -func TestNmqConsumer_ConsumeFromEarliest(t *testing.T) { - client, err := createNmqClient() - assert.NoError(t, err) - defer client.Close() - - topic := t.Name() - p, err := client.CreateProducer(context.TODO(), common.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - - msgs := []string{"111", "222"} - process(t, msgs, p) - - c, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer c.Close() - - msgs = []string{"333", "444", "555"} - process(t, msgs, p) - - msg := <-c.Chan() - assert.Equal(t, "111", string(msg.Payload())) - msg = <-c.Chan() - assert.Equal(t, "222", string(msg.Payload())) - - c2, err := client.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: topic, - SubscriptionInitialPosition: common.SubscriptionPositionEarliest, - BufSize: 1024, - }) - assert.NoError(t, err) - defer c2.Close() - - msgs = []string{"777"} - process(t, msgs, p) - - msg = <-c2.Chan() - assert.Equal(t, "111", string(msg.Payload())) - msg = <-c2.Chan() - assert.Equal(t, "222", string(msg.Payload())) -} - -func TestNatsConsumer_SeekExclusive(t *testing.T) { - topic := t.Name() - c, p := newProducer(t, topic) - defer c.Close() - defer p.Close() - - msgs := []string{"111", "222", "333", "444", "555"} - process(t, msgs, p) - - msgID := &nmqID{messageID: 2} - consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown) - assert.NoError(t, err) - defer consumer.Close() - err = consumer.Seek(msgID, false) - assert.NoError(t, err) - - msg := <-consumer.Chan() - assert.Equal(t, "333", string(msg.Payload())) - msg = <-consumer.Chan() - assert.Equal(t, "444", string(msg.Payload())) -} - -func TestNatsConsumer_SeekInclusive(t *testing.T) { - topic := t.Name() - c, p := newProducer(t, topic) - defer c.Close() - defer p.Close() - - msgs := []string{"111", "222", "333", "444", "555"} - - process(t, msgs, p) - - msgID := &nmqID{messageID: 2} - consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown) - assert.NoError(t, err) - defer consumer.Close() - err = consumer.Seek(msgID, true) - assert.NoError(t, err) - - msg := <-consumer.Chan() - assert.Equal(t, "222", string(msg.Payload())) - msg = <-consumer.Chan() - assert.Equal(t, "333", string(msg.Payload())) -} - -func TestNatsConsumer_NoDoubleSeek(t *testing.T) { - topic := t.Name() - c, p := newProducer(t, topic) - defer c.Close() - defer p.Close() - - msgID := &nmqID{messageID: 2} - consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown) - assert.NoError(t, err) - defer consumer.Close() - err = consumer.Seek(msgID, true) - assert.NoError(t, err) - err = consumer.Seek(msgID, true) - assert.Error(t, err) -} - -func TestNatsConsumer_ChanWithNoAssign(t *testing.T) { - topic := t.Name() - c, p := newProducer(t, topic) - defer c.Close() - defer p.Close() - - msgs := []string{"111", "222", "333", "444", "555"} - process(t, msgs, p) - - consumer, err := newTestConsumer(t, topic, common.SubscriptionPositionUnknown) - assert.NoError(t, err) - defer consumer.Close() - - assert.Panics(t, func() { - <-consumer.Chan() - }) -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_id.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_id.go deleted file mode 100644 index 62e5f4956f..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_id.go +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "github.com/milvus-io/milvus/pkg/v2/common" - mqcommon "github.com/milvus-io/milvus/pkg/v2/mq/common" -) - -// MessageIDType is a type alias for server.UniqueID that represents the ID of a Nmq message. -type MessageIDType = uint64 - -// nmqID wraps message ID for natsmq -type nmqID struct { - messageID MessageIDType -} - -// Check if nmqID implements MessageID interface -var _ mqcommon.MessageID = &nmqID{} - -// NewNmqID creates and returns a new instance of the nmqID struct with the given MessageID. -func NewNmqID(id MessageIDType) mqcommon.MessageID { - return &nmqID{ - messageID: id, - } -} - -// Serialize convert nmq message id to []byte -func (nid *nmqID) Serialize() []byte { - return SerializeNmqID(nid.messageID) -} - -func (nid *nmqID) AtEarliestPosition() bool { - return nid.messageID <= 1 -} - -func (nid *nmqID) LessOrEqualThan(msgID []byte) (bool, error) { - return nid.messageID <= DeserializeNmqID(msgID), nil -} - -func (nid *nmqID) Equal(msgID []byte) (bool, error) { - return nid.messageID == DeserializeNmqID(msgID), nil -} - -// SerializeNmqID is used to serialize a message ID to byte array -func SerializeNmqID(messageID MessageIDType) []byte { - b := make([]byte, 8) - common.Endian.PutUint64(b, messageID) - return b -} - -// DeserializeNmqID is used to deserialize a message ID from byte array -func DeserializeNmqID(messageID []byte) MessageIDType { - return common.Endian.Uint64(messageID) -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_id_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_id_test.go deleted file mode 100644 index 5593499f00..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_id_test.go +++ /dev/null @@ -1,101 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "math" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNmqID_Serialize(t *testing.T) { - rid := &nmqID{ - messageID: 8, - } - - bin := rid.Serialize() - assert.NotNil(t, bin) - assert.NotZero(t, len(bin)) -} - -func Test_AtEarliestPosition(t *testing.T) { - rid := &nmqID{ - messageID: 0, - } - assert.True(t, rid.AtEarliestPosition()) - - rid = &nmqID{ - messageID: math.MaxInt64, - } - assert.False(t, rid.AtEarliestPosition()) -} - -func TestLessOrEqualThan(t *testing.T) { - rid1 := &nmqID{ - messageID: 0, - } - rid2 := &nmqID{ - messageID: math.MaxInt64, - } - - ret, err := rid1.LessOrEqualThan(rid2.Serialize()) - assert.NoError(t, err) - assert.True(t, ret) - - ret, err = rid2.LessOrEqualThan(rid1.Serialize()) - assert.NoError(t, err) - assert.False(t, ret) - - ret, err = rid1.LessOrEqualThan(rid1.Serialize()) - assert.NoError(t, err) - assert.True(t, ret) -} - -func Test_Equal(t *testing.T) { - rid1 := &nmqID{ - messageID: 0, - } - - rid2 := &nmqID{ - messageID: math.MaxInt64, - } - - { - ret, err := rid1.Equal(rid1.Serialize()) - assert.NoError(t, err) - assert.True(t, ret) - } - - { - ret, err := rid1.Equal(rid2.Serialize()) - assert.NoError(t, err) - assert.False(t, ret) - } -} - -func Test_SerializeNmqID(t *testing.T) { - bin := SerializeNmqID(10) - assert.NotNil(t, bin) - assert.NotZero(t, len(bin)) -} - -func Test_DeserializeNmqID(t *testing.T) { - bin := SerializeNmqID(5) - id := DeserializeNmqID(bin) - assert.Equal(t, id, MessageIDType(5)) -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_message.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_message.go deleted file mode 100644 index 9bcafe01ae..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_message.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "log" - - "github.com/nats-io/nats.go" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/v2/mq/common" -) - -// Check nmqMessage implements ConsumerMessage -var ( - _ common.Message = (*nmqMessage)(nil) -) - -// nmqMessage wraps the message for natsmq -type nmqMessage struct { - raw *nats.Msg - - // lazy initialized field. - meta *nats.MsgMetadata -} - -// Topic returns the topic name of natsmq message -func (nm *nmqMessage) Topic() string { - // TODO: Dependency: implement of subscription logic of nmq. - // 1:1 Subject:Topic model is appied on this implementation. - // M:N model should be a optimize option in future. - return nm.raw.Subject -} - -// Properties returns the properties of natsmq message -func (nm *nmqMessage) Properties() map[string]string { - properties := make(map[string]string, len(nm.raw.Header)) - for k, vs := range nm.raw.Header { - if len(vs) > 0 { - properties[k] = vs[0] - } - } - return properties -} - -// Payload returns the payload of natsmq message -func (nm *nmqMessage) Payload() []byte { - return nm.raw.Data -} - -// ID returns the id of natsmq message -func (nm *nmqMessage) ID() common.MessageID { - if nm.meta == nil { - var err error - // raw is always a jetstream message, should never fail. - nm.meta, err = nm.raw.Metadata() - if err != nil { - log.Fatal("raw is always a jetstream message, should never fail", zap.Error(err)) - } - } - return &nmqID{messageID: nm.meta.Sequence.Stream} -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_message_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_message_test.go deleted file mode 100644 index d3980f0b10..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_message_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "testing" - - "github.com/nats-io/nats.go" - "github.com/stretchr/testify/assert" -) - -func TestNmqMessage_All(t *testing.T) { - topic := t.Name() - raw := nats.NewMsg(topic) - raw.Data = []byte(`test payload`) - raw.Header.Add("test", "test") - nm := nmqMessage{ - meta: &nats.MsgMetadata{ - Sequence: nats.SequencePair{ - Stream: 12, - }, - }, - raw: raw, - } - payload := []byte("test payload") - assert.Equal(t, topic, nm.Topic()) - assert.Equal(t, MessageIDType(12), nm.ID().(*nmqID).messageID) - assert.Equal(t, payload, nm.Payload()) - assert.Equal(t, nm.Properties()["test"], "test") -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go deleted file mode 100644 index a0345ec20c..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file exceapt in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - - "github.com/nats-io/nats.go" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/metrics" - "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/v2/util/timerecord" -) - -var _ mqwrapper.Producer = (*nmqProducer)(nil) - -// nmqProducer contains a natsmq producer -type nmqProducer struct { - js nats.JetStreamContext - topic string -} - -// Topic returns the topic of nmq producer -func (np *nmqProducer) Topic() string { - return np.topic -} - -// Send send the producer messages to natsmq -func (np *nmqProducer) Send(ctx context.Context, message *common.ProducerMessage) (common.MessageID, error) { - start := timerecord.NewTimeRecorder("send msg to stream") - metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.TotalLabel).Inc() - - // Encode message - msg := &nats.Msg{ - Subject: np.topic, - Header: make(nats.Header, len(message.Properties)), - Data: message.Payload, - } - for k, v := range message.Properties { - msg.Header.Add(k, v) - } - - // publish to nats-server - pa, err := np.js.PublishMsg(msg, nats.Context(ctx)) - if err != nil { - metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.FailLabel).Inc() - log.Warn("failed to publish message by nmq", zap.String("topic", np.topic), zap.Error(err), zap.Int("payload_size", len(message.Payload))) - return nil, err - } - - elapsed := start.ElapseSpan() - metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds())) - metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() - return &nmqID{messageID: pa.Sequence}, err -} - -// Close does nothing currently -func (np *nmqProducer) Close() { - // No specific producer to be closed. - // stream doesn't close here. -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer_test.go deleted file mode 100644 index d178e73168..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_producer_test.go +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/v2/mq/common" -) - -func TestNatsMQProducer(t *testing.T) { - c, err := createNmqClient() - assert.NoError(t, err) - defer c.Close() - topic := t.Name() - pOpts := common.ProducerOptions{Topic: topic} - - // Check Topic() - p, err := c.CreateProducer(context.TODO(), pOpts) - assert.NoError(t, err) - assert.Equal(t, p.(*nmqProducer).Topic(), topic) - - // Check Send() - msg := &common.ProducerMessage{ - Payload: []byte{}, - Properties: map[string]string{}, - } - _, err = p.Send(context.TODO(), msg) - assert.NoError(t, err) -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go deleted file mode 100644 index 1377379896..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server.go +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "context" - "os" - "path" - "sync" - "time" - - "github.com/cockroachdb/errors" - "github.com/nats-io/nats-server/v2/server" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" -) - -// Nmq is global natsmq instance that will be initialized only once -var Nmq *server.Server - -// once is used to init global natsmq -var once sync.Once - -// NatsMQConfig is used to initialize NatsMQ. -type NatsMQConfig struct { - Opts server.Options - InitializeTimeout time.Duration -} - -// MustInitNatsMQ init global local natsmq instance. -// Panic if initailizing operation failed. -func MustInitNatsMQ(cfg *NatsMQConfig) { - once.Do(func() { - var err error - Nmq, err = initNatsMQ(cfg) - if err != nil { - log.Fatal("initialize nmq failed", zap.Error(err)) - } - }) -} - -func initNatsMQ(cfg *NatsMQConfig) (*server.Server, error) { - log.Info("try to initialize global nmq", zap.Any("config", cfg)) - natsServer, err := server.NewServer(&cfg.Opts) - if err != nil { - return nil, errors.Wrap(err, "fail to initailize nmq") - } - // Config log if log file set. - if cfg.Opts.LogFile != "" { - if err := os.MkdirAll(path.Dir(cfg.Opts.LogFile), 0o744); err != nil { - return nil, errors.Wrap(err, "fail to create directory for nats log file") - } - // make directory for the file - natsServer.ConfigureLogger() - } - // Start Nmq in background and wait until it's ready for connection. - if err := server.Run(natsServer); err != nil { - return nil, errors.Wrap(err, "start nmq failed") - } - // Wait for server to be ready for connections - if !natsServer.ReadyForConnections(cfg.InitializeTimeout) { - return nil, errors.New("nmq is not ready within timeout") - } - log.Info("initialize nmq finished", zap.String("client-url", natsServer.ClientURL()), zap.Error(err)) - return natsServer, nil -} - -// ParseServerOption get nats server option from paramstable. -func ParseServerOption(params *paramtable.ComponentParam) *NatsMQConfig { - return &NatsMQConfig{ - Opts: server.Options{ - Host: "127.0.0.1", // Force to use loopback address. - Port: params.NatsmqCfg.ServerPort.GetAsInt(), - MaxPayload: params.NatsmqCfg.ServerMaxPayload.GetAsInt32(), - MaxPending: params.NatsmqCfg.ServerMaxPending.GetAsInt64(), - JetStream: true, - JetStreamMaxStore: params.NatsmqCfg.ServerMaxFileStore.GetAsInt64(), - StoreDir: params.NatsmqCfg.ServerStoreDir.GetValue(), - Trace: params.NatsmqCfg.ServerMonitorTrace.GetAsBool(), - Debug: params.NatsmqCfg.ServerMonitorDebug.GetAsBool(), - Logtime: params.NatsmqCfg.ServerMonitorLogTime.GetAsBool(), - LogFile: params.NatsmqCfg.ServerMonitorLogFile.GetValue(), - LogSizeLimit: params.NatsmqCfg.ServerMonitorLogSizeLimit.GetAsInt64(), - }, - InitializeTimeout: time.Duration(params.NatsmqCfg.ServerInitializeTimeout.GetAsInt()) * time.Millisecond, - } -} - -// CloseNatsMQ is used to close global natsmq -func CloseNatsMQ() { - log.Ctx(context.TODO()).Debug("Closing Natsmq!") - if Nmq != nil { - // Shut down the server. - Nmq.Shutdown() - // Wait for server shutdown. - Nmq.WaitForShutdown() - Nmq = nil - } -} diff --git a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go b/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go deleted file mode 100644 index 804607e4c0..0000000000 --- a/pkg/mq/msgstream/mqwrapper/nmq/nmq_server_test.go +++ /dev/null @@ -1,123 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nmq - -import ( - "fmt" - "os" - "testing" - "time" - - "github.com/nats-io/nats-server/v2/server" - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" -) - -var natsServerAddress string - -func TestMain(m *testing.M) { - exitCode := func() int { - paramtable.Init() - storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") - defer os.RemoveAll(storeDir) - - cfg := ParseServerOption(paramtable.Get()) - cfg.Opts.Port = server.RANDOM_PORT - cfg.Opts.StoreDir = storeDir - MustInitNatsMQ(cfg) - defer CloseNatsMQ() - - natsServerAddress = Nmq.ClientURL() - return m.Run() - }() - - os.Exit(exitCode) -} - -func TestInitNatsMQ(t *testing.T) { - func() { - cfg := ParseServerOption(paramtable.Get()) - storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") - defer os.RemoveAll(storeDir) - cfg.Opts.StoreDir = storeDir - cfg.Opts.Port = server.RANDOM_PORT - cfg.Opts.LogFile = "" - mq, err := initNatsMQ(cfg) - assert.NoError(t, err) - assert.NotNil(t, mq) - mq.Shutdown() - mq.WaitForShutdown() - }() - - func() { - cfg := ParseServerOption(paramtable.Get()) - storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") - defer os.RemoveAll(storeDir) - cfg.Opts.StoreDir = storeDir - cfg.Opts.Port = server.RANDOM_PORT - cfg.Opts.LogFile = "" - mq, err := initNatsMQ(cfg) - assert.NoError(t, err) - assert.NotNil(t, mq) - mq.Shutdown() - mq.WaitForShutdown() - }() - - func() { - cfg := ParseServerOption(paramtable.Get()) - storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") - defer os.RemoveAll(storeDir) - cfg.Opts.StoreDir = storeDir - cfg.Opts.Port = server.RANDOM_PORT - cfg.Opts.MaxPending = -1 - mq, err := initNatsMQ(cfg) - assert.Error(t, err) - assert.Nil(t, mq) - }() - - func() { - ex, err := os.Executable() - assert.NoError(t, err) - cfg := ParseServerOption(paramtable.Get()) - storeDir, _ := os.MkdirTemp("", "milvus_mq_nmq") - defer os.RemoveAll(storeDir) - cfg.Opts.StoreDir = storeDir - cfg.Opts.Port = server.RANDOM_PORT - cfg.Opts.LogFile = fmt.Sprintf("%s/test", ex) - mq, err := initNatsMQ(cfg) - assert.Error(t, err) - assert.Nil(t, mq) - }() -} - -func TestGetServerOptionDefault(t *testing.T) { - cfg := ParseServerOption(paramtable.Get()) - assert.Equal(t, "127.0.0.1", cfg.Opts.Host) - assert.Equal(t, 4222, cfg.Opts.Port) - assert.Equal(t, true, cfg.Opts.JetStream) - assert.Equal(t, "/var/lib/milvus/nats", cfg.Opts.StoreDir) - assert.Equal(t, int64(17179869184), cfg.Opts.JetStreamMaxStore) - assert.Equal(t, int32(8388608), cfg.Opts.MaxPayload) - assert.Equal(t, int64(67108864), cfg.Opts.MaxPending) - assert.Equal(t, 4000*time.Millisecond, cfg.InitializeTimeout) - assert.Equal(t, false, cfg.Opts.Debug) - assert.Equal(t, false, cfg.Opts.Trace) - assert.Equal(t, true, cfg.Opts.Logtime) - assert.Equal(t, "/tmp/milvus/logs/nats.log", cfg.Opts.LogFile) - assert.Equal(t, int64(536870912), cfg.Opts.LogSizeLimit) -} diff --git a/pkg/mq/msgstream/mqwrapper/rmq/rmq_client.go b/pkg/mq/msgstream/mqwrapper/rmq/rmq_client.go index 29d08939c8..ca7b862702 100644 --- a/pkg/mq/msgstream/mqwrapper/rmq/rmq_client.go +++ b/pkg/mq/msgstream/mqwrapper/rmq/rmq_client.go @@ -32,7 +32,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/timerecord" ) -// nmqClient implements mqwrapper.Client. +// rmqClient implements mqwrapper.Client. var _ mqwrapper.Client = &rmqClient{} // var InitRocksMQ = server.InitRocksMQ diff --git a/pkg/mq/msgstream/stream_bench_test.go b/pkg/mq/msgstream/stream_bench_test.go index f31cab18d6..c91d2f74db 100644 --- a/pkg/mq/msgstream/stream_bench_test.go +++ b/pkg/mq/msgstream/stream_bench_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "os" "sync" "testing" @@ -12,33 +11,9 @@ import ( "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) -func BenchmarkProduceAndConsumeNatsMQ(b *testing.B) { - storeDir, err := os.MkdirTemp("", "milvus_mq_nmq") - assert.NoError(b, err) - defer os.RemoveAll(storeDir) - - paramtable.Init() - cfg := nmq.ParseServerOption(paramtable.Get()) - cfg.Opts.StoreDir = storeDir - nmq.MustInitNatsMQ(cfg) - - client, err := nmq.NewClientWithDefaultOptions(context.Background()) - if err != nil { - panic(err) - } - cases := generateRandBytes(64*1024, 10000) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - benchmarkProduceAndConsume(b, client, cases) - } -} - func benchmarkProduceAndConsume(b *testing.B, mqClient mqwrapper.Client, cases [][]byte) { topic := fmt.Sprintf("test_produce_and_consume_topic_%d", rand.Int31n(100000)) wg := sync.WaitGroup{} diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 96dc19c792..f93d3dfbfe 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -50,7 +50,6 @@ type ServiceParam struct { PulsarCfg PulsarConfig KafkaCfg KafkaConfig RocksmqCfg RocksmqConfig - NatsmqCfg NatsmqConfig MinioCfg MinioConfig ProfileCfg ProfileConfig } @@ -65,7 +64,6 @@ func (p *ServiceParam) init(bt *BaseTable) { p.PulsarCfg.Init(bt) p.KafkaCfg.Init(bt) p.RocksmqCfg.Init(bt) - p.NatsmqCfg.Init(bt) p.MinioCfg.Init(bt) p.ProfileCfg.Init(bt) } @@ -74,11 +72,6 @@ func (p *ServiceParam) RocksmqEnable() bool { return p.RocksmqCfg.Path.GetValue() != "" } -// NatsmqEnable checks if NATS messaging queue is enabled. -func (p *ServiceParam) NatsmqEnable() bool { - return p.NatsmqCfg.ServerStoreDir.GetValue() != "" -} - func (p *ServiceParam) PulsarEnable() bool { return p.PulsarCfg.Address.GetValue() != "" } @@ -548,7 +541,7 @@ func (p *MQConfig) Init(base *BaseTable) { Version: "2.3.0", DefaultValue: "default", Doc: `Default value: "default" -Valid values: [default, pulsar, kafka, rocksmq, natsmq, woodpecker]`, +Valid values: [default, pulsar, kafka, rocksmq, woodpecker]`, Export: true, } p.Type.Init(base.mgr) @@ -1216,141 +1209,6 @@ Set an easy-to-identify root key prefix for Milvus if etcd service already exist r.CompressionTypes.Init(base.mgr) } -// NatsmqConfig describes the configuration options for the Nats message queue -type NatsmqConfig struct { - ServerPort ParamItem `refreshable:"false"` - ServerStoreDir ParamItem `refreshable:"false"` - ServerMaxFileStore ParamItem `refreshable:"false"` - ServerMaxPayload ParamItem `refreshable:"false"` - ServerMaxPending ParamItem `refreshable:"false"` - ServerInitializeTimeout ParamItem `refreshable:"false"` - ServerMonitorTrace ParamItem `refreshable:"false"` - ServerMonitorDebug ParamItem `refreshable:"false"` - ServerMonitorLogTime ParamItem `refreshable:"false"` - ServerMonitorLogFile ParamItem `refreshable:"false"` - ServerMonitorLogSizeLimit ParamItem `refreshable:"false"` - ServerRetentionMaxAge ParamItem `refreshable:"true"` - ServerRetentionMaxBytes ParamItem `refreshable:"true"` - ServerRetentionMaxMsgs ParamItem `refreshable:"true"` -} - -// Init sets up a new NatsmqConfig instance using the provided BaseTable -func (r *NatsmqConfig) Init(base *BaseTable) { - r.ServerPort = ParamItem{ - Key: "natsmq.server.port", - Version: "2.3.0", - DefaultValue: "4222", - Doc: "Listening port of the NATS server.", - Export: true, - } - r.ServerPort.Init(base.mgr) - r.ServerStoreDir = ParamItem{ - Key: "natsmq.server.storeDir", - DefaultValue: "/var/lib/milvus/nats", - Version: "2.3.0", - Doc: `Directory to use for JetStream storage of nats`, - Export: true, - } - r.ServerStoreDir.Init(base.mgr) - r.ServerMaxFileStore = ParamItem{ - Key: "natsmq.server.maxFileStore", - Version: "2.3.0", - DefaultValue: "17179869184", - Doc: `Maximum size of the 'file' storage`, - Export: true, - } - r.ServerMaxFileStore.Init(base.mgr) - r.ServerMaxPayload = ParamItem{ - Key: "natsmq.server.maxPayload", - Version: "2.3.0", - DefaultValue: "8388608", - Doc: `Maximum number of bytes in a message payload`, - Export: true, - } - r.ServerMaxPayload.Init(base.mgr) - r.ServerMaxPending = ParamItem{ - Key: "natsmq.server.maxPending", - Version: "2.3.0", - DefaultValue: "67108864", - Doc: `Maximum number of bytes buffered for a connection Applies to client connections`, - Export: true, - } - r.ServerMaxPending.Init(base.mgr) - r.ServerInitializeTimeout = ParamItem{ - Key: "natsmq.server.initializeTimeout", - Version: "2.3.0", - DefaultValue: "4000", - Doc: `waiting for initialization of natsmq finished`, - Export: true, - } - r.ServerInitializeTimeout.Init(base.mgr) - r.ServerMonitorTrace = ParamItem{ - Key: "natsmq.server.monitor.trace", - Version: "2.3.0", - DefaultValue: "false", - Doc: `If true enable protocol trace log messages`, - Export: true, - } - r.ServerMonitorTrace.Init(base.mgr) - r.ServerMonitorDebug = ParamItem{ - Key: "natsmq.server.monitor.debug", - Version: "2.3.0", - DefaultValue: "false", - Doc: `If true enable debug log messages`, - Export: true, - } - r.ServerMonitorDebug.Init(base.mgr) - r.ServerMonitorLogTime = ParamItem{ - Key: "natsmq.server.monitor.logTime", - Version: "2.3.0", - DefaultValue: "true", - Doc: `If set to false, log without timestamps.`, - Export: true, - } - r.ServerMonitorLogTime.Init(base.mgr) - r.ServerMonitorLogFile = ParamItem{ - Key: "natsmq.server.monitor.logFile", - Version: "2.3.0", - DefaultValue: "/tmp/milvus/logs/nats.log", - Doc: `Log file path relative to .. of milvus binary if use relative path`, - Export: true, - } - r.ServerMonitorLogFile.Init(base.mgr) - r.ServerMonitorLogSizeLimit = ParamItem{ - Key: "natsmq.server.monitor.logSizeLimit", - Version: "2.3.0", - DefaultValue: "536870912", - Doc: `Size in bytes after the log file rolls over to a new one`, - Export: true, - } - r.ServerMonitorLogSizeLimit.Init(base.mgr) - - r.ServerRetentionMaxAge = ParamItem{ - Key: "natsmq.server.retention.maxAge", - Version: "2.3.0", - DefaultValue: "4320", - Doc: `Maximum age of any message in the P-channel`, - Export: true, - } - r.ServerRetentionMaxAge.Init(base.mgr) - r.ServerRetentionMaxBytes = ParamItem{ - Key: "natsmq.server.retention.maxBytes", - Version: "2.3.0", - DefaultValue: "", - Doc: `How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size`, - Export: true, - } - r.ServerRetentionMaxBytes.Init(base.mgr) - r.ServerRetentionMaxMsgs = ParamItem{ - Key: "natsmq.server.retention.maxMsgs", - Version: "2.3.0", - DefaultValue: "", - Doc: `How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit`, - Export: true, - } - r.ServerRetentionMaxMsgs.Init(base.mgr) -} - // ///////////////////////////////////////////////////////////////////////////// // --- minio --- type MinioConfig struct { diff --git a/tests/python_client/chaos/nats-standalone-values.yaml b/tests/python_client/chaos/nats-standalone-values.yaml deleted file mode 100644 index f3440cf818..0000000000 --- a/tests/python_client/chaos/nats-standalone-values.yaml +++ /dev/null @@ -1,46 +0,0 @@ -cluster: - enabled: false - -log: - level: debug - -image: - all: - repository: milvusdb/milvus - tag: master-20230614-893c3c04 - pullPolicy: IfNotPresent -standalone: - resources: - limits: - cpu: 8 - memory: 16Gi - requests: - cpu: 4 - memory: 8Gi - -kafka: - enabled: false - name: kafka - replicaCount: 3 - defaultReplicationFactor: 2 - -etcd: - replicaCount: 3 - image: - repository: milvusdb/etcd - tag: 3.5.5-r2 -minio: - mode: standalone -pulsar: - enabled: false - -extraConfigFiles: - user.yaml: |+ - mq: - type: natsmq - dataCoord: - compaction: - indexBasedCompaction: false - indexCoord: - scheduler: - interval: 100 \ No newline at end of file