diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index ee741eca99..46aad3a6d0 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -12,6 +12,7 @@ msgChannel: # channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: + cluster: "by-dev" rootCoordTimeTick: "rootcoord-timetick" rootCoordStatistics: "rootcoord-statistics" rootCoordDml: "rootcoord-dml" @@ -29,7 +30,7 @@ msgChannel: # sub name generation rule: ${subNamePrefix}-${NodeID} subNamePrefix: - rootCoordSubNamePrefix: "rootcoord" + rootCoordSubNamePrefix: "rootCoord" proxySubNamePrefix: "proxy" queryNodeSubNamePrefix: "queryNode" dataNodeSubNamePrefix: "dataNode" diff --git a/internal/datacoord/param_table.go b/internal/datacoord/param_table.go index 66561ac27f..3fb8039045 100644 --- a/internal/datacoord/param_table.go +++ b/internal/datacoord/param_table.go @@ -46,11 +46,13 @@ type ParamTable struct { FlushStreamPosSubPath string StatsStreamPosSubPath string - // segment + // --- SEGMENTS --- SegmentMaxSize float64 SegmentSealProportion float64 SegAssignmentExpiration int64 + // --- Channels --- + ClusterChannelPrefix string InsertChannelPrefixName string StatisticsChannelName string TimeTickChannelName string @@ -63,37 +65,46 @@ type ParamTable struct { var Params ParamTable var once sync.Once +/* Init params from base table as well as data coord yaml*/ func (p *ParamTable) Init() { + // load yaml + p.BaseTable.Init() + + if err := p.LoadYaml("advanced/data_coord.yaml"); err != nil { + panic(err) + } + + // set members + p.initEtcdEndpoints() + p.initMetaRootPath() + p.initKvRootPath() + p.initSegmentBinlogSubPath() + p.initCollectionBinlogSubPath() + + p.initPulsarAddress() + p.initRocksmqPath() + + p.initSegmentMaxSize() + p.initSegmentSealProportion() + p.initSegAssignmentExpiration() + + // Has to init global msgchannel prefix before other channel names + p.initClusterMsgChannelPrefix() + p.initInsertChannelPrefixName() + p.initStatisticsChannelName() + p.initTimeTickChannelName() + p.initSegmentInfoChannelName() + p.initDataCoordSubscriptionName() + p.initLogCfg() + + p.initFlushStreamPosSubPath() + p.initStatsStreamPosSubPath() +} + +// Init once ensure param table is a singleton +func (p *ParamTable) InitOnce() { once.Do(func() { - // load yaml - p.BaseTable.Init() - - if err := p.LoadYaml("advanced/data_coord.yaml"); err != nil { - panic(err) - } - - // set members - p.initEtcdEndpoints() - p.initMetaRootPath() - p.initKvRootPath() - p.initSegmentBinlogSubPath() - p.initCollectionBinlogSubPath() - - p.initPulsarAddress() - p.initRocksmqPath() - - p.initSegmentMaxSize() - p.initSegmentSealProportion() - p.initSegAssignmentExpiration() - p.initInsertChannelPrefixName() - p.initStatisticsChannelName() - p.initTimeTickChannelName() - p.initSegmentInfoChannelName() - p.initDataCoordSubscriptionName() - p.initLogCfg() - - p.initFlushStreamPosSubPath() - p.initStatsStreamPosSubPath() + p.Init() }) } @@ -173,44 +184,57 @@ func (p *ParamTable) initSegAssignmentExpiration() { p.SegAssignmentExpiration = p.ParseInt64("datacoord.segment.assignmentExpiration") } -func (p *ParamTable) initInsertChannelPrefixName() { - var err error - p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel") +func (p *ParamTable) initClusterMsgChannelPrefix() { + config, err := p.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } + p.ClusterChannelPrefix = config +} + +func (p *ParamTable) initInsertChannelPrefixName() { + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordInsertChannel") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.InsertChannelPrefixName = strings.Join(s, "-") } func (p *ParamTable) initStatisticsChannelName() { - var err error - p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") if err != nil { panic(err) } + s := []string{p.ClusterChannelPrefix, config} + p.StatisticsChannelName = strings.Join(s, "-") } func (p *ParamTable) initTimeTickChannelName() { - var err error - p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") if err != nil { panic(err) } + s := []string{p.ClusterChannelPrefix, config} + p.TimeTickChannelName = strings.Join(s, "-") } func (p *ParamTable) initSegmentInfoChannelName() { - var err error - p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo") + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordSegmentInfo") if err != nil { panic(err) } + s := []string{p.ClusterChannelPrefix, config} + p.SegmentInfoChannelName = strings.Join(s, "-") } func (p *ParamTable) initDataCoordSubscriptionName() { - var err error - p.DataCoordSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix") + config, err := p.Load("msgChannel.subNamePrefix.dataCoordSubNamePrefix") if err != nil { panic(err) } + s := []string{p.ClusterChannelPrefix, config} + p.DataCoordSubscriptionName = strings.Join(s, "-") } func (p *ParamTable) initLogCfg() { diff --git a/internal/datacoord/param_table_test.go b/internal/datacoord/param_table_test.go new file mode 100644 index 0000000000..6a354ae2f3 --- /dev/null +++ b/internal/datacoord/param_table_test.go @@ -0,0 +1,39 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package datacoord + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +//TODO add more test for other parameters +func TestParamTable(t *testing.T) { + Params.Init() + + assert.Equal(t, Params.InsertChannelPrefixName, "by-dev-insert-channel-") + t.Logf("data coord insert channel = %s", Params.InsertChannelPrefixName) + + assert.Equal(t, Params.StatisticsChannelName, "by-dev-datacoord-statistics-channel") + t.Logf("data coord stats channel = %s", Params.StatisticsChannelName) + + assert.Equal(t, Params.TimeTickChannelName, "by-dev-datacoord-timetick-channel") + t.Logf("data coord timetick channel = %s", Params.TimeTickChannelName) + + assert.Equal(t, Params.SegmentInfoChannelName, "by-dev-segment-info-channel") + t.Logf("data coord segment info channel = %s", Params.SegmentInfoChannelName) + + assert.Equal(t, Params.DataCoordSubscriptionName, "by-dev-dataCoord") + t.Logf("data coord subscription channel = %s", Params.DataCoordSubscriptionName) + +} diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index e7d7b188da..682d0fa140 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -1431,8 +1431,8 @@ func TestPostFlush(t *testing.T) { func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server { Params.Init() - Params.TimeTickChannelName = strconv.Itoa(rand.Int()) - Params.StatisticsChannelName = strconv.Itoa(rand.Int()) + Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int()) + Params.StatisticsChannelName = Params.StatisticsChannelName + strconv.Itoa(rand.Int()) var err error factory := msgstream.NewPmsFactory() m := map[string]interface{}{ diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 3c87694f4b..c586877b6d 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -17,6 +17,7 @@ import ( "math" "math/rand" "os" + "strconv" "strings" "sync" "testing" @@ -43,7 +44,9 @@ func TestMain(t *testing.M) { rand.Seed(time.Now().Unix()) Params.InitAlias("datanode-alias-1") Params.Init() - refreshChannelNames() + // change to specific channel for test + Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int()) + Params.SegmentStatisticsChannelName = Params.SegmentStatisticsChannelName + strconv.Itoa(rand.Int()) code := t.Run() os.Exit(code) } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 1977582f17..6c39c45c89 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -99,11 +99,6 @@ func makeNewChannelNames(names []string, suffix string) []string { return ret } -func refreshChannelNames() { - Params.SegmentStatisticsChannelName = "datanode-refresh-segment-statistics" - Params.TimeTickChannelName = "datanode-refresh-hard-timetick" -} - func clearEtcd(rootPath string) error { etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, rootPath) if err != nil { diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index ddb6112de0..6d94534cd3 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -44,6 +44,9 @@ type ParamTable struct { // --- Rocksmq --- RocksmqPath string + // --- Cluster channels --- + ClusterChannelPrefix string + // - seg statistics channel - SegmentStatisticsChannelName string @@ -75,47 +78,54 @@ func (p *ParamTable) InitAlias(alias string) { p.Alias = alias } -func (p *ParamTable) Init() { +func (p *ParamTable) InitOnce() { once.Do(func() { - p.BaseTable.Init() - err := p.LoadYaml("advanced/data_node.yaml") - if err != nil { - panic(err) - } - - // === DataNode Internal Components Configs === - p.initFlowGraphMaxQueueLength() - p.initFlowGraphMaxParallelism() - p.initFlushInsertBufferSize() - p.initInsertBinlogRootPath() - p.initStatsBinlogRootPath() - p.initLogCfg() - - // === DataNode External Components Configs === - // --- Pulsar --- - p.initPulsarAddress() - - p.initRocksmqPath() - - // - seg statistics channel - - p.initSegmentStatisticsChannelName() - - // - timetick channel - - p.initTimeTickChannelName() - - // --- ETCD --- - p.initEtcdEndpoints() - p.initMetaRootPath() - - // --- MinIO --- - p.initMinioAddress() - p.initMinioAccessKeyID() - p.initMinioSecretAccessKey() - p.initMinioUseSSL() - p.initMinioBucketName() + p.Init() }) } +func (p *ParamTable) Init() { + p.BaseTable.Init() + err := p.LoadYaml("advanced/data_node.yaml") + if err != nil { + panic(err) + } + + // === DataNode Internal Components Configs === + p.initFlowGraphMaxQueueLength() + p.initFlowGraphMaxParallelism() + p.initFlushInsertBufferSize() + p.initInsertBinlogRootPath() + p.initStatsBinlogRootPath() + p.initLogCfg() + + // === DataNode External Components Configs === + // --- Pulsar --- + p.initPulsarAddress() + + p.initRocksmqPath() + + // Has to init global msgchannel prefix before other channel names + p.initClusterMsgChannelPrefix() + + // - seg statistics channel - + p.initSegmentStatisticsChannelName() + + // - timetick channel - + p.initTimeTickChannelName() + + // --- ETCD --- + p.initEtcdEndpoints() + p.initMetaRootPath() + + // --- MinIO --- + p.initMinioAddress() + p.initMinioAccessKeyID() + p.initMinioSecretAccessKey() + p.initMinioUseSSL() + p.initMinioBucketName() +} + // ==== DataNode internal components configs ==== // ---- flowgraph configs ---- func (p *ParamTable) initFlowGraphMaxQueueLength() { @@ -165,30 +175,40 @@ func (p *ParamTable) initRocksmqPath() { p.RocksmqPath = path } -func (p *ParamTable) initSegmentStatisticsChannelName() { - - path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") +func (p *ParamTable) initClusterMsgChannelPrefix() { + name, err := p.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - p.SegmentStatisticsChannelName = path + p.ClusterChannelPrefix = name +} + +func (p *ParamTable) initSegmentStatisticsChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.SegmentStatisticsChannelName = strings.Join(s, "-") } func (p *ParamTable) initTimeTickChannelName() { - path, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") + config, err := p.Load("msgChannel.chanNamePrefix.dataCoordTimeTick") if err != nil { panic(err) } - p.TimeTickChannelName = path + s := []string{p.ClusterChannelPrefix, config} + p.TimeTickChannelName = strings.Join(s, "-") } // - msg channel subname - func (p *ParamTable) initMsgChannelSubName() { - name, err := p.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") + config, err := p.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") if err != nil { panic(err) } - p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10) + s := []string{p.ClusterChannelPrefix, config, strconv.FormatInt(p.NodeID, 10)} + p.MsgChannelSubName = strings.Join(s, "-") } // --- ETCD --- diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index 7cc25b5462..03f785b7c4 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -15,10 +15,14 @@ import ( "log" "testing" "time" + + "github.com/stretchr/testify/assert" ) -func TestParamTable_DataNode(t *testing.T) { - +func TestParamTable(t *testing.T) { + Params.Init() + Params.NodeID = 2 + Params.initMsgChannelSubName() t.Run("Test NodeID", func(t *testing.T) { id := Params.NodeID log.Println("NodeID:", id) @@ -55,18 +59,27 @@ func TestParamTable_DataNode(t *testing.T) { log.Println("PulsarAddress:", address) }) + t.Run("Test ClusterChannelPrefix", func(t *testing.T) { + path := Params.ClusterChannelPrefix + assert.Equal(t, path, "by-dev") + log.Println("ClusterChannelPrefix:", Params.ClusterChannelPrefix) + }) + t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) { path := Params.SegmentStatisticsChannelName + assert.Equal(t, path, "by-dev-datacoord-statistics-channel") log.Println("SegmentStatisticsChannelName:", path) }) t.Run("Test TimeTickChannelName", func(t *testing.T) { name := Params.TimeTickChannelName + assert.Equal(t, name, "by-dev-datacoord-timetick-channel") log.Println("TimeTickChannelName:", name) }) t.Run("Test msgChannelSubName", func(t *testing.T) { name := Params.MsgChannelSubName + assert.Equal(t, name, "by-dev-dataNode-2") log.Println("MsgChannelSubName:", name) }) diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 6c7cb59eb1..e0d822f6ac 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -69,12 +69,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory, opts ...datacoord func (s *Server) init() error { Params.Init() - Params.LoadFromEnv() closer := trace.InitTracing("datacoord") s.closer = closer - datacoord.Params.Init() + datacoord.Params.InitOnce() datacoord.Params.IP = Params.IP datacoord.Params.Port = Params.Port diff --git a/internal/distributed/datanode/param_table.go b/internal/distributed/datanode/param_table.go index a25190de54..cd617a1d85 100644 --- a/internal/distributed/datanode/param_table.go +++ b/internal/distributed/datanode/param_table.go @@ -48,6 +48,9 @@ func (pt *ParamTable) Init() { pt.initDataCoordAddress() pt.initPort() + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initServerMaxSendSize() pt.initServerMaxRecvSize() }) diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index e548b684ad..e9b609f31d 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -169,10 +169,8 @@ func (s *Server) Stop() error { func (s *Server) init() error { ctx := context.Background() Params.Init() - Params.LoadFromEnv() - Params.LoadFromArgs() - dn.Params.Init() + dn.Params.InitOnce() dn.Params.Port = Params.Port dn.Params.IP = Params.IP diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index c658244a09..8f97e91e59 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -65,7 +65,8 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() - indexcoord.Params.Init() + + indexcoord.Params.InitOnce() indexcoord.Params.Address = Params.ServiceAddress indexcoord.Params.Port = Params.ServicePort diff --git a/internal/distributed/indexnode/paramtable.go b/internal/distributed/indexnode/paramtable.go index ba0b5a84ea..bf2f941299 100644 --- a/internal/distributed/indexnode/paramtable.go +++ b/internal/distributed/indexnode/paramtable.go @@ -46,6 +46,13 @@ func (pt *ParamTable) Init() { pt.initServerMaxSendSize() pt.initServerMaxRecvSize() + + if !funcutil.CheckPortAvailable(pt.Port) { + pt.Port = funcutil.GetAvailablePort() + log.Warn("IndexNode init", zap.Any("Port", pt.Port)) + } + pt.LoadFromEnv() + pt.LoadFromArgs() }) } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 3a6268aaba..1239db687d 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -92,14 +92,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { func (s *Server) init() error { var err error Params.Init() - if !funcutil.CheckPortAvailable(Params.Port) { - Params.Port = funcutil.GetAvailablePort() - log.Warn("IndexNode init", zap.Any("Port", Params.Port)) - } - Params.LoadFromEnv() - Params.LoadFromArgs() - indexnode.Params.Init() + indexnode.Params.InitOnce() indexnode.Params.Port = Params.Port indexnode.Params.IP = Params.IP indexnode.Params.Address = Params.Address diff --git a/internal/distributed/proxy/paramtable.go b/internal/distributed/proxy/paramtable.go index 1bbba70a2f..40e524dd28 100644 --- a/internal/distributed/proxy/paramtable.go +++ b/internal/distributed/proxy/paramtable.go @@ -47,6 +47,10 @@ func (pt *ParamTable) Init() { pt.BaseTable.Init() pt.initParams() + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) + pt.initServerMaxSendSize() pt.initServerMaxRecvSize() }) diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 9effdb8a21..06d99178f9 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -132,11 +132,8 @@ func (s *Server) init() error { Params.Port = funcutil.GetAvailablePort() log.Warn("Proxy init", zap.Any("Port", Params.Port)) } - Params.LoadFromEnv() - Params.LoadFromArgs() - Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) - proxy.Params.Init() + proxy.Params.InitOnce() log.Debug("init params done ...") // NetworkPort & IP don't matter here, NetworkAddress matters diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 16f909b0e6..11af61827c 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -87,7 +87,8 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() - qc.Params.Init() + + qc.Params.InitOnce() qc.Params.Port = Params.Port closer := trace.InitTracing("querycoord") diff --git a/internal/distributed/querynode/param_table.go b/internal/distributed/querynode/param_table.go index 4b76e6ef11..614ccceb14 100644 --- a/internal/distributed/querynode/param_table.go +++ b/internal/distributed/querynode/param_table.go @@ -51,6 +51,9 @@ func (pt *ParamTable) Init() { pt.initDataCoordAddress() pt.initQueryCoordAddress() + pt.LoadFromEnv() + pt.LoadFromArgs() + pt.initServerMaxSendSize() pt.initServerMaxRecvSize() }) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index abc7b90860..e4717661ef 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -75,10 +75,8 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) init() error { Params.Init() - Params.LoadFromEnv() - Params.LoadFromArgs() - qn.Params.Init() + qn.Params.InitOnce() qn.Params.QueryNodeIP = Params.QueryNodeIP qn.Params.QueryNodePort = int64(Params.QueryNodePort) qn.Params.QueryNodeID = Params.QueryNodeID diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index e5771b7283..38f7b9f692 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -128,7 +128,7 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() - rootcoord.Params.Init() + rootcoord.Params.InitOnce() rootcoord.Params.Address = Params.Address rootcoord.Params.Port = Params.Port log.Debug("grpc init done ...") diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 8cac8a5175..33782e4405 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -112,8 +112,8 @@ func (i *IndexCoord) Register() error { func (i *IndexCoord) Init() error { var initErr error = nil + Params.InitOnce() i.initOnce.Do(func() { - Params.Init() log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints)) i.UpdateStateCode(internalpb.StateCode_Initializing) diff --git a/internal/indexcoord/param_table.go b/internal/indexcoord/param_table.go index 7cb89204dc..34362a35ee 100644 --- a/internal/indexcoord/param_table.go +++ b/internal/indexcoord/param_table.go @@ -44,17 +44,27 @@ var Params ParamTable var once sync.Once func (pt *ParamTable) Init() { + pt.BaseTable.Init() + // TODO, load index_node.yaml + /*err := pt.LoadYaml("advanced/index_coord.yaml") + if err != nil { + panic(err) + }*/ + + pt.initLogCfg() + pt.initEtcdEndpoints() + pt.initMetaRootPath() + pt.initKvRootPath() + pt.initMinIOAddress() + pt.initMinIOAccessKeyID() + pt.initMinIOSecretAccessKey() + pt.initMinIOUseSSL() + pt.initMinioBucketName() +} + +func (pt *ParamTable) InitOnce() { once.Do(func() { - pt.BaseTable.Init() - pt.initLogCfg() - pt.initEtcdEndpoints() - pt.initMetaRootPath() - pt.initKvRootPath() - pt.initMinIOAddress() - pt.initMinIOAccessKeyID() - pt.initMinIOSecretAccessKey() - pt.initMinIOUseSSL() - pt.initMinioBucketName() + pt.Init() }) } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index ccbcd4b3b0..e2ae1373be 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -112,7 +112,6 @@ func (i *IndexNode) initKnowhere() { } func (i *IndexNode) Init() error { - Params.Init() i.UpdateStateCode(internalpb.StateCode_Initializing) log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing)) connectEtcdFn := func() error { diff --git a/internal/indexnode/param_table.go b/internal/indexnode/param_table.go index 6c00eada44..bceae0410d 100644 --- a/internal/indexnode/param_table.go +++ b/internal/indexnode/param_table.go @@ -60,14 +60,25 @@ func (pt *ParamTable) InitAlias(alias string) { } func (pt *ParamTable) Init() { + pt.BaseTable.Init() + if err := pt.LoadYaml("advanced/knowhere.yaml"); err != nil { + panic(err) + } + + // TODO, load index_node.yaml + /*err := pt.LoadYaml("advanced/index_node.yaml") + if err != nil { + panic(err) + }*/ + + pt.initLogCfg() + pt.initParams() + pt.initKnowhereSimdType() +} + +func (pt *ParamTable) InitOnce() { once.Do(func() { - pt.BaseTable.Init() - if err := pt.LoadYaml("advanced/knowhere.yaml"); err != nil { - panic(err) - } - pt.initLogCfg() - pt.initParams() - pt.initKnowhereSimdType() + pt.Init() }) } diff --git a/internal/proxy/param_table.go b/internal/proxy/param_table.go index 53de96839f..3d21fba3ca 100644 --- a/internal/proxy/param_table.go +++ b/internal/proxy/param_table.go @@ -47,19 +47,24 @@ type ParamTable struct { RocksmqPath string // not used in Proxy - ProxyID UniqueID - TimeTickInterval time.Duration + ProxyID UniqueID + TimeTickInterval time.Duration + MsgStreamTimeTickBufSize int64 + MaxNameLength int64 + MaxFieldNum int64 + MaxShardNum int32 + MaxDimension int64 + DefaultPartitionName string + DefaultIndexName string + + // --- Channels --- + ClusterChannelPrefix string + ProxyTimeTickChannelNames []string + ProxySubName string + + // required from query coord SearchResultChannelNames []string RetrieveResultChannelNames []string - ProxySubName string - ProxyTimeTickChannelNames []string - MsgStreamTimeTickBufSize int64 - MaxNameLength int64 - MaxFieldNum int64 - MaxShardNum int32 - MaxDimension int64 - DefaultPartitionName string - DefaultIndexName string MaxTaskNum int64 @@ -71,24 +76,27 @@ type ParamTable struct { var Params ParamTable var once sync.Once -func (pt *ParamTable) Init() { +func (pt *ParamTable) InitOnce() { once.Do(func() { - pt.BaseTable.Init() - err := pt.LoadYaml("advanced/proxy.yaml") - if err != nil { - panic(err) - } - pt.initParams() + pt.Init() }) } -func (pt *ParamTable) initParams() { +func (pt *ParamTable) Init() { + pt.BaseTable.Init() + err := pt.LoadYaml("advanced/proxy.yaml") + if err != nil { + panic(err) + } + pt.initLogCfg() pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initPulsarAddress() pt.initRocksmqPath() pt.initTimeTickInterval() + // Has to init global msgchannel prefix before other channel names + pt.initClusterMsgChannelPrefix() pt.initProxySubName() pt.initProxyTimeTickChannelNames() pt.initMsgStreamTimeTickBufSize() @@ -137,20 +145,30 @@ func (pt *ParamTable) initTimeTickInterval() { pt.TimeTickInterval = time.Duration(interval) * time.Millisecond } -func (pt *ParamTable) initProxySubName() { - prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix") +func (pt *ParamTable) initClusterMsgChannelPrefix() { + config, err := pt.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - pt.ProxySubName = prefix + "-" + strconv.FormatInt(pt.ProxyID, 10) + pt.ClusterChannelPrefix = config +} + +func (pt *ParamTable) initProxySubName() { + config, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix") + if err != nil { + panic(err) + } + s := []string{pt.ClusterChannelPrefix, config, strconv.FormatInt(pt.ProxyID, 10)} + pt.ProxySubName = strings.Join(s, "-") } func (pt *ParamTable) initProxyTimeTickChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick") + config, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick") if err != nil { panic(err) } - prefix += "-0" + s := []string{pt.ClusterChannelPrefix, config, "0"} + prefix := strings.Join(s, "-") pt.ProxyTimeTickChannelNames = []string{prefix} } diff --git a/internal/proxy/param_table_test.go b/internal/proxy/param_table_test.go index d0b4e5d884..864283ca6c 100644 --- a/internal/proxy/param_table_test.go +++ b/internal/proxy/param_table_test.go @@ -13,6 +13,8 @@ package proxy import ( "testing" + + "github.com/stretchr/testify/assert" ) func TestParamTable_Normal(t *testing.T) { @@ -39,10 +41,12 @@ func TestParamTable_Normal(t *testing.T) { }) t.Run("ProxySubName", func(t *testing.T) { + assert.Equal(t, Params.ProxySubName, "by-dev-proxy-0") t.Logf("ProxySubName: %s", Params.ProxySubName) }) t.Run("ProxyTimeTickChannelNames", func(t *testing.T) { + assert.Equal(t, Params.ProxyTimeTickChannelNames, []string{"by-dev-proxyTimeTick-0"}) t.Logf("ProxyTimeTickChannelNames: %v", Params.ProxyTimeTickChannelNames) }) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 1b9b87effc..e7c87b7bd9 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -151,6 +151,8 @@ func (node *Proxy) Init() error { } log.Debug("Proxy CreateQueryChannel success") + // TODO SearchResultChannelNames and RetrieveResultChannelNames should not be part in the Param table + // we should maintain a separate map for search result Params.SearchResultChannelNames = []string{resp.ResultChannel} Params.RetrieveResultChannelNames = []string{resp.ResultChannel} log.Debug("Proxy CreateQueryChannel success", zap.Any("SearchResultChannelNames", Params.SearchResultChannelNames)) diff --git a/internal/querycoord/param_table.go b/internal/querycoord/param_table.go index bbc3b2d15f..3b937ca4ff 100644 --- a/internal/querycoord/param_table.go +++ b/internal/querycoord/param_table.go @@ -43,7 +43,8 @@ type ParamTable struct { Log log.Config RoleName string - // search + // channels + ClusterChannelPrefix string SearchChannelPrefix string SearchResultChannelPrefix string @@ -63,41 +64,47 @@ type ParamTable struct { var Params ParamTable var once sync.Once -func (p *ParamTable) Init() { +func (p *ParamTable) InitOnce() { once.Do(func() { - p.BaseTable.Init() - err := p.LoadYaml("advanced/query_node.yaml") - if err != nil { - panic(err) - } - - err = p.LoadYaml("milvus.yaml") - if err != nil { - panic(err) - } - - p.initLogCfg() - - p.initStatsChannelName() - p.initTimeTickChannelName() - p.initQueryCoordAddress() - p.initRoleName() - p.initSearchChannelPrefix() - p.initSearchResultChannelPrefix() - - // --- ETCD --- - p.initEtcdEndpoints() - p.initMetaRootPath() - p.initKvRootPath() - - //--- Minio ---- - p.initMinioEndPoint() - p.initMinioAccessKeyID() - p.initMinioSecretAccessKey() - p.initMinioUseSSLStr() - p.initMinioBucketName() + p.Init() }) } +func (p *ParamTable) Init() { + p.BaseTable.Init() + err := p.LoadYaml("advanced/query_node.yaml") + if err != nil { + panic(err) + } + + err = p.LoadYaml("milvus.yaml") + if err != nil { + panic(err) + } + + p.initLogCfg() + + p.initQueryCoordAddress() + p.initRoleName() + + // --- Channels --- + p.initClusterMsgChannelPrefix() + p.initSearchChannelPrefix() + p.initSearchResultChannelPrefix() + p.initStatsChannelName() + p.initTimeTickChannelName() + + // --- ETCD --- + p.initEtcdEndpoints() + p.initMetaRootPath() + p.initKvRootPath() + + //--- Minio ---- + p.initMinioEndPoint() + p.initMinioAccessKeyID() + p.initMinioSecretAccessKey() + p.initMinioUseSSLStr() + p.initMinioBucketName() +} func (p *ParamTable) initLogCfg() { p.Log = log.Config{} @@ -125,23 +132,6 @@ func (p *ParamTable) initLogCfg() { } } -func (p *ParamTable) initStatsChannelName() { - channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") - if err != nil { - panic(err) - } - p.StatsChannelName = channels -} - -func (p *ParamTable) initTimeTickChannelName() { - timeTickChannelName, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") - if err != nil { - panic(err) - } - p.TimeTickChannelName = timeTickChannelName - -} - func (p *ParamTable) initQueryCoordAddress() { url, err := p.Load("_QueryCoordAddress") if err != nil { @@ -154,22 +144,49 @@ func (p *ParamTable) initRoleName() { p.RoleName = fmt.Sprintf("%s-%d", "QueryCoord", p.NodeID) } +func (p *ParamTable) initClusterMsgChannelPrefix() { + config, err := p.Load("msgChannel.chanNamePrefix.cluster") + if err != nil { + panic(err) + } + p.ClusterChannelPrefix = config +} + func (p *ParamTable) initSearchChannelPrefix() { - channelName, err := p.Load("msgChannel.chanNamePrefix.search") + config, err := p.Load("msgChannel.chanNamePrefix.search") if err != nil { log.Error(err.Error()) } - p.SearchChannelPrefix = channelName + s := []string{p.ClusterChannelPrefix, config} + p.SearchChannelPrefix = strings.Join(s, "-") } func (p *ParamTable) initSearchResultChannelPrefix() { - channelName, err := p.Load("msgChannel.chanNamePrefix.searchResult") + config, err := p.Load("msgChannel.chanNamePrefix.searchResult") if err != nil { log.Error(err.Error()) } + s := []string{p.ClusterChannelPrefix, config} + p.SearchResultChannelPrefix = strings.Join(s, "-") +} - p.SearchResultChannelPrefix = channelName +func (p *ParamTable) initStatsChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.StatsChannelName = strings.Join(s, "-") +} + +func (p *ParamTable) initTimeTickChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.TimeTickChannelName = strings.Join(s, "-") } func (p *ParamTable) initEtcdEndpoints() { diff --git a/internal/querycoord/param_table_test.go b/internal/querycoord/param_table_test.go new file mode 100644 index 0000000000..f395dcf6b4 --- /dev/null +++ b/internal/querycoord/param_table_test.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querycoord + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +//TODO add more test for other parameters +func TestParamTable(t *testing.T) { + Params.Init() + + assert.Equal(t, Params.SearchChannelPrefix, "by-dev-search") + t.Logf("query coord search channel = %s", Params.SearchChannelPrefix) + + assert.Equal(t, Params.SearchResultChannelPrefix, "by-dev-searchResult") + t.Logf("query coord search result channel = %s", Params.SearchResultChannelPrefix) + + assert.Equal(t, Params.StatsChannelName, "by-dev-query-node-stats") + t.Logf("query coord stats channel = %s", Params.StatsChannelName) + + assert.Equal(t, Params.TimeTickChannelName, "by-dev-queryTimeTick") + t.Logf("query coord time tick channel = %s", Params.TimeTickChannelName) +} diff --git a/internal/querynode/metrics_info.go b/internal/querynode/metrics_info.go index e82c374a2b..5c678b3c0e 100644 --- a/internal/querynode/metrics_info.go +++ b/internal/querynode/metrics_info.go @@ -46,7 +46,7 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, SearchPulsarBufSize: Params.SearchPulsarBufSize, SearchResultReceiveBufSize: Params.SearchResultReceiveBufSize, RetrieveReceiveBufSize: Params.RetrieveReceiveBufSize, - RetrievePulsarBufSize: Params.retrievePulsarBufSize, + RetrievePulsarBufSize: Params.RetrievePulsarBufSize, RetrieveResultReceiveBufSize: Params.RetrieveResultReceiveBufSize, }, } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 5944e3f3b8..68037e6871 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -30,11 +30,16 @@ type ParamTable struct { EtcdEndpoints []string MetaRootPath string - Alias string - QueryNodeIP string - QueryNodePort int64 - QueryNodeID UniqueID + Alias string + QueryNodeIP string + QueryNodePort int64 + QueryNodeID UniqueID + + // channel prefix + ClusterChannelPrefix string QueryTimeTickChannelName string + StatsChannelName string + MsgChannelSubName string FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 @@ -57,16 +62,14 @@ type ParamTable struct { RetrieveChannelNames []string RetrieveResultChannelNames []string RetrieveReceiveBufSize int64 - retrievePulsarBufSize int64 + RetrievePulsarBufSize int64 RetrieveResultReceiveBufSize int64 // stats StatsPublishInterval int - StatsChannelName string - GracefulTime int64 - MsgChannelSubName string - SliceIndex int + GracefulTime int64 + SliceIndex int // segcore ChunkRows int64 @@ -82,55 +85,53 @@ func (p *ParamTable) InitAlias(alias string) { p.Alias = alias } -func (p *ParamTable) Init() { +func (p *ParamTable) InitOnce() { once.Do(func() { - p.BaseTable.Init() - if err := p.LoadYaml("advanced/query_node.yaml"); err != nil { - panic(err) - } - if err := p.LoadYaml("advanced/knowhere.yaml"); err != nil { - panic(err) - } - - //p.initQueryTimeTickChannelName() - - p.initMinioEndPoint() - p.initMinioAccessKeyID() - p.initMinioSecretAccessKey() - p.initMinioUseSSLStr() - p.initMinioBucketName() - - p.initPulsarAddress() - p.initRocksmqPath() - p.initEtcdEndpoints() - p.initMetaRootPath() - - p.initGracefulTime() - - p.initFlowGraphMaxQueueLength() - p.initFlowGraphMaxParallelism() - - p.initSearchReceiveBufSize() - p.initSearchPulsarBufSize() - p.initSearchResultReceiveBufSize() - - p.initStatsPublishInterval() - p.initStatsChannelName() - - p.initSegcoreChunkRows() - p.initKnowhereSimdType() - - p.initLogCfg() + p.Init() }) } -// ---------------------------------------------------------- query node -func (p *ParamTable) initQueryTimeTickChannelName() { - ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") - if err != nil { - log.Warn(err.Error()) +func (p *ParamTable) Init() { + p.BaseTable.Init() + if err := p.LoadYaml("advanced/query_node.yaml"); err != nil { + panic(err) } - p.QueryTimeTickChannelName = ch + if err := p.LoadYaml("advanced/knowhere.yaml"); err != nil { + panic(err) + } + + p.initMinioEndPoint() + p.initMinioAccessKeyID() + p.initMinioSecretAccessKey() + p.initMinioUseSSLStr() + p.initMinioBucketName() + + p.initPulsarAddress() + p.initRocksmqPath() + p.initEtcdEndpoints() + p.initMetaRootPath() + + p.initGracefulTime() + + p.initFlowGraphMaxQueueLength() + p.initFlowGraphMaxParallelism() + + p.initSearchReceiveBufSize() + p.initSearchPulsarBufSize() + p.initSearchResultReceiveBufSize() + + // Has to init global msgchannel prefix before other channel names + p.initClusterMsgChannelPrefix() + p.initQueryTimeTickChannelName() + p.initStatsChannelName() + p.initMsgChannelSubName() + + p.initStatsPublishInterval() + + p.initSegcoreChunkRows() + p.initKnowhereSimdType() + + p.initLogCfg() } // ---------------------------------------------------------- minio @@ -222,6 +223,44 @@ func (p *ParamTable) initSearchResultReceiveBufSize() { p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize") } +// ------------------------ channel names +func (p *ParamTable) initClusterMsgChannelPrefix() { + name, err := p.Load("msgChannel.chanNamePrefix.cluster") + if err != nil { + panic(err) + } + p.ClusterChannelPrefix = name +} + +func (p *ParamTable) initQueryTimeTickChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") + if err != nil { + log.Warn(err.Error()) + } + s := []string{p.ClusterChannelPrefix, config} + p.QueryTimeTickChannelName = strings.Join(s, "-") +} + +func (p *ParamTable) initMsgChannelSubName() { + namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") + if err != nil { + log.Warn(err.Error()) + } + + s := []string{p.ClusterChannelPrefix, namePrefix, strconv.FormatInt(p.QueryNodeID, 10)} + p.MsgChannelSubName = strings.Join(s, "-") +} + +func (p *ParamTable) initStatsChannelName() { + config, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.StatsChannelName = strings.Join(s, "-") +} + +// ETCD configs func (p *ParamTable) initEtcdEndpoints() { endpoints, err := p.Load("_EtcdEndpoints") if err != nil { @@ -246,23 +285,6 @@ func (p *ParamTable) initGracefulTime() { p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") } -func (p *ParamTable) initMsgChannelSubName() { - namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") - if err != nil { - log.Warn(err.Error()) - } - subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10) - p.MsgChannelSubName = subName -} - -func (p *ParamTable) initStatsChannelName() { - channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") - if err != nil { - panic(err) - } - p.StatsChannelName = channels -} - func (p *ParamTable) initSegcoreChunkRows() { p.ChunkRows = p.ParseInt64("queryNode.segcore.chunkRows") } diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 16dc54836d..ebe49d04fe 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -26,13 +26,6 @@ func TestParamTable_PulsarAddress(t *testing.T) { assert.Equal(t, "6650", split[len(split)-1]) } -func TestParamTable_QueryNode(t *testing.T) { - t.Run("Test time tick channel", func(t *testing.T) { - ch := Params.QueryTimeTickChannelName - assert.Equal(t, ch, "queryTimeTick") - }) -} - func TestParamTable_minio(t *testing.T) { t.Run("Test endPoint", func(t *testing.T) { endPoint := Params.MinioEndPoint @@ -87,16 +80,22 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { } func TestParamTable_msgChannelSubName(t *testing.T) { + Params.QueryNodeID = 3 Params.initMsgChannelSubName() name := Params.MsgChannelSubName - expectName := "queryNode-0" - assert.Equal(t, expectName, name) + assert.Equal(t, name, "by-dev-queryNode-3") } func TestParamTable_statsChannelName(t *testing.T) { + Params.Init() name := Params.StatsChannelName - contains := strings.Contains(name, "query-node-stats") - assert.Equal(t, contains, true) + assert.Equal(t, name, "by-dev-query-node-stats") +} + +func TestParamTable_QueryTimeTickChannel(t *testing.T) { + Params.Init() + ch := Params.QueryTimeTickChannelName + assert.Equal(t, ch, "by-dev-queryTimeTick") } func TestParamTable_metaRootPath(t *testing.T) { diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 5cd1a1ee75..6b23dae4e2 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -38,9 +38,6 @@ type queryCoordMock struct { func setup() { os.Setenv("QUERY_NODE_ID", "1") Params.Init() - //Params.QueryNodeID = 1 - Params.initQueryTimeTickChannelName() - Params.initStatsChannelName() Params.MetaRootPath = "/etcd/test/root/querynode" } @@ -209,11 +206,6 @@ func makeNewChannelNames(names []string, suffix string) []string { return ret } -func refreshChannelNames() { - suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10) - Params.StatsChannelName = Params.StatsChannelName + suffix -} - func newMessageStreamFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 @@ -229,7 +221,7 @@ func newMessageStreamFactory() (msgstream.Factory, error) { func TestMain(m *testing.M) { setup() - refreshChannelNames() + Params.StatsChannelName = Params.StatsChannelName + strconv.Itoa(rand.Int()) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 3ba696b8b1..3bfa607da0 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -30,14 +30,16 @@ type ParamTable struct { Address string Port int - PulsarAddress string - EtcdEndpoints []string - MetaRootPath string - KvRootPath string - MsgChannelSubName string - TimeTickChannel string - StatisticsChannel string - DmlChannelName string + PulsarAddress string + EtcdEndpoints []string + MetaRootPath string + KvRootPath string + + ClusterChannelPrefix string + MsgChannelSubName string + TimeTickChannel string + StatisticsChannel string + DmlChannelName string DmlChannelNum int64 MaxPartitionNum int64 @@ -56,39 +58,45 @@ type ParamTable struct { RoleName string } -func (p *ParamTable) Init() { +func (p *ParamTable) InitOnce() { once.Do(func() { - // load yaml - p.BaseTable.Init() - err := p.LoadYaml("advanced/root_coord.yaml") - if err != nil { - panic(err) - } - - p.initPulsarAddress() - p.initEtcdEndpoints() - p.initMetaRootPath() - p.initKvRootPath() - - p.initMsgChannelSubName() - p.initTimeTickChannel() - p.initStatisticsChannelName() - p.initDmlChannelName() - - p.initDmlChannelNum() - p.initMaxPartitionNum() - p.initMinSegmentSizeToEnableIndex() - p.initDefaultPartitionName() - p.initDefaultIndexName() - - p.initTimeout() - p.initTimeTickInterval() - - p.initLogCfg() - p.initRoleName() + p.Init() }) } +func (p *ParamTable) Init() { + // load yaml + p.BaseTable.Init() + err := p.LoadYaml("advanced/root_coord.yaml") + if err != nil { + panic(err) + } + + p.initPulsarAddress() + p.initEtcdEndpoints() + p.initMetaRootPath() + p.initKvRootPath() + + // Has to init global msgchannel prefix before other channel names + p.initClusterMsgChannelPrefix() + p.initMsgChannelSubName() + p.initTimeTickChannel() + p.initStatisticsChannelName() + p.initDmlChannelName() + + p.initDmlChannelNum() + p.initMaxPartitionNum() + p.initMinSegmentSizeToEnableIndex() + p.initDefaultPartitionName() + p.initDefaultIndexName() + + p.initTimeout() + p.initTimeTickInterval() + + p.initLogCfg() + p.initRoleName() +} + func (p *ParamTable) initPulsarAddress() { addr, err := p.Load("_PulsarAddress") if err != nil { @@ -129,36 +137,48 @@ func (p *ParamTable) initKvRootPath() { p.KvRootPath = rootPath + "/" + subPath } -func (p *ParamTable) initMsgChannelSubName() { - name, err := p.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix") +func (p *ParamTable) initClusterMsgChannelPrefix() { + config, err := p.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - p.MsgChannelSubName = name + p.ClusterChannelPrefix = config +} + +func (p *ParamTable) initMsgChannelSubName() { + config, err := p.Load("msgChannel.subNamePrefix.rootCoordSubNamePrefix") + if err != nil { + panic(err) + } + s := []string{p.ClusterChannelPrefix, config} + p.MsgChannelSubName = strings.Join(s, "-") } func (p *ParamTable) initTimeTickChannel() { - channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordTimeTick") + config, err := p.Load("msgChannel.chanNamePrefix.rootCoordTimeTick") if err != nil { panic(err) } - p.TimeTickChannel = channel + s := []string{p.ClusterChannelPrefix, config} + p.TimeTickChannel = strings.Join(s, "-") } func (p *ParamTable) initStatisticsChannelName() { - channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordStatistics") + config, err := p.Load("msgChannel.chanNamePrefix.rootCoordStatistics") if err != nil { panic(err) } - p.StatisticsChannel = channel + s := []string{p.ClusterChannelPrefix, config} + p.StatisticsChannel = strings.Join(s, "-") } func (p *ParamTable) initDmlChannelName() { - channel, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml") + config, err := p.Load("msgChannel.chanNamePrefix.rootCoordDml") if err != nil { panic(err) } - p.DmlChannelName = channel + s := []string{p.ClusterChannelPrefix, config} + p.DmlChannelName = strings.Join(s, "-") } func (p *ParamTable) initDmlChannelNum() { diff --git a/internal/rootcoord/param_table_test.go b/internal/rootcoord/param_table_test.go index a0eb2b9998..2f8a8805f3 100644 --- a/internal/rootcoord/param_table_test.go +++ b/internal/rootcoord/param_table_test.go @@ -33,15 +33,18 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.KvRootPath, "") t.Logf("kv root path = %s", Params.KvRootPath) - assert.NotEqual(t, Params.MsgChannelSubName, "") + assert.Equal(t, Params.MsgChannelSubName, "by-dev-rootCoord") t.Logf("msg channel sub name = %s", Params.MsgChannelSubName) - assert.NotEqual(t, Params.TimeTickChannel, "") + assert.Equal(t, Params.TimeTickChannel, "by-dev-rootcoord-timetick") t.Logf("master time tick channel = %s", Params.TimeTickChannel) - assert.NotEqual(t, Params.StatisticsChannel, "") + assert.Equal(t, Params.StatisticsChannel, "by-dev-rootcoord-statistics") t.Logf("master statistics channel = %s", Params.StatisticsChannel) + assert.Equal(t, Params.DmlChannelName, "by-dev-rootcoord-dml") + t.Logf("dml channel = %s", Params.DmlChannelName) + assert.NotEqual(t, Params.MaxPartitionNum, 0) t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum)