From 7efb02a4c5d5ed91bab45b999fd82016bc7bf224 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Sun, 9 Jan 2022 23:45:33 +0800 Subject: [PATCH] Use PulsarConfig in GlobalParams for all components (#15046) Signed-off-by: yudong.cai --- internal/datacoord/server.go | 2 +- internal/datacoord/server_test.go | 2 +- internal/datanode/data_node.go | 2 +- internal/datanode/data_node_test.go | 2 +- internal/datanode/data_sync_service_test.go | 2 +- .../flow_graph_insert_buffer_node_test.go | 8 +- internal/proxy/proxy.go | 2 +- internal/proxy/task.go | 2 +- internal/querycoord/cluster_test.go | 2 +- internal/querycoord/query_coord.go | 2 +- internal/querynode/load_service_test.go | 2 +- internal/querynode/mock_test.go | 2 +- internal/querynode/query_collection_test.go | 2 +- internal/querynode/query_node.go | 2 +- internal/querynode/query_node_test.go | 2 +- internal/querynode/stats_service_test.go | 4 +- internal/rootcoord/dml_channels_test.go | 2 +- internal/rootcoord/root_coord.go | 6 +- internal/rootcoord/root_coord_test.go | 8 +- internal/rootcoord/timestamp_test.go | 2 +- internal/rootcoord/timeticksync_test.go | 2 +- internal/util/paramtable/global_param.go | 130 ++++++------------ internal/util/paramtable/global_param_test.go | 25 ++-- 23 files changed, 79 insertions(+), 136 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 0e93e87bc5..293378b1ad 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -256,7 +256,7 @@ func (s *Server) Init() error { func (s *Server) Start() error { var err error m := map[string]interface{}{ - "PulsarAddress": Params.DataCoordCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} err = s.msFactory.SetParams(m) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index d081eeeedc..a271830bc4 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2237,7 +2237,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se var err error factory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "pulsarAddress": Params.DataCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "receiveBufSize": 1024, "pulsarBufSize": 1024, } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index a6396a418c..48dfeec894 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -221,7 +221,7 @@ func (node *DataNode) Init() error { Params.DataNodeCfg.Refresh() m := map[string]interface{}{ - "PulsarAddress": Params.DataNodeCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024, } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 9d19eef295..debb45c3c3 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -235,7 +235,7 @@ func TestDataNode(t *testing.T) { // pulsar produce msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "pulsarAddress": Params.DataNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "receiveBufSize": 1024, "pulsarBufSize": 1024} err = msFactory.SetParams(m) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index ed3f677d59..a8acc0b2ec 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -177,7 +177,7 @@ func TestDataSyncService_Start(t *testing.T) { defer cancel() // init data node - pulsarURL := Params.DataNodeCfg.PulsarAddress + pulsarURL := Params.PulsarCfg.Address Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index fe6af4ac13..0c55823e36 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -76,7 +76,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.DataNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -166,7 +166,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.DataNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -372,7 +372,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.DataNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -646,7 +646,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.DataNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index b99cef9353..67b67a087c 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -177,7 +177,7 @@ func (node *Proxy) Init() error { } m := map[string]interface{}{ - "PulsarAddress": Params.ProxyCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "PulsarBufSize": 1024} log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m)) if err := node.msFactory.SetParams(m); err != nil { diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 394e16b1d2..9e086705c5 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -891,7 +891,7 @@ func (it *insertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre return 0 } - threshold := Params.ProxyCfg.PulsarMaxMessageSize + threshold := Params.PulsarCfg.MaxMessageSize // not accurate /* #nosec G103 */ getFixedSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int { diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 3ab8a74eb0..9fa59b6914 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -469,7 +469,7 @@ func TestGrpcRequest(t *testing.T) { clusterSession.Register() factory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "PulsarAddress": Params.QueryCoordCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} err = factory.SetParams(m) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 184c0046c1..7566fd0af0 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -191,7 +191,7 @@ func (qc *QueryCoord) Init() error { // Start function starts the goroutines to watch the meta and node updates func (qc *QueryCoord) Start() error { m := map[string]interface{}{ - "PulsarAddress": Params.QueryCoordCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} err := qc.msFactory.SetParams(m) diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 1ce337b480..cac268036b 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -928,7 +928,7 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": receiveBufSize, - "pulsarAddress": Params.QueryNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err := msFactory.SetParams(m) if err != nil { diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 41511f92e6..e0557e041e 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -382,7 +382,7 @@ func genEtcdKV() (*etcdkv.EtcdKV, error) { func genFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 - pulsarURL := Params.QueryNodeCfg.PulsarAddress + pulsarURL := Params.PulsarCfg.Address msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": receiveBufSize, diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index 43a80e2976..04c7117170 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -126,7 +126,7 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) error { func TestQueryCollection_withoutVChannel(t *testing.T) { ctx := context.Background() m := map[string]interface{}{ - "PulsarAddress": Params.QueryNodeCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} factory := msgstream.NewPmsFactory() diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 15d386bb7a..e82f9f62dd 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -250,7 +250,7 @@ func (node *QueryNode) Init() error { func (node *QueryNode) Start() error { var err error m := map[string]interface{}{ - "PulsarAddress": Params.QueryNodeCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} err = node.msFactory.SetParams(m) diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index fc9b1947ba..39132d56df 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -223,7 +223,7 @@ func makeNewChannelNames(names []string, suffix string) []string { func newMessageStreamFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 - pulsarURL := Params.QueryNodeCfg.PulsarAddress + pulsarURL := Params.PulsarCfg.Address msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": receiveBufSize, diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index 4a86be6360..1f37fde4bd 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -31,7 +31,7 @@ func TestStatsService_start(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "PulsarAddress": Params.QueryNodeCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} msFactory.SetParams(m) @@ -55,7 +55,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { msFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ "receiveBufSize": receiveBufSize, - "pulsarAddress": Params.QueryNodeCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index e37162aaf7..086a33acd8 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -42,7 +42,7 @@ func TestDmlChannels(t *testing.T) { Params.Init() m := map[string]interface{}{ - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "receiveBufSize": 1024, "pulsarBufSize": 1024} err := factory.SetParams(m) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 03d2952d9a..132a76c934 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -443,8 +443,8 @@ func (c *Core) setDdMsgSendFlag(b bool) error { } func (c *Core) setMsgStreams() error { - if Params.RootCoordCfg.PulsarAddress == "" { - return fmt.Errorf("pulsarAddress is empty") + if Params.PulsarCfg.Address == "" { + return fmt.Errorf("pulsar address is empty") } if Params.RootCoordCfg.MsgChannelSubName == "" { return fmt.Errorf("msgChannelSubName is empty") @@ -1021,7 +1021,7 @@ func (c *Core) Init() error { } m := map[string]interface{}{ - "PulsarAddress": Params.RootCoordCfg.PulsarAddress, + "PulsarAddress": Params.PulsarCfg.Address, "ReceiveBufSize": 1024, "PulsarBufSize": 1024} if initError = c.msFactory.SetParams(m); initError != nil { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 6b787bf05b..2c66fde89d 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -619,7 +619,7 @@ func TestRootCoord(t *testing.T) { tmpFactory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "receiveBufSize": 1024, "pulsarBufSize": 1024} err = tmpFactory.SetParams(m) @@ -2355,7 +2355,7 @@ func TestRootCoord2(t *testing.T) { m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -2636,7 +2636,7 @@ func TestCheckFlushedSegments(t *testing.T) { m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) @@ -2803,7 +2803,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(t, err) diff --git a/internal/rootcoord/timestamp_test.go b/internal/rootcoord/timestamp_test.go index 8d23dfb765..326ed9317a 100644 --- a/internal/rootcoord/timestamp_test.go +++ b/internal/rootcoord/timestamp_test.go @@ -122,7 +122,7 @@ func BenchmarkAllocTimestamp(b *testing.B) { m := map[string]interface{}{ "receiveBufSize": 1024, - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "pulsarBufSize": 1024} err = msFactory.SetParams(m) assert.Nil(b, err) diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index bb4e58b762..f6bac1d768 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -38,7 +38,7 @@ func TestTimetickSync(t *testing.T) { factory := msgstream.NewPmsFactory() m := map[string]interface{}{ - "pulsarAddress": Params.RootCoordCfg.PulsarAddress, + "pulsarAddress": Params.PulsarCfg.Address, "receiveBufSize": 1024, "pulsarBufSize": 1024} err := factory.SetParams(m) diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 1f5ea7c97f..4d5c2473dc 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -53,6 +53,7 @@ type GlobalParamTable struct { once sync.Once BaseParams BaseParamTable + PulsarCfg pulsarConfig //CommonCfg commonConfig //KnowhereCfg knowhereConfig //MsgChannelCfg msgChannelConfig @@ -78,6 +79,7 @@ func (p *GlobalParamTable) InitOnce() { func (p *GlobalParamTable) Init() { p.BaseParams.Init() + p.PulsarCfg.init(&p.BaseParams) //p.CommonCfg.init(&p.BaseParams) //p.KnowhereCfg.init(&p.BaseParams) //p.MsgChannelCfg.init(&p.BaseParams) @@ -100,6 +102,43 @@ func (p *GlobalParamTable) SetLogConfig(role string) { // TODO: considering remove it: comment a large block of code is not a good practice, old code can be found with git /////////////////////////////////////////////////////////////////////////////// +// --- pulsar --- +type pulsarConfig struct { + BaseParams *BaseParamTable + + Address string + MaxMessageSize int +} + +func (p *pulsarConfig) init(bp *BaseParamTable) { + p.BaseParams = bp + + p.initAddress() + p.initMaxMessageSize() +} + +func (p *pulsarConfig) initAddress() { + addr, err := p.BaseParams.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.Address = addr +} + +func (p *pulsarConfig) initMaxMessageSize() { + maxMessageSizeStr, err := p.BaseParams.Load("pulsar.maxMessageSize") + if err != nil { + p.MaxMessageSize = SuggestPulsarMaxMessageSize + } else { + maxMessageSize, err := strconv.Atoi(maxMessageSizeStr) + if err != nil { + p.MaxMessageSize = SuggestPulsarMaxMessageSize + } else { + p.MaxMessageSize = maxMessageSize + } + } +} + // --- common --- //type commonConfig struct { // BaseParams *BaseParamTable @@ -294,8 +333,6 @@ type rootCoordConfig struct { Address string Port int - PulsarAddress string - ClusterChannelPrefix string MsgChannelSubName string TimeTickChannel string @@ -316,8 +353,6 @@ type rootCoordConfig struct { func (p *rootCoordConfig) init(bp *BaseParamTable) { p.BaseParams = bp - p.initPulsarAddress() - // Has to init global msgchannel prefix before other channel names p.initClusterMsgChannelPrefix() p.initMsgChannelSubName() @@ -333,14 +368,6 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) { p.initDefaultIndexName() } -func (p *rootCoordConfig) initPulsarAddress() { - addr, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = addr -} - func (p *rootCoordConfig) initClusterMsgChannelPrefix() { config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") if err != nil { @@ -429,8 +456,6 @@ type proxyConfig struct { Alias string - PulsarAddress string - RocksmqPath string // not used in Proxy ProxyID UniqueID @@ -456,8 +481,6 @@ type proxyConfig struct { MaxTaskNum int64 - PulsarMaxMessageSize int - RetentionDuration int64 CreatedTime time.Time @@ -467,7 +490,6 @@ type proxyConfig struct { func (p *proxyConfig) init(bp *BaseParamTable) { p.BaseParams = bp - p.initPulsarAddress() p.initRocksmqPath() p.initTimeTickInterval() @@ -483,8 +505,6 @@ func (p *proxyConfig) init(bp *BaseParamTable) { p.initDefaultPartitionName() p.initDefaultIndexName() - p.initPulsarMaxMessageSize() - p.initMaxTaskNum() p.initBufFlagExpireTime() p.initBufFlagCleanupInterval() @@ -501,14 +521,6 @@ func (p *proxyConfig) InitAlias(alias string) { p.Alias = alias } -func (p *proxyConfig) initPulsarAddress() { - ret, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = ret -} - func (p *proxyConfig) initRocksmqPath() { path, err := p.BaseParams.Load("_RocksmqPath") if err != nil { @@ -599,20 +611,6 @@ func (p *proxyConfig) initDefaultIndexName() { p.DefaultIndexName = name } -func (p *proxyConfig) initPulsarMaxMessageSize() { - maxMessageSizeStr, err := p.BaseParams.Load("pulsar.maxMessageSize") - if err != nil { - p.PulsarMaxMessageSize = SuggestPulsarMaxMessageSize - } else { - maxMessageSize, err := strconv.Atoi(maxMessageSizeStr) - if err != nil { - p.PulsarMaxMessageSize = SuggestPulsarMaxMessageSize - } else { - p.PulsarMaxMessageSize = maxMessageSize - } - } -} - func (p *proxyConfig) initMaxTaskNum() { p.MaxTaskNum = p.BaseParams.ParseInt64WithDefault("proxy.maxTaskNum", 1024) } @@ -666,9 +664,6 @@ type queryCoordConfig struct { DmlChannelPrefix string DeltaChannelPrefix string - // --- Pulsar --- - PulsarAddress string - //---- Handoff --- AutoHandoff bool @@ -696,9 +691,6 @@ func (p *queryCoordConfig) init(bp *BaseParamTable) { p.initMinioUseSSLStr() p.initMinioBucketName() - //--- Pulsar ---- - p.initPulsarAddress() - //---- Handoff --- p.initAutoHandoff() @@ -801,14 +793,6 @@ func (p *queryCoordConfig) initMinioBucketName() { p.MinioBucketName = bucketName } -func (p *queryCoordConfig) initPulsarAddress() { - addr, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = addr -} - func (p *queryCoordConfig) initAutoHandoff() { handoff, err := p.BaseParams.Load("queryCoord.autoHandoff") if err != nil { @@ -879,8 +863,7 @@ func (p *queryCoordConfig) initDeltaChannelName() { type queryNodeConfig struct { BaseParams *BaseParamTable - PulsarAddress string - RocksmqPath string + RocksmqPath string Alias string QueryNodeIP string @@ -950,7 +933,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) { p.initMinioUseSSLStr() p.initMinioBucketName() - p.initPulsarAddress() p.initRocksmqPath() p.initGracefulTime() @@ -1053,14 +1035,6 @@ func (p *queryNodeConfig) initMinioBucketName() { p.MinioBucketName = bucketName } -func (p *queryNodeConfig) initPulsarAddress() { - url, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = url -} - func (p *queryNodeConfig) initRocksmqPath() { path, err := p.BaseParams.Load("_RocksmqPath") if err != nil { @@ -1184,9 +1158,6 @@ type dataCoordConfig struct { MinioBucketName string MinioRootPath string - // --- Pulsar --- - PulsarAddress string - // --- Rocksmq --- RocksmqPath string @@ -1222,7 +1193,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initChannelWatchPrefix() - p.initPulsarAddress() p.initRocksmqPath() p.initSegmentMaxSize() @@ -1254,14 +1224,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initGCDropTolerance() } -func (p *dataCoordConfig) initPulsarAddress() { - addr, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = addr -} - func (p *dataCoordConfig) initRocksmqPath() { path, err := p.BaseParams.Load("_RocksmqPath") if err != nil { @@ -1436,9 +1398,6 @@ type dataNodeConfig struct { DmlChannelName string DeltaChannelName string - // Pulsar address - PulsarAddress string - // Rocksmq path RocksmqPath string @@ -1475,7 +1434,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) { p.initStatsBinlogRootPath() p.initDeleteBinlogRootPath() - p.initPulsarAddress() p.initRocksmqPath() // Must init global msgchannel prefix before other channel names @@ -1542,14 +1500,6 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() { p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log") } -func (p *dataNodeConfig) initPulsarAddress() { - url, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.PulsarAddress = url -} - func (p *dataNodeConfig) initRocksmqPath() { path, err := p.BaseParams.Load("_RocksmqPath") if err != nil { diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 0148116066..18fc465469 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -15,7 +15,6 @@ import ( "log" "os" "path" - "strings" "testing" "time" @@ -33,12 +32,18 @@ func TestGlobalParamTable(t *testing.T) { var GlobalParams GlobalParamTable GlobalParams.Init() + t.Run("test pulsarConfig", func(t *testing.T) { + Params := GlobalParams.PulsarCfg + + assert.NotEqual(t, Params.Address, "") + t.Logf("pulsar address = %s", Params.Address) + + assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize) + }) + t.Run("test rootCoordConfig", func(t *testing.T) { Params := GlobalParams.RootCoordCfg - assert.NotEqual(t, Params.PulsarAddress, "") - t.Logf("pulsar address = %s", Params.PulsarAddress) - assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord") t.Logf("msg channel sub name = %s", Params.MsgChannelSubName) @@ -75,8 +80,6 @@ func TestGlobalParamTable(t *testing.T) { t.Run("test proxyConfig", func(t *testing.T) { Params := GlobalParams.ProxyCfg - t.Logf("PulsarAddress: %s", Params.PulsarAddress) - t.Logf("RocksmqPath: %s", Params.RocksmqPath) t.Logf("TimeTickInterval: %v", Params.TimeTickInterval) @@ -101,8 +104,6 @@ func TestGlobalParamTable(t *testing.T) { t.Logf("DefaultIndexName: %s", Params.DefaultIndexName) - t.Logf("PulsarMaxMessageSize: %d", Params.PulsarMaxMessageSize) - //t.Logf("RoleName: %s", typeutil.ProxyRole) t.Logf("MaxTaskNum: %d", Params.MaxTaskNum) @@ -166,11 +167,6 @@ func TestGlobalParamTable(t *testing.T) { t.Run("test queryNodeConfig", func(t *testing.T) { Params := GlobalParams.QueryNodeCfg - address := Params.PulsarAddress - split := strings.Split(address, ":") - assert.Equal(t, "pulsar", split[0]) - assert.Equal(t, "6650", split[len(split)-1]) - cacheSize := Params.CacheSize assert.Equal(t, int64(32), cacheSize) err := os.Setenv("CACHE_SIZE", "2") @@ -265,9 +261,6 @@ func TestGlobalParamTable(t *testing.T) { path1 := Params.InsertBinlogRootPath log.Println("InsertBinlogRootPath:", path1) - address := Params.PulsarAddress - log.Println("PulsarAddress:", address) - path1 = Params.ClusterChannelPrefix assert.Equal(t, path1, "by-dev") log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix)