diff --git a/cmd/main.go b/cmd/main.go index 832ff594bd..d2e5deda13 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -244,7 +244,7 @@ func main() { panic(err) } defer removePidFile(fd) - role.Run(localMsg) + role.Run(localMsg, svrAlias) case "stop": if err := stopPid(filename, runtimeDir); err != nil { panic(err) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 21e4b75eb8..28b6f1975a 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -102,12 +102,13 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone return rc } -func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *components.Proxy { +func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string) *components.Proxy { var pn *components.Proxy var wg sync.WaitGroup wg.Add(1) go func() { + proxynode.Params.InitAlias(alias) proxynode.Params.Init() if !localMsg { @@ -158,12 +159,13 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon return qs } -func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *components.QueryNode { +func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias string) *components.QueryNode { var qn *components.QueryNode var wg sync.WaitGroup wg.Add(1) go func() { + querynode.Params.InitAlias(alias) querynode.Params.Init() if !localMsg { @@ -214,12 +216,13 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone return ds } -func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *components.DataNode { +func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias string) *components.DataNode { var dn *components.DataNode var wg sync.WaitGroup wg.Add(1) go func() { + datanode.Params.InitAlias(alias) datanode.Params.Init() if !localMsg { @@ -269,12 +272,13 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon return is } -func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool) *components.IndexNode { +func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias string) *components.IndexNode { var in *components.IndexNode var wg sync.WaitGroup wg.Add(1) go func() { + indexnode.Params.InitAlias(alias) indexnode.Params.Init() if !localMsg { @@ -316,7 +320,7 @@ func (mr *MilvusRoles) runMsgStreamCoord(ctx context.Context) *components.MsgStr return mss } -func (mr *MilvusRoles) Run(localMsg bool) { +func (mr *MilvusRoles) Run(localMsg bool, alias string) { if os.Getenv("DEPLOY_MODE") == "STANDALONE" { closer := trace.InitTracing("standalone") if closer != nil { @@ -344,7 +348,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { var pn *components.Proxy if mr.EnableProxy { - pn = mr.runProxy(ctx, localMsg) + pn = mr.runProxy(ctx, localMsg, alias) if pn != nil { defer pn.Stop() } @@ -360,7 +364,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { var qn *components.QueryNode if mr.EnableQueryNode { - qn = mr.runQueryNode(ctx, localMsg) + qn = mr.runQueryNode(ctx, localMsg, alias) if qn != nil { defer qn.Stop() } @@ -376,7 +380,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { var dn *components.DataNode if mr.EnableDataNode { - dn = mr.runDataNode(ctx, localMsg) + dn = mr.runDataNode(ctx, localMsg, alias) if dn != nil { defer dn.Stop() } @@ -392,7 +396,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { var in *components.IndexNode if mr.EnableIndexNode { - in = mr.runIndexNode(ctx, localMsg) + in = mr.runIndexNode(ctx, localMsg, alias) if in != nil { defer in.Stop() } diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index 3e5d137b7d..ae52394aa2 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -12,14 +12,11 @@ msgChannel: # channel name generation rule: ${namePrefix}-${ChannelIdx} chanNamePrefix: - dataDefinition: "data-definition" masterTimeTick: "master-timetick" masterStatistics: "master-statistics" search: "search" searchResult: "searchResult" - k2s: "k2s" proxyTimeTick: "proxyTimeTick" - proxyServiceTimeTick: "proxyServiceTimeTick" queryTimeTick: "queryTimeTick" queryNodeStats: "query-node-stats" # cmd for loadIndex, flush, etc... @@ -36,7 +33,3 @@ msgChannel: queryNodeSubNamePrefix: "queryNode" dataNodeSubNamePrefix: "dataNode" dataServiceSubNamePrefix: "dataService" - - # default channel range [0, 1) - channelRange: - k2s: [0, 1] diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml index 992ad79219..b3b602a20f 100644 --- a/configs/advanced/data_service.yaml +++ b/configs/advanced/data_service.yaml @@ -10,10 +10,7 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. dataservice: - nodeID: 14040 segment: size: 512 # MB sizeFactor: 0.75 IDAssignExpiration: 2000 # ms - insertChannelNum: 2 - dataNodeNum: 1 diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 9c9b955981..af13594134 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -12,7 +12,6 @@ master: maxPartitionNum: 4096 minSegmentSizeToEnableIndex: 1024 - nodeID: 100 timeout: 3600 # time out, 5 seconds timeTickInterval: 200 # ms diff --git a/configs/advanced/query_service.yaml b/configs/advanced/query_service.yaml deleted file mode 100644 index 5c94293a1f..0000000000 --- a/configs/advanced/query_service.yaml +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (C) 2019-2020 Zilliz. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing permissions and limitations under the License. - -queryService: - nodeID: 200 diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 2e5becdf0d..7f5ae838b7 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -20,10 +20,6 @@ etcd: collectionBinlogSubPath: dataservice/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath flushStreamPosSubPath: dataservice/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath - segmentDmlPosSubPath: dataservice/segmentdml # Full path = rootPath/metaSubPath/segmentDmlPosSubPath - segmentDdlPosSubPath: dataservice/segmentddl # Full path = rootPath/metaSubPath/segmentDdlPosSubPath - dmlChanPosSubPath: dataservice/dmlchannel # Full Path = root/metaSubPath/dmlChanPosSubPath - ddlChanPosSubPath: dataservice/ddlchannel # Full Path = root/metaSubPath/ddlChanPosSubPath minio: address: localhost @@ -49,10 +45,6 @@ queryService: address: localhost port: 19531 -proxyService: - address: localhost - port: 21122 - queryNode: gracefulTime: 1000 # ms, for search port: 21123 @@ -72,11 +64,10 @@ dataNode: port: 21124 log: - level: debug # info, warn, error, panic, fatal, dpanic, + level: debug # info, warn, error, panic, fatal file: rootPath: "" # default to stdout, stderr maxSize: 300 # MB maxAge: 10 # day maxBackups: 20 - dev: true # false, change behaviour of dpaniclevel format: text # text/json diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 15c734775b..5408f79e17 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -30,6 +30,7 @@ import ( ) func TestMain(t *testing.M) { + Params.InitAlias("datanode-alias-1") Params.Init() refreshChannelNames() code := t.Run() diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index d81912c1be..21101996c2 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -34,6 +34,7 @@ type ParamTable struct { InsertBinlogRootPath string StatsBinlogRootPath string Log log.Config + Alias string // Different datanode in one machine // === DataNode External Components Configs === // --- Pulsar --- @@ -63,6 +64,10 @@ type ParamTable struct { var Params ParamTable var once sync.Once +func (p *ParamTable) InitAlias(alias string) { + p.Alias = alias +} + func (p *ParamTable) Init() { once.Do(func() { p.BaseTable.Init() @@ -243,15 +248,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.Log.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.Log.Development = dev p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") @@ -260,7 +256,7 @@ func (p *ParamTable) initLogCfg() { panic(err) } if len(rootPath) != 0 { - p.Log.File.Filename = path.Join(rootPath, "datanode-"+strconv.FormatInt(p.NodeID, 10)+".log") + p.Log.File.Filename = path.Join(rootPath, "datanode"+p.Alias+".log") } else { p.Log.File.Filename = "" } diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index 128181888b..26bd2cd45c 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -18,13 +18,17 @@ import ( func TestParamTable_DataNode(t *testing.T) { - Params.Init() - t.Run("Test NodeID", func(t *testing.T) { id := Params.NodeID log.Println("NodeID:", id) }) + t.Run("Test Alias", func(t *testing.T) { + alias := Params.Alias + log.Println("Alias:", alias) + + }) + t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) { length := Params.FlowGraphMaxQueueLength log.Println("flowGraphMaxQueueLength:", length) diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 25408a3923..06d6cc8170 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -34,9 +34,6 @@ type ParamTable struct { KvRootPath string SegmentBinlogSubPath string CollectionBinlogSubPath string - SegmentDmlPosSubPath string - SegmentDdlPosSubPath string - DmlChannelPosSubPath string // --- Pulsar --- PulsarAddress string @@ -50,14 +47,10 @@ type ParamTable struct { SegIDAssignExpiration int64 InsertChannelPrefixName string - InsertChannelNum int64 StatisticsChannelName string TimeTickChannelName string - DataNodeNum int SegmentInfoChannelName string DataServiceSubscriptionName string - K2SChannelNames []string - ProxyTimeTickChannelName string Log log.Config } @@ -75,8 +68,6 @@ func (p *ParamTable) Init() { } // set members - p.initNodeID() - p.initEtcdEndpoints() p.initMetaRootPath() p.initKvRootPath() @@ -89,27 +80,17 @@ func (p *ParamTable) Init() { p.initSegmentSizeFactor() p.initSegIDAssignExpiration() p.initInsertChannelPrefixName() - p.initInsertChannelNum() p.initStatisticsChannelName() p.initTimeTickChannelName() - p.initDataNodeNum() p.initSegmentInfoChannelName() p.initDataServiceSubscriptionName() - p.initK2SChannelNames() p.initLogCfg() - p.initProxyServiceTimeTickChannelName() p.initFlushStreamPosSubPath() p.initStatsStreamPosSubPath() - p.initSegmentDmlPosSubPath() - p.initDmlChannelPosSubPath() }) } -func (p *ParamTable) initNodeID() { - p.NodeID = p.ParseInt64("dataservice.nodeID") -} - func (p *ParamTable) initEtcdEndpoints() { endpoints, err := p.Load("_EtcdEndpoints") if err != nil { @@ -186,10 +167,6 @@ func (p *ParamTable) initInsertChannelPrefixName() { } } -func (p *ParamTable) initInsertChannelNum() { - p.InsertChannelNum = p.ParseInt64("dataservice.insertChannelNum") -} - func (p *ParamTable) initStatisticsChannelName() { var err error p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic") @@ -206,10 +183,6 @@ func (p *ParamTable) initTimeTickChannelName() { } } -func (p *ParamTable) initDataNodeNum() { - p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum") -} - func (p *ParamTable) initSegmentInfoChannelName() { var err error p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") @@ -226,24 +199,6 @@ func (p *ParamTable) initDataServiceSubscriptionName() { } } -func (p *ParamTable) initK2SChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.k2s") - if err != nil { - panic(err) - } - prefix += "-" - iRangeStr, err := p.Load("msgChannel.channelRange.k2s") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.K2SChannelNames = ret -} - func (p *ParamTable) initLogCfg() { p.Log = log.Config{} format, err := p.Load("log.format") @@ -256,15 +211,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.Log.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.Log.Development = dev p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") @@ -279,14 +225,6 @@ func (p *ParamTable) initLogCfg() { } } -func (p *ParamTable) initProxyServiceTimeTickChannelName() { - ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick") - if err != nil { - panic(err) - } - p.ProxyTimeTickChannelName = ch -} - func (p *ParamTable) initFlushStreamPosSubPath() { subPath, err := p.Load("etcd.flushStreamPosSubPath") if err != nil { @@ -302,19 +240,3 @@ func (p *ParamTable) initStatsStreamPosSubPath() { } p.StatsStreamPosSubPath = subPath } - -func (p *ParamTable) initSegmentDmlPosSubPath() { - subPath, err := p.Load("etcd.segmentDmlPosSubPath") - if err != nil { - panic(err) - } - p.SegmentDmlPosSubPath = subPath -} - -func (p *ParamTable) initDmlChannelPosSubPath() { - subPath, err := p.Load("etcd.dmlChanPosSubPath") - if err != nil { - panic(err) - } - p.DmlChannelPosSubPath = subPath -} diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index d5fbb58d54..32a462e02a 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -405,37 +405,6 @@ func TestChannel(t *testing.T) { assert.Nil(t, err) time.Sleep(time.Second) }) - - t.Run("Test ProxyTimeTickChannel", func(t *testing.T) { - genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.TimeTickMsg { - return &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: msgType, - MsgID: 0, - Timestamp: t, - SourceID: 0, - }, - }, - } - } - - timeTickStream, _ := svr.msFactory.NewMsgStream(svr.ctx) - timeTickStream.AsProducer([]string{Params.ProxyTimeTickChannelName}) - timeTickStream.Start() - defer timeTickStream.Close() - - msgPack := msgstream.MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 123)) - msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234)) - msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 345)) - err := timeTickStream.Produce(&msgPack) - assert.Nil(t, err) - time.Sleep(time.Second) - }) } func TestSaveBinlogPaths(t *testing.T) { diff --git a/internal/indexnode/paramtable.go b/internal/indexnode/paramtable.go index c50ef8e7ad..26d6c9ae66 100644 --- a/internal/indexnode/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -40,6 +40,7 @@ type ParamTable struct { Port int NodeID int64 + Alias string MasterAddress string @@ -58,6 +59,10 @@ type ParamTable struct { var Params ParamTable var once sync.Once +func (pt *ParamTable) InitAlias(alias string) { + pt.Alias = alias +} + func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() @@ -200,15 +205,6 @@ func (pt *ParamTable) initLogCfg() { panic(err) } pt.Log.Level = level - devStr, err := pt.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - pt.Log.Development = dev pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") @@ -217,7 +213,7 @@ func (pt *ParamTable) initLogCfg() { panic(err) } if len(rootPath) != 0 { - pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%d.log", pt.NodeID)) + pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%s.log", pt.Alias)) } else { pt.Log.File.Filename = "" } diff --git a/internal/indexservice/paramtable.go b/internal/indexservice/paramtable.go index 279ac92afa..fa9655da9d 100644 --- a/internal/indexservice/paramtable.go +++ b/internal/indexservice/paramtable.go @@ -156,15 +156,6 @@ func (pt *ParamTable) initLogCfg() { panic(err) } pt.Log.Level = level - devStr, err := pt.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - pt.Log.Development = dev pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go index 141cfe0736..15f4127b1b 100644 --- a/internal/msgstream/msg.go +++ b/internal/msgstream/msg.go @@ -585,16 +585,6 @@ func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) { return segStatsMsg, nil } -///////////////////////////////////////////Key2Seg////////////////////////////////////////// -//type Key2SegMsg struct { -// BaseMsg -// internalpb.Key2SegMsg -//} -// -//func (k2st *Key2SegMsg) Type() MsgType { -// return -//} - /////////////////////////////////////////CreateCollection////////////////////////////////////////// type CreateCollectionMsg struct { BaseMsg diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 534915c8de..42fd820008 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -35,6 +35,7 @@ type ParamTable struct { NetworkPort int IP string NetworkAddress string + Alias string EtcdEndpoints []string MetaRootPath string @@ -43,7 +44,6 @@ type ParamTable struct { ProxyID UniqueID TimeTickInterval time.Duration - K2SChannelNames []string SearchResultChannelNames []string RetrieveResultChannelNames []string ProxySubName string @@ -80,7 +80,6 @@ func (pt *ParamTable) initParams() { pt.initMetaRootPath() pt.initPulsarAddress() pt.initTimeTickInterval() - pt.initK2SChannelNames() pt.initProxySubName() pt.initProxyTimeTickChannelNames() pt.initMsgStreamTimeTickBufSize() @@ -94,6 +93,10 @@ func (pt *ParamTable) initParams() { pt.initRoleName() } +func (pt *ParamTable) InitAlias(alias string) { + pt.Alias = alias +} + func (pt *ParamTable) initPulsarAddress() { ret, err := pt.Load("_PulsarAddress") if err != nil { @@ -114,30 +117,12 @@ func (pt *ParamTable) initTimeTickInterval() { pt.TimeTickInterval = time.Duration(interval) * time.Millisecond } -func (pt *ParamTable) initK2SChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s") - if err != nil { - panic(err) - } - prefix += "-" - k2sRangeStr, err := pt.Load("msgChannel.channelRange.k2s") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(k2sRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - pt.K2SChannelNames = ret -} - func (pt *ParamTable) initProxySubName() { prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix") if err != nil { panic(err) } - pt.ProxySubName = prefix + "-" + strconv.Itoa(int(pt.ProxyID)) + pt.ProxySubName = prefix + "-" + pt.Alias } func (pt *ParamTable) initProxyTimeTickChannelNames() { @@ -253,15 +238,6 @@ func (pt *ParamTable) initLogCfg() { panic(err) } pt.Log.Level = level - devStr, err := pt.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - pt.Log.Development = dev pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") @@ -270,14 +246,14 @@ func (pt *ParamTable) initLogCfg() { panic(err) } if len(rootPath) != 0 { - pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID)) + pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode%s.log", pt.Alias)) } else { pt.Log.File.Filename = "" } } func (pt *ParamTable) initRoleName() { - pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID) + pt.RoleName = fmt.Sprintf("%s-%s", "ProxyNode", pt.Alias) } func (pt *ParamTable) initEtcdEndpoints() { diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 50eb0d3aee..6b0a9529e4 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -29,6 +29,7 @@ type ParamTable struct { EtcdEndpoints []string MetaRootPath string + Alias string QueryNodeIP string QueryNodePort int64 QueryNodeID UniqueID @@ -72,6 +73,10 @@ type ParamTable struct { var Params ParamTable var once sync.Once +func (p *ParamTable) InitAlias(alias string) { + p.Alias = alias +} + func (p *ParamTable) Init() { once.Do(func() { p.BaseTable.Init() @@ -93,7 +98,6 @@ func (p *ParamTable) Init() { p.initMetaRootPath() p.initGracefulTime() - p.initMsgChannelSubName() p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() @@ -224,12 +228,12 @@ func (p *ParamTable) initGracefulTime() { } func (p *ParamTable) initMsgChannelSubName() { - // TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master - name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") + namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") if err != nil { log.Error(err.Error()) } - p.MsgChannelSubName = name + subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10) + p.MsgChannelSubName = subName } func (p *ParamTable) initStatsChannelName() { @@ -252,15 +256,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.Log.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.Log.Development = dev p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") @@ -269,7 +264,7 @@ func (p *ParamTable) initLogCfg() { panic(err) } if len(rootPath) != 0 { - p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("querynode-%d.log", p.QueryNodeID)) + p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("querynode-%s.log", p.Alias)) } else { p.Log.File.Filename = "" } diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 338c63c664..16dc54836d 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -87,8 +87,9 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { } func TestParamTable_msgChannelSubName(t *testing.T) { + Params.initMsgChannelSubName() name := Params.MsgChannelSubName - expectName := "queryNode" + expectName := "queryNode-0" assert.Equal(t, expectName, name) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 18ccf1d702..a94c56fa6a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -115,6 +115,9 @@ func (node *QueryNode) Register() error { node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) Params.QueryNodeID = node.session.ServerID log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID)) + + // This param needs valid QueryNodeID + Params.initMsgChannelSubName() return nil } diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index 414274c8e0..d7b2cee8a9 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -14,7 +14,6 @@ package queryservice import ( "fmt" "path" - "strconv" "strings" "sync" @@ -64,17 +63,11 @@ func (p *ParamTable) Init() { panic(err) } - err = p.LoadYaml("advanced/query_service.yaml") - if err != nil { - panic(err) - } - err = p.LoadYaml("milvus.yaml") if err != nil { panic(err) } - p.initNodeID() p.initLogCfg() p.initStatsChannelName() @@ -91,10 +84,6 @@ func (p *ParamTable) Init() { }) } -func (p *ParamTable) initNodeID() { - p.NodeID = uint64(p.ParseInt64("queryService.nodeID")) -} - func (p *ParamTable) initLogCfg() { p.Log = log.Config{} format, err := p.Load("log.format") @@ -107,15 +96,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.Log.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.Log.Development = dev p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 67c12e47e1..7007849fba 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -13,7 +13,6 @@ package rootcoord import ( "path" - "strconv" "strings" "sync" @@ -200,15 +199,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.Log.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.Log.Development = dev p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") diff --git a/internal/util/paramtable/param.go b/internal/util/paramtable/param.go index f05dce3562..222dc7dcd6 100644 --- a/internal/util/paramtable/param.go +++ b/internal/util/paramtable/param.go @@ -12,7 +12,6 @@ package paramtable import ( - "strconv" "sync" "github.com/milvus-io/milvus/internal/log" @@ -47,15 +46,6 @@ func (p *ParamTable) initLogCfg() { panic(err) } p.LogConfig.Level = level - devStr, err := p.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - p.LogConfig.Development = dev p.LogConfig.File.MaxSize = p.ParseInt("log.file.maxSize") p.LogConfig.File.MaxBackups = p.ParseInt("log.file.maxBackups") p.LogConfig.File.MaxDays = p.ParseInt("log.file.maxAge")