From 6e7831470c7a0cde137b4be075f08825550f7930 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 2 Aug 2022 21:26:33 +0800 Subject: [PATCH] Revert settings of Pulsar address and refine config code (#18494) Signed-off-by: yun.zhang --- cmd/roles/roles.go | 7 +- .../distributed/connection_manager_test.go | 5 +- .../mq/msgstream/mq_kafka_msgstream_test.go | 12 +- internal/mq/msgstream/mq_msgstream_test.go | 55 ++++--- .../mqwrapper/kafka/kafka_client_test.go | 6 +- .../mqwrapper/kafka/kafka_consumer_test.go | 2 +- .../mqwrapper/kafka/kafka_producer_test.go | 4 +- .../mqwrapper/pulsar/pulsar_client_test.go | 26 ++-- .../mqwrapper/pulsar/pulsar_consumer_test.go | 10 +- .../mqwrapper/pulsar/pulsar_producer_test.go | 2 +- internal/storage/minio_chunk_manager_test.go | 15 +- internal/util/dependency/factory.go | 5 +- internal/util/paramtable/base_table.go | 135 ++---------------- internal/util/paramtable/base_table_test.go | 5 +- internal/util/paramtable/service_param.go | 28 ++-- .../util/paramtable/service_param_test.go | 44 +++++- .../util/sessionutil/session_util_test.go | 25 +--- 17 files changed, 170 insertions(+), 216 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index c02544b50f..e4fca12160 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -367,11 +367,14 @@ func (mr *MilvusRoles) Run(local bool, alias string) { Params.Init() if Params.RocksmqEnable() { - path, _ := Params.Load("_RocksmqPath") - err := rocksmqimpl.InitRocksMQ(path) + path, err := Params.Load("rocksmq.path") if err != nil { panic(err) } + + if err = rocksmqimpl.InitRocksMQ(path); err != nil { + panic(err) + } defer stopRocksmq() } diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index 7ae0d134d7..e773e7c5b4 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -280,10 +280,7 @@ func initSession(ctx context.Context) *sessionutil.Session { } metaRootPath := rootPath + "/" + subPath - endpoints, err := Params.Load("_EtcdEndpoints") - if err != nil { - panic(err) - } + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") log.Debug("metaRootPath", zap.Any("metaRootPath", metaRootPath)) diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go index 097295be2d..fbfa3a9768 100644 --- a/internal/mq/msgstream/mq_kafka_msgstream_test.go +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -36,7 +36,7 @@ import ( // Note: kafka does not support get all data when consuming from the earliest position again. //func TestStream_KafkaTtMsgStream_NoSeek(t *testing.T) { -// kafkaAddress, _ := Params.Load("_KafkaBrokerList") +// kafkaAddress := Params.Get("kafka.brokerList") // c1 := funcutil.RandomString(8) // producerChannels := []string{c1} // consumerChannels := []string{c1} @@ -109,7 +109,7 @@ func skipTest(t *testing.T) { func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { skipTest(t) - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -186,7 +186,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { skipTest(t) - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") c1 := funcutil.RandomString(8) producerChannels := []string{c1} consumerChannels := []string{c1} @@ -300,7 +300,7 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { func TestStream_KafkaTtMsgStream_1(t *testing.T) { skipTest(t) - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -347,7 +347,7 @@ func TestStream_KafkaTtMsgStream_1(t *testing.T) { func TestStream_KafkaTtMsgStream_2(t *testing.T) { skipTest(t) - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -405,7 +405,7 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) { func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { skipTest(t) - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") c1 := funcutil.RandomString(8) p1Channels := []string{c1} consumerChannels := []string{c1} diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index 8f3363ca21..33558a97a3 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -57,6 +57,15 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } +func getPulsarAddress() string { + pulsarHost := Params.LoadWithDefault("pulsar.address", "") + port := Params.LoadWithDefault("pulsar.port", "") + if len(pulsarHost) != 0 && len(port) != 0 { + return "pulsar://" + pulsarHost + ":" + port + } + panic("invalid pulsar address") +} + type fixture struct { t *testing.T etcdKV *etcdkv.EtcdKV @@ -67,7 +76,7 @@ type parameters struct { } func (f *fixture) setup() []parameters { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pulsarClient, err := pulsarwrapper.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(f.t, err) @@ -370,7 +379,7 @@ func TestMqMsgStream_SeekNotSubscribed(t *testing.T) { /* ========================== Pulsar & RocksMQ Tests ========================== */ func TestStream_PulsarMsgStream_Insert(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -393,7 +402,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { } func TestStream_PulsarMsgStream_Delete(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -414,7 +423,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) { } func TestStream_PulsarMsgStream_Search(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -437,7 +446,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) { } func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -459,7 +468,7 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { } func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -481,7 +490,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { } func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -504,7 +513,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { } func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -526,7 +535,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -583,7 +592,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -638,7 +647,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { } func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -672,7 +681,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { } func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -706,7 +715,7 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { } func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1 := funcutil.RandomString(8) producerChannels := []string{c1} consumerChannels := []string{c1} @@ -773,7 +782,7 @@ func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) { } func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -850,7 +859,7 @@ func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) { } func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1 := funcutil.RandomString(8) producerChannels := []string{c1} consumerChannels := []string{c1} @@ -962,7 +971,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { } func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) producerChannels := []string{c1, c2} consumerChannels := []string{c1, c2} @@ -1065,7 +1074,7 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error { // 2. The count of consumed msg should be equal to the count of produced msg // func TestStream_PulsarTtMsgStream_1(t *testing.T) { - pulsarAddr, _ := Params.Load("_PulsarAddress") + pulsarAddr := getPulsarAddress() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -1128,7 +1137,7 @@ func TestStream_PulsarTtMsgStream_1(t *testing.T) { // 2. The count of consumed msg should be equal to the count of produced msg // func TestStream_PulsarTtMsgStream_2(t *testing.T) { - pulsarAddr, _ := Params.Load("_PulsarAddress") + pulsarAddr := getPulsarAddress() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -1184,7 +1193,7 @@ func TestStream_PulsarTtMsgStream_2(t *testing.T) { } func TestStream_MqMsgStream_Seek(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -1228,7 +1237,7 @@ func TestStream_MqMsgStream_Seek(t *testing.T) { } func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -1349,7 +1358,7 @@ func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) { } func TestStream_MqMsgStream_SeekLatest(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -1697,7 +1706,7 @@ func TestStream_RmqTtMsgStream_Seek(t *testing.T) { } func TestStream_BroadcastMark(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) producerChannels := []string{c1, c2} @@ -1759,7 +1768,7 @@ func TestStream_BroadcastMark(t *testing.T) { } func TestStream_ProduceMark(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) producerChannels := []string{c1, c2} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index a5ef4fb9d0..51beb7d084 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -251,7 +251,7 @@ func TestKafkaClient_ConsumeFromLatest(t *testing.T) { } func TestKafkaClient_EarliestMessageID(t *testing.T) { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() @@ -260,7 +260,7 @@ func TestKafkaClient_EarliestMessageID(t *testing.T) { } func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() @@ -292,7 +292,7 @@ func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { } func createKafkaClient(t *testing.T) *kafkaClient { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") kc := NewKafkaClientInstance(kafkaAddress) assert.NotNil(t, kc) return kc diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index 159c074bbb..5cedc35b7e 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -142,7 +142,7 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { } func createConfig(groupID string) *kafka.ConfigMap { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") return &kafka.ConfigMap{ "bootstrap.servers": kafkaAddress, "group.id": groupID, diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go index df48600266..5731ef49d1 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go @@ -15,7 +15,7 @@ import ( ) func TestKafkaProducer_SendSuccess(t *testing.T) { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() assert.NotNil(t, kc) @@ -42,7 +42,7 @@ func TestKafkaProducer_SendSuccess(t *testing.T) { } func TestKafkaProducer_SendFail(t *testing.T) { - kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kafkaAddress := Params.Get("kafka.brokerList") { deliveryChan := make(chan kafka.Event, 1) diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index d21c3ccd80..6d76c1ca23 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -48,6 +48,16 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } +func getPulsarAddress() string { + pulsarHost := Params.LoadWithDefault("pulsar.address", "") + port := Params.LoadWithDefault("pulsar.port", "") + log.Info("pulsar address", zap.String("host", pulsarHost), zap.String("port", port)) + if len(pulsarHost) != 0 && len(port) != 0 { + return "pulsar://" + pulsarHost + ":" + port + } + panic("invalid pulsar address") +} + func IntToBytes(n int) []byte { tmp := int32(n) bytesBuffer := bytes.NewBuffer([]byte{}) @@ -193,7 +203,7 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, } func TestPulsarClient_Consume1(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) @@ -344,7 +354,7 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string } func TestPulsarClient_Consume2(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) @@ -394,7 +404,7 @@ func TestPulsarClient_Consume2(t *testing.T) { } func TestPulsarClient_SeekPosition(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) @@ -467,7 +477,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) { } func TestPulsarClient_SeekLatest(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) @@ -530,7 +540,7 @@ func TestPulsarClient_SeekLatest(t *testing.T) { } func TestPulsarClient_EarliestMessageID(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() @@ -539,7 +549,7 @@ func TestPulsarClient_EarliestMessageID(t *testing.T) { } func TestPulsarClient_StringToMsgID(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() @@ -557,7 +567,7 @@ func TestPulsarClient_StringToMsgID(t *testing.T) { } func TestPulsarClient_BytesToMsgID(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() client, _ := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() @@ -650,7 +660,7 @@ func TestPulsarCtl(t *testing.T) { topic := "test" subName := "hello" - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) consumer, err := pc.Subscribe(mqwrapper.ConsumerOptions{ diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index 61ac7b0efd..ca113b45d5 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -32,7 +32,7 @@ import ( ) func TestPulsarConsumer_Subscription(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) defer pc.Close() @@ -64,7 +64,7 @@ func Test_PatchEarliestMessageID(t *testing.T) { } func TestComsumeCompressedMessage(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) defer pc.Close() @@ -112,7 +112,7 @@ func TestComsumeCompressedMessage(t *testing.T) { } func TestPulsarConsumer_Close(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) assert.Nil(t, err) @@ -139,7 +139,7 @@ func TestPulsarConsumer_Close(t *testing.T) { func TestPulsarClientCloseUnsubscribeError(t *testing.T) { topic := "TestPulsarClientCloseUnsubscribeError" subName := "test" - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() @@ -191,7 +191,7 @@ func TestPulsarClientCloseUnsubscribeError(t *testing.T) { func TestPulsarClientUnsubscribeTwice(t *testing.T) { topic := "TestPulsarClientUnsubscribeTwice" subName := "test" - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer client.Close() diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go index fcfc96a150..1411f133a6 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_producer_test.go @@ -27,7 +27,7 @@ import ( ) func TestPulsarProducer(t *testing.T) { - pulsarAddress, _ := Params.Load("_PulsarAddress") + pulsarAddress := getPulsarAddress() pc, err := NewClient(pulsar.ClientOptions{URL: pulsarAddress}) defer pc.Close() assert.NoError(t, err) diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 0409a984ca..09d4524367 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -20,15 +20,18 @@ import ( "context" "path" "strconv" + "strings" "testing" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // TODO: NewMinioChunkManager is deprecated. Rewrite this unittest. func newMinIOChunkManager(ctx context.Context, bucketName string) (*MinioChunkManager, error) { - endPoint, _ := Params.Load("_MinioAddress") + endPoint := getMinioAddress() accessKeyID, _ := Params.Load("minio.accessKeyID") secretAccessKey, _ := Params.Load("minio.secretAccessKey") useSSLStr, _ := Params.Load("minio.useSSL") @@ -45,6 +48,16 @@ func newMinIOChunkManager(ctx context.Context, bucketName string) (*MinioChunkMa ) return client, err } + +func getMinioAddress() string { + minioHost := Params.LoadWithDefault("minio.address", paramtable.DefaultMinioHost) + if strings.Contains(minioHost, ":") { + return minioHost + } + port := Params.LoadWithDefault("minio.port", paramtable.DefaultMinioPort) + return minioHost + ":" + port +} + func TestMinIOCMFail(t *testing.T) { ctx := context.Background() endPoint, _ := Params.Load("9.9.9.9") diff --git a/internal/util/dependency/factory.go b/internal/util/dependency/factory.go index 956970214c..92df2dfba3 100644 --- a/internal/util/dependency/factory.go +++ b/internal/util/dependency/factory.go @@ -74,7 +74,10 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { func (f *DefaultFactory) initMQLocalService(params *paramtable.ComponentParam) msgstream.Factory { if params.RocksmqEnable() { - path, _ := params.Load("_RocksmqPath") + path, err := params.Load("rocksmq.path") + if err != nil { + panic(err) + } return msgstream.NewRmsFactory(path) } return nil diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 2ceb8e243f..258e8d195a 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -86,7 +86,6 @@ func (gp *BaseTable) GlobalInitWithYaml(yaml string) { // Init initializes the param table. func (gp *BaseTable) Init() { - var err error formatter := func(key string) string { ret := strings.ToLower(key) ret = strings.TrimPrefix(ret, "milvus.") @@ -95,6 +94,13 @@ func (gp *BaseTable) Init() { ret = strings.ReplaceAll(ret, ".", "") return ret } + gp.initConfigsFromLocal(formatter) + gp.initConfigsFromRemote(formatter) + gp.InitLogCfg() +} + +func (gp *BaseTable) initConfigsFromLocal(formatter func(key string) string) { + var err error gp.mgr, err = config.Init(config.WithEnvSource(formatter)) if err != nil { return @@ -107,8 +113,9 @@ func (gp *BaseTable) Init() { log.Warn("init baseTable with file failed", zap.String("configFile", configFilePath), zap.Error(err)) return } - defer gp.InitLogCfg() +} +func (gp *BaseTable) initConfigsFromRemote(formatter func(key string) string) { endpoints, err := gp.mgr.GetConfig("etcd.endpoints") if err != nil { log.Info("cannot find etcd.endpoints") @@ -119,6 +126,8 @@ func (gp *BaseTable) Init() { log.Info("cannot find etcd.rootPath") return } + + configFilePath := gp.configDir + "/" + defaultYaml gp.mgr, err = config.Init(config.WithEnvSource(formatter), config.WithFilesSource(configFilePath), config.WithEtcdSource(&config.EtcdInfo{ @@ -400,125 +409,3 @@ func (gp *BaseTable) SetLogger(id UniqueID) { gp.LogCfgFunc(gp.Log) } } - -// func (gp *BaseTable) loadKafkaConfig() { -// brokerList := os.Getenv("KAFKA_BROKER_LIST") -// if brokerList == "" { -// brokerList = gp.Get("kafka.brokerList") -// } -// gp.Save("_KafkaBrokerList", brokerList) -// } - -// func (gp *BaseTable) loadPulsarConfig() { -// pulsarAddress := os.Getenv("PULSAR_ADDRESS") -// if pulsarAddress == "" { -// pulsarHost := gp.Get("pulsar.address") -// port := gp.Get("pulsar.port") -// if len(pulsarHost) != 0 && len(port) != 0 { -// pulsarAddress = "pulsar://" + pulsarHost + ":" + port -// } -// } -// gp.Save("_PulsarAddress", pulsarAddress) - -// // parse pulsar address to find the host -// pulsarURL, err := url.ParseRequestURI(pulsarAddress) -// if err != nil { -// gp.Save("_PulsarWebAddress", "") -// log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err)) -// return -// } -// webport := gp.LoadWithDefault("pulsar.webport", "80") -// pulsarWebAddress := "http://" + pulsarURL.Hostname() + ":" + webport -// gp.Save("_PulsarWebAddress", pulsarWebAddress) -// log.Info("Pulsar config", zap.String("pulsar url", pulsarAddress), zap.String("pulsar web url", pulsarWebAddress)) -// } - -// func (gp *BaseTable) loadRocksMQConfig() { -// rocksmqPath := os.Getenv("ROCKSMQ_PATH") -// if rocksmqPath == "" { -// rocksmqPath = gp.Get("rocksmq.path") -// } -// gp.Save("_RocksmqPath", rocksmqPath) -// } - -// func (gp *BaseTable) loadMQConfig() { -// gp.loadPulsarConfig() -// gp.loadKafkaConfig() -// gp.loadRocksMQConfig() -// } - -// func (gp *BaseTable) loadEtcdConfig() { -// etcdEndpoints := os.Getenv("ETCD_ENDPOINTS") -// if etcdEndpoints == "" { -// etcdEndpoints = gp.LoadWithDefault("etcd.endpoints", DefaultEtcdEndpoints) -// } -// gp.Save("_EtcdEndpoints", etcdEndpoints) -// } - -// func (gp *BaseTable) loadMinioConfig() { -// minioAddress := os.Getenv("MINIO_ADDRESS") -// if minioAddress == "" { -// minioHost := gp.LoadWithDefault("minio.address", DefaultMinioHost) -// port := gp.LoadWithDefault("minio.port", DefaultMinioPort) -// minioAddress = minioHost + ":" + port -// } -// gp.Save("_MinioAddress", minioAddress) - -// minioAccessKey := os.Getenv("MINIO_ACCESS_KEY") -// if minioAccessKey == "" { -// minioAccessKey = gp.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey) -// } -// gp.Save("_MinioAccessKeyID", minioAccessKey) - -// minioSecretKey := os.Getenv("MINIO_SECRET_KEY") -// if minioSecretKey == "" { -// minioSecretKey = gp.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey) -// } -// gp.Save("_MinioSecretAccessKey", minioSecretKey) - -// minioUseSSL := os.Getenv("MINIO_USE_SSL") -// if minioUseSSL == "" { -// minioUseSSL = gp.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL) -// } -// gp.Save("_MinioUseSSL", minioUseSSL) - -// minioBucketName := os.Getenv("MINIO_BUCKET_NAME") -// if minioBucketName == "" { -// minioBucketName = gp.LoadWithDefault("minio.bucketName", DefaultMinioBucketName) -// } -// gp.Save("_MinioBucketName", minioBucketName) - -// minioUseIAM := os.Getenv("MINIO_USE_IAM") -// if minioUseIAM == "" { -// minioUseIAM = gp.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM) -// } -// gp.Save("_MinioUseIAM", minioUseIAM) - -// minioIAMEndpoint := os.Getenv("MINIO_IAM_ENDPOINT") -// if minioIAMEndpoint == "" { -// minioIAMEndpoint = gp.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint) -// } -// gp.Save("_MinioIAMEndpoint", minioIAMEndpoint) -// } - -// func (gp *BaseTable) loadDataNodeConfig() { -// insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE") -// if insertBufferFlushSize == "" { -// insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", DefaultInsertBufferSize) -// } -// gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize) -// } - -// func (gp *BaseTable) loadOtherEnvs() { -// // try to load environment start with ENV_PREFIX -// for _, e := range os.Environ() { -// parts := strings.SplitN(e, "=", 2) -// if strings.Contains(parts[0], DefaultEnvPrefix) { -// parts := strings.SplitN(e, "=", 2) -// // remove the ENV PREFIX and use the rest as key -// keyParts := strings.SplitAfterN(parts[0], ".", 2) -// // mem kv throw no errors -// gp.Save(keyParts[1], parts[1]) -// } -// } -// } diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index b634d2625c..0852579405 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -92,8 +92,11 @@ func TestBaseTable_Pulsar(t *testing.T) { os.Setenv("PULSAR_ADDRESS", "pulsar://localhost:6650") baseParams.Init() - address := baseParams.Get("_PulsarAddress") + address := baseParams.Get("pulsar.address") assert.Equal(t, "pulsar://localhost:6650", address) + + port := baseParams.Get("pulsar.port") + assert.NotEqual(t, "", port) } // func TestBaseTable_ConfDir(t *testing.T) { diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 7bf369cec7..2605aa36f4 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -220,17 +220,23 @@ func (p *PulsarConfig) init(base *BaseTable) { } func (p *PulsarConfig) initAddress() { - addr := p.Base.LoadWithDefault("pulsar.address", "localhost") - // for compatible - if strings.Contains(addr, ":") { - p.Address = addr - } else { - port := p.Base.LoadWithDefault("pulsar.port", "6650") - p.Address = "pulsar://" + addr + ":" + port + pulsarHost := p.Base.LoadWithDefault("pulsar.address", "") + if strings.Contains(pulsarHost, ":") { + p.Address = pulsarHost + return + } + + port := p.Base.LoadWithDefault("pulsar.port", "") + if len(pulsarHost) != 0 && len(port) != 0 { + p.Address = "pulsar://" + pulsarHost + ":" + port } } func (p *PulsarConfig) initWebAddress() { + if p.Address == "" { + return + } + pulsarURL, err := url.ParseRequestURI(p.Address) if err != nil { p.WebAddress = "" @@ -358,7 +364,7 @@ func (p *MinioConfig) initAddress() { } func (p *MinioConfig) initAccessKeyID() { - keyID, err := p.Base.Load("_MinioAccessKeyID") + keyID, err := p.Base.Load("minio.accessKeyID") if err != nil { panic(err) } @@ -366,7 +372,7 @@ func (p *MinioConfig) initAccessKeyID() { } func (p *MinioConfig) initSecretAccessKey() { - key, err := p.Base.Load("_MinioSecretAccessKey") + key, err := p.Base.Load("minio.secretAccessKey") if err != nil { panic(err) } @@ -374,7 +380,7 @@ func (p *MinioConfig) initSecretAccessKey() { } func (p *MinioConfig) initUseSSL() { - usessl, err := p.Base.Load("_MinioUseSSL") + usessl, err := p.Base.Load("minio.useSSL") if err != nil { panic(err) } @@ -382,7 +388,7 @@ func (p *MinioConfig) initUseSSL() { } func (p *MinioConfig) initBucketName() { - bucketName, err := p.Base.Load("_MinioBucketName") + bucketName, err := p.Base.Load("minio.bucketName") if err != nil { panic(err) } diff --git a/internal/util/paramtable/service_param_test.go b/internal/util/paramtable/service_param_test.go index 7f35df29eb..4fa9f059e2 100644 --- a/internal/util/paramtable/service_param_test.go +++ b/internal/util/paramtable/service_param_test.go @@ -60,12 +60,50 @@ func TestServiceParam(t *testing.T) { }) t.Run("test pulsarConfig", func(t *testing.T) { + { + Params := SParams.PulsarCfg + assert.NotEqual(t, Params.Address, "") + t.Logf("pulsar address = %s", Params.Address) + assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize) + } + + address := "pulsar://localhost:6650" + { + Params := SParams.PulsarCfg + SParams.BaseTable.Save("pulsar.address", address) + Params.initAddress() + assert.Equal(t, Params.Address, address) + } + + { + Params := SParams.PulsarCfg + SParams.BaseTable.Save("pulsar.address", "localhost") + SParams.BaseTable.Save("pulsar.port", "6650") + Params.initAddress() + assert.Equal(t, Params.Address, address) + } + }) + + t.Run("test pulsar web config", func(t *testing.T) { Params := SParams.PulsarCfg - assert.NotEqual(t, Params.Address, "") - t.Logf("pulsar address = %s", Params.Address) - assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize) + { + Params.initWebAddress() + assert.NotEqual(t, Params.WebAddress, "") + } + + { + Params.Address = Params.Address + "invalid" + Params.initWebAddress() + assert.Equal(t, Params.WebAddress, "") + } + + { + Params.Address = "" + Params.initWebAddress() + assert.Equal(t, Params.WebAddress, "") + } }) t.Run("test rocksmqConfig", func(t *testing.T) { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index b15c0527c5..9442e34cab 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -29,11 +29,8 @@ func TestGetServerIDConcurrently(t *testing.T) { ctx := context.Background() Params.Init() - endpoints, err := Params.Load("_EtcdEndpoints") + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) - if err != nil { - panic(err) - } etcdEndpoints := strings.Split(endpoints, ",") etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) @@ -76,10 +73,7 @@ func TestInit(t *testing.T) { ctx := context.Background() Params.Init() - endpoints, err := Params.Load("_EtcdEndpoints") - if err != nil { - panic(err) - } + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -106,11 +100,7 @@ func TestUpdateSessions(t *testing.T) { ctx := context.Background() Params.Init() - endpoints, err := Params.Load("_EtcdEndpoints") - if err != nil { - panic(err) - } - + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) @@ -224,9 +214,7 @@ func TestWatcherHandleWatchResp(t *testing.T) { ctx := context.Background() Params.Init() - endpoints, err := Params.Load("_EtcdEndpoints") - require.NoError(t, err) - + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) @@ -375,10 +363,7 @@ func TestSessionRevoke(t *testing.T) { ctx := context.Background() Params.Init() - endpoints, err := Params.Load("_EtcdEndpoints") - if err != nil { - panic(err) - } + endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",")