mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add alias in paramtable (#5878)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
e523cbe7c0
commit
ab7f642740
@ -244,7 +244,7 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
defer removePidFile(fd)
|
||||
role.Run(localMsg)
|
||||
role.Run(localMsg, svrAlias)
|
||||
case "stop":
|
||||
if err := stopPid(filename, runtimeDir); err != nil {
|
||||
panic(err)
|
||||
|
||||
@ -102,12 +102,13 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
|
||||
return rc
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool) *components.Proxy {
|
||||
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string) *components.Proxy {
|
||||
var pn *components.Proxy
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
proxynode.Params.InitAlias(alias)
|
||||
proxynode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
@ -158,12 +159,13 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
|
||||
return qs
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool) *components.QueryNode {
|
||||
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias string) *components.QueryNode {
|
||||
var qn *components.QueryNode
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
querynode.Params.InitAlias(alias)
|
||||
querynode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
@ -214,12 +216,13 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
|
||||
return ds
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool) *components.DataNode {
|
||||
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias string) *components.DataNode {
|
||||
var dn *components.DataNode
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
datanode.Params.InitAlias(alias)
|
||||
datanode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
@ -269,12 +272,13 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
|
||||
return is
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool) *components.IndexNode {
|
||||
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias string) *components.IndexNode {
|
||||
var in *components.IndexNode
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
indexnode.Params.InitAlias(alias)
|
||||
indexnode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
@ -316,7 +320,7 @@ func (mr *MilvusRoles) runMsgStreamCoord(ctx context.Context) *components.MsgStr
|
||||
return mss
|
||||
}
|
||||
|
||||
func (mr *MilvusRoles) Run(localMsg bool) {
|
||||
func (mr *MilvusRoles) Run(localMsg bool, alias string) {
|
||||
if os.Getenv("DEPLOY_MODE") == "STANDALONE" {
|
||||
closer := trace.InitTracing("standalone")
|
||||
if closer != nil {
|
||||
@ -344,7 +348,7 @@ func (mr *MilvusRoles) Run(localMsg bool) {
|
||||
|
||||
var pn *components.Proxy
|
||||
if mr.EnableProxy {
|
||||
pn = mr.runProxy(ctx, localMsg)
|
||||
pn = mr.runProxy(ctx, localMsg, alias)
|
||||
if pn != nil {
|
||||
defer pn.Stop()
|
||||
}
|
||||
@ -360,7 +364,7 @@ func (mr *MilvusRoles) Run(localMsg bool) {
|
||||
|
||||
var qn *components.QueryNode
|
||||
if mr.EnableQueryNode {
|
||||
qn = mr.runQueryNode(ctx, localMsg)
|
||||
qn = mr.runQueryNode(ctx, localMsg, alias)
|
||||
if qn != nil {
|
||||
defer qn.Stop()
|
||||
}
|
||||
@ -376,7 +380,7 @@ func (mr *MilvusRoles) Run(localMsg bool) {
|
||||
|
||||
var dn *components.DataNode
|
||||
if mr.EnableDataNode {
|
||||
dn = mr.runDataNode(ctx, localMsg)
|
||||
dn = mr.runDataNode(ctx, localMsg, alias)
|
||||
if dn != nil {
|
||||
defer dn.Stop()
|
||||
}
|
||||
@ -392,7 +396,7 @@ func (mr *MilvusRoles) Run(localMsg bool) {
|
||||
|
||||
var in *components.IndexNode
|
||||
if mr.EnableIndexNode {
|
||||
in = mr.runIndexNode(ctx, localMsg)
|
||||
in = mr.runIndexNode(ctx, localMsg, alias)
|
||||
if in != nil {
|
||||
defer in.Stop()
|
||||
}
|
||||
|
||||
@ -12,14 +12,11 @@
|
||||
msgChannel:
|
||||
# channel name generation rule: ${namePrefix}-${ChannelIdx}
|
||||
chanNamePrefix:
|
||||
dataDefinition: "data-definition"
|
||||
masterTimeTick: "master-timetick"
|
||||
masterStatistics: "master-statistics"
|
||||
search: "search"
|
||||
searchResult: "searchResult"
|
||||
k2s: "k2s"
|
||||
proxyTimeTick: "proxyTimeTick"
|
||||
proxyServiceTimeTick: "proxyServiceTimeTick"
|
||||
queryTimeTick: "queryTimeTick"
|
||||
queryNodeStats: "query-node-stats"
|
||||
# cmd for loadIndex, flush, etc...
|
||||
@ -36,7 +33,3 @@ msgChannel:
|
||||
queryNodeSubNamePrefix: "queryNode"
|
||||
dataNodeSubNamePrefix: "dataNode"
|
||||
dataServiceSubNamePrefix: "dataService"
|
||||
|
||||
# default channel range [0, 1)
|
||||
channelRange:
|
||||
k2s: [0, 1]
|
||||
|
||||
@ -10,10 +10,7 @@
|
||||
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
dataservice:
|
||||
nodeID: 14040
|
||||
segment:
|
||||
size: 512 # MB
|
||||
sizeFactor: 0.75
|
||||
IDAssignExpiration: 2000 # ms
|
||||
insertChannelNum: 2
|
||||
dataNodeNum: 1
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
master:
|
||||
maxPartitionNum: 4096
|
||||
minSegmentSizeToEnableIndex: 1024
|
||||
nodeID: 100
|
||||
timeout: 3600 # time out, 5 seconds
|
||||
timeTickInterval: 200 # ms
|
||||
|
||||
|
||||
@ -1,13 +0,0 @@
|
||||
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
# or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
queryService:
|
||||
nodeID: 200
|
||||
@ -20,10 +20,6 @@ etcd:
|
||||
collectionBinlogSubPath: dataservice/binlog/collection # Full Path = rootPath/metaSubPath/collectionBinglogSubPath
|
||||
flushStreamPosSubPath: dataservice/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath
|
||||
statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath
|
||||
segmentDmlPosSubPath: dataservice/segmentdml # Full path = rootPath/metaSubPath/segmentDmlPosSubPath
|
||||
segmentDdlPosSubPath: dataservice/segmentddl # Full path = rootPath/metaSubPath/segmentDdlPosSubPath
|
||||
dmlChanPosSubPath: dataservice/dmlchannel # Full Path = root/metaSubPath/dmlChanPosSubPath
|
||||
ddlChanPosSubPath: dataservice/ddlchannel # Full Path = root/metaSubPath/ddlChanPosSubPath
|
||||
|
||||
minio:
|
||||
address: localhost
|
||||
@ -49,10 +45,6 @@ queryService:
|
||||
address: localhost
|
||||
port: 19531
|
||||
|
||||
proxyService:
|
||||
address: localhost
|
||||
port: 21122
|
||||
|
||||
queryNode:
|
||||
gracefulTime: 1000 # ms, for search
|
||||
port: 21123
|
||||
@ -72,11 +64,10 @@ dataNode:
|
||||
port: 21124
|
||||
|
||||
log:
|
||||
level: debug # info, warn, error, panic, fatal, dpanic,
|
||||
level: debug # info, warn, error, panic, fatal
|
||||
file:
|
||||
rootPath: "" # default to stdout, stderr
|
||||
maxSize: 300 # MB
|
||||
maxAge: 10 # day
|
||||
maxBackups: 20
|
||||
dev: true # false, change behaviour of dpaniclevel
|
||||
format: text # text/json
|
||||
|
||||
@ -30,6 +30,7 @@ import (
|
||||
)
|
||||
|
||||
func TestMain(t *testing.M) {
|
||||
Params.InitAlias("datanode-alias-1")
|
||||
Params.Init()
|
||||
refreshChannelNames()
|
||||
code := t.Run()
|
||||
|
||||
@ -34,6 +34,7 @@ type ParamTable struct {
|
||||
InsertBinlogRootPath string
|
||||
StatsBinlogRootPath string
|
||||
Log log.Config
|
||||
Alias string // Different datanode in one machine
|
||||
|
||||
// === DataNode External Components Configs ===
|
||||
// --- Pulsar ---
|
||||
@ -63,6 +64,10 @@ type ParamTable struct {
|
||||
var Params ParamTable
|
||||
var once sync.Once
|
||||
|
||||
func (p *ParamTable) InitAlias(alias string) {
|
||||
p.Alias = alias
|
||||
}
|
||||
|
||||
func (p *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
p.BaseTable.Init()
|
||||
@ -243,15 +248,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Development = dev
|
||||
p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
@ -260,7 +256,7 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
p.Log.File.Filename = path.Join(rootPath, "datanode-"+strconv.FormatInt(p.NodeID, 10)+".log")
|
||||
p.Log.File.Filename = path.Join(rootPath, "datanode"+p.Alias+".log")
|
||||
} else {
|
||||
p.Log.File.Filename = ""
|
||||
}
|
||||
|
||||
@ -18,13 +18,17 @@ import (
|
||||
|
||||
func TestParamTable_DataNode(t *testing.T) {
|
||||
|
||||
Params.Init()
|
||||
|
||||
t.Run("Test NodeID", func(t *testing.T) {
|
||||
id := Params.NodeID
|
||||
log.Println("NodeID:", id)
|
||||
})
|
||||
|
||||
t.Run("Test Alias", func(t *testing.T) {
|
||||
alias := Params.Alias
|
||||
log.Println("Alias:", alias)
|
||||
|
||||
})
|
||||
|
||||
t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
|
||||
length := Params.FlowGraphMaxQueueLength
|
||||
log.Println("flowGraphMaxQueueLength:", length)
|
||||
|
||||
@ -34,9 +34,6 @@ type ParamTable struct {
|
||||
KvRootPath string
|
||||
SegmentBinlogSubPath string
|
||||
CollectionBinlogSubPath string
|
||||
SegmentDmlPosSubPath string
|
||||
SegmentDdlPosSubPath string
|
||||
DmlChannelPosSubPath string
|
||||
|
||||
// --- Pulsar ---
|
||||
PulsarAddress string
|
||||
@ -50,14 +47,10 @@ type ParamTable struct {
|
||||
SegIDAssignExpiration int64
|
||||
|
||||
InsertChannelPrefixName string
|
||||
InsertChannelNum int64
|
||||
StatisticsChannelName string
|
||||
TimeTickChannelName string
|
||||
DataNodeNum int
|
||||
SegmentInfoChannelName string
|
||||
DataServiceSubscriptionName string
|
||||
K2SChannelNames []string
|
||||
ProxyTimeTickChannelName string
|
||||
|
||||
Log log.Config
|
||||
}
|
||||
@ -75,8 +68,6 @@ func (p *ParamTable) Init() {
|
||||
}
|
||||
|
||||
// set members
|
||||
p.initNodeID()
|
||||
|
||||
p.initEtcdEndpoints()
|
||||
p.initMetaRootPath()
|
||||
p.initKvRootPath()
|
||||
@ -89,27 +80,17 @@ func (p *ParamTable) Init() {
|
||||
p.initSegmentSizeFactor()
|
||||
p.initSegIDAssignExpiration()
|
||||
p.initInsertChannelPrefixName()
|
||||
p.initInsertChannelNum()
|
||||
p.initStatisticsChannelName()
|
||||
p.initTimeTickChannelName()
|
||||
p.initDataNodeNum()
|
||||
p.initSegmentInfoChannelName()
|
||||
p.initDataServiceSubscriptionName()
|
||||
p.initK2SChannelNames()
|
||||
p.initLogCfg()
|
||||
p.initProxyServiceTimeTickChannelName()
|
||||
|
||||
p.initFlushStreamPosSubPath()
|
||||
p.initStatsStreamPosSubPath()
|
||||
p.initSegmentDmlPosSubPath()
|
||||
p.initDmlChannelPosSubPath()
|
||||
})
|
||||
}
|
||||
|
||||
func (p *ParamTable) initNodeID() {
|
||||
p.NodeID = p.ParseInt64("dataservice.nodeID")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initEtcdEndpoints() {
|
||||
endpoints, err := p.Load("_EtcdEndpoints")
|
||||
if err != nil {
|
||||
@ -186,10 +167,6 @@ func (p *ParamTable) initInsertChannelPrefixName() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initInsertChannelNum() {
|
||||
p.InsertChannelNum = p.ParseInt64("dataservice.insertChannelNum")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initStatisticsChannelName() {
|
||||
var err error
|
||||
p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic")
|
||||
@ -206,10 +183,6 @@ func (p *ParamTable) initTimeTickChannelName() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDataNodeNum() {
|
||||
p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegmentInfoChannelName() {
|
||||
var err error
|
||||
p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
|
||||
@ -226,24 +199,6 @@ func (p *ParamTable) initDataServiceSubscriptionName() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initK2SChannelNames() {
|
||||
prefix, err := p.Load("msgChannel.chanNamePrefix.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
iRangeStr, err := p.Load("msgChannel.channelRange.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
p.K2SChannelNames = ret
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.Log = log.Config{}
|
||||
format, err := p.Load("log.format")
|
||||
@ -256,15 +211,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Development = dev
|
||||
p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
@ -279,14 +225,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initProxyServiceTimeTickChannelName() {
|
||||
ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.ProxyTimeTickChannelName = ch
|
||||
}
|
||||
|
||||
func (p *ParamTable) initFlushStreamPosSubPath() {
|
||||
subPath, err := p.Load("etcd.flushStreamPosSubPath")
|
||||
if err != nil {
|
||||
@ -302,19 +240,3 @@ func (p *ParamTable) initStatsStreamPosSubPath() {
|
||||
}
|
||||
p.StatsStreamPosSubPath = subPath
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSegmentDmlPosSubPath() {
|
||||
subPath, err := p.Load("etcd.segmentDmlPosSubPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.SegmentDmlPosSubPath = subPath
|
||||
}
|
||||
|
||||
func (p *ParamTable) initDmlChannelPosSubPath() {
|
||||
subPath, err := p.Load("etcd.dmlChanPosSubPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.DmlChannelPosSubPath = subPath
|
||||
}
|
||||
|
||||
@ -405,37 +405,6 @@ func TestChannel(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
time.Sleep(time.Second)
|
||||
})
|
||||
|
||||
t.Run("Test ProxyTimeTickChannel", func(t *testing.T) {
|
||||
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.TimeTickMsg {
|
||||
return &msgstream.TimeTickMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{0},
|
||||
},
|
||||
TimeTickMsg: internalpb.TimeTickMsg{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: msgType,
|
||||
MsgID: 0,
|
||||
Timestamp: t,
|
||||
SourceID: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
timeTickStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
|
||||
timeTickStream.AsProducer([]string{Params.ProxyTimeTickChannelName})
|
||||
timeTickStream.Start()
|
||||
defer timeTickStream.Close()
|
||||
|
||||
msgPack := msgstream.MsgPack{}
|
||||
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 123))
|
||||
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
|
||||
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_TimeTick, 345))
|
||||
err := timeTickStream.Produce(&msgPack)
|
||||
assert.Nil(t, err)
|
||||
time.Sleep(time.Second)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSaveBinlogPaths(t *testing.T) {
|
||||
|
||||
@ -40,6 +40,7 @@ type ParamTable struct {
|
||||
Port int
|
||||
|
||||
NodeID int64
|
||||
Alias string
|
||||
|
||||
MasterAddress string
|
||||
|
||||
@ -58,6 +59,10 @@ type ParamTable struct {
|
||||
var Params ParamTable
|
||||
var once sync.Once
|
||||
|
||||
func (pt *ParamTable) InitAlias(alias string) {
|
||||
pt.Alias = alias
|
||||
}
|
||||
|
||||
func (pt *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
pt.BaseTable.Init()
|
||||
@ -200,15 +205,6 @@ func (pt *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Level = level
|
||||
devStr, err := pt.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Development = dev
|
||||
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
|
||||
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
|
||||
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
|
||||
@ -217,7 +213,7 @@ func (pt *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%d.log", pt.NodeID))
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("indexnode-%s.log", pt.Alias))
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
|
||||
@ -156,15 +156,6 @@ func (pt *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Level = level
|
||||
devStr, err := pt.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Development = dev
|
||||
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
|
||||
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
|
||||
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
|
||||
|
||||
@ -585,16 +585,6 @@ func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
|
||||
return segStatsMsg, nil
|
||||
}
|
||||
|
||||
///////////////////////////////////////////Key2Seg//////////////////////////////////////////
|
||||
//type Key2SegMsg struct {
|
||||
// BaseMsg
|
||||
// internalpb.Key2SegMsg
|
||||
//}
|
||||
//
|
||||
//func (k2st *Key2SegMsg) Type() MsgType {
|
||||
// return
|
||||
//}
|
||||
|
||||
/////////////////////////////////////////CreateCollection//////////////////////////////////////////
|
||||
type CreateCollectionMsg struct {
|
||||
BaseMsg
|
||||
|
||||
@ -35,6 +35,7 @@ type ParamTable struct {
|
||||
NetworkPort int
|
||||
IP string
|
||||
NetworkAddress string
|
||||
Alias string
|
||||
|
||||
EtcdEndpoints []string
|
||||
MetaRootPath string
|
||||
@ -43,7 +44,6 @@ type ParamTable struct {
|
||||
|
||||
ProxyID UniqueID
|
||||
TimeTickInterval time.Duration
|
||||
K2SChannelNames []string
|
||||
SearchResultChannelNames []string
|
||||
RetrieveResultChannelNames []string
|
||||
ProxySubName string
|
||||
@ -80,7 +80,6 @@ func (pt *ParamTable) initParams() {
|
||||
pt.initMetaRootPath()
|
||||
pt.initPulsarAddress()
|
||||
pt.initTimeTickInterval()
|
||||
pt.initK2SChannelNames()
|
||||
pt.initProxySubName()
|
||||
pt.initProxyTimeTickChannelNames()
|
||||
pt.initMsgStreamTimeTickBufSize()
|
||||
@ -94,6 +93,10 @@ func (pt *ParamTable) initParams() {
|
||||
pt.initRoleName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) InitAlias(alias string) {
|
||||
pt.Alias = alias
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initPulsarAddress() {
|
||||
ret, err := pt.Load("_PulsarAddress")
|
||||
if err != nil {
|
||||
@ -114,30 +117,12 @@ func (pt *ParamTable) initTimeTickInterval() {
|
||||
pt.TimeTickInterval = time.Duration(interval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initK2SChannelNames() {
|
||||
prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
prefix += "-"
|
||||
k2sRangeStr, err := pt.Load("msgChannel.channelRange.k2s")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
channelIDs := paramtable.ConvertRangeToIntSlice(k2sRangeStr, ",")
|
||||
var ret []string
|
||||
for _, ID := range channelIDs {
|
||||
ret = append(ret, prefix+strconv.Itoa(ID))
|
||||
}
|
||||
pt.K2SChannelNames = ret
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initProxySubName() {
|
||||
prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.ProxySubName = prefix + "-" + strconv.Itoa(int(pt.ProxyID))
|
||||
pt.ProxySubName = prefix + "-" + pt.Alias
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initProxyTimeTickChannelNames() {
|
||||
@ -253,15 +238,6 @@ func (pt *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Level = level
|
||||
devStr, err := pt.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
pt.Log.Development = dev
|
||||
pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
|
||||
pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
|
||||
pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
|
||||
@ -270,14 +246,14 @@ func (pt *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID))
|
||||
pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode%s.log", pt.Alias))
|
||||
} else {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initRoleName() {
|
||||
pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID)
|
||||
pt.RoleName = fmt.Sprintf("%s-%s", "ProxyNode", pt.Alias)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initEtcdEndpoints() {
|
||||
|
||||
@ -29,6 +29,7 @@ type ParamTable struct {
|
||||
EtcdEndpoints []string
|
||||
MetaRootPath string
|
||||
|
||||
Alias string
|
||||
QueryNodeIP string
|
||||
QueryNodePort int64
|
||||
QueryNodeID UniqueID
|
||||
@ -72,6 +73,10 @@ type ParamTable struct {
|
||||
var Params ParamTable
|
||||
var once sync.Once
|
||||
|
||||
func (p *ParamTable) InitAlias(alias string) {
|
||||
p.Alias = alias
|
||||
}
|
||||
|
||||
func (p *ParamTable) Init() {
|
||||
once.Do(func() {
|
||||
p.BaseTable.Init()
|
||||
@ -93,7 +98,6 @@ func (p *ParamTable) Init() {
|
||||
p.initMetaRootPath()
|
||||
|
||||
p.initGracefulTime()
|
||||
p.initMsgChannelSubName()
|
||||
|
||||
p.initFlowGraphMaxQueueLength()
|
||||
p.initFlowGraphMaxParallelism()
|
||||
@ -224,12 +228,12 @@ func (p *ParamTable) initGracefulTime() {
|
||||
}
|
||||
|
||||
func (p *ParamTable) initMsgChannelSubName() {
|
||||
// TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master
|
||||
name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
|
||||
namePrefix, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
p.MsgChannelSubName = name
|
||||
subName := namePrefix + "-" + strconv.FormatInt(p.QueryNodeID, 10)
|
||||
p.MsgChannelSubName = subName
|
||||
}
|
||||
|
||||
func (p *ParamTable) initStatsChannelName() {
|
||||
@ -252,15 +256,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Development = dev
|
||||
p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
@ -269,7 +264,7 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("querynode-%d.log", p.QueryNodeID))
|
||||
p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("querynode-%s.log", p.Alias))
|
||||
} else {
|
||||
p.Log.File.Filename = ""
|
||||
}
|
||||
|
||||
@ -87,8 +87,9 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestParamTable_msgChannelSubName(t *testing.T) {
|
||||
Params.initMsgChannelSubName()
|
||||
name := Params.MsgChannelSubName
|
||||
expectName := "queryNode"
|
||||
expectName := "queryNode-0"
|
||||
assert.Equal(t, expectName, name)
|
||||
}
|
||||
|
||||
|
||||
@ -115,6 +115,9 @@ func (node *QueryNode) Register() error {
|
||||
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
|
||||
Params.QueryNodeID = node.session.ServerID
|
||||
log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID))
|
||||
|
||||
// This param needs valid QueryNodeID
|
||||
Params.initMsgChannelSubName()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -14,7 +14,6 @@ package queryservice
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@ -64,17 +63,11 @@ func (p *ParamTable) Init() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = p.LoadYaml("advanced/query_service.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = p.LoadYaml("milvus.yaml")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
p.initNodeID()
|
||||
p.initLogCfg()
|
||||
|
||||
p.initStatsChannelName()
|
||||
@ -91,10 +84,6 @@ func (p *ParamTable) Init() {
|
||||
})
|
||||
}
|
||||
|
||||
func (p *ParamTable) initNodeID() {
|
||||
p.NodeID = uint64(p.ParseInt64("queryService.nodeID"))
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.Log = log.Config{}
|
||||
format, err := p.Load("log.format")
|
||||
@ -107,15 +96,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Development = dev
|
||||
p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
|
||||
@ -13,7 +13,6 @@ package rootcoord
|
||||
|
||||
import (
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@ -200,15 +199,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.Log.Development = dev
|
||||
p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
package paramtable
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
@ -47,15 +46,6 @@ func (p *ParamTable) initLogCfg() {
|
||||
panic(err)
|
||||
}
|
||||
p.LogConfig.Level = level
|
||||
devStr, err := p.Load("log.dev")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
dev, err := strconv.ParseBool(devStr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.LogConfig.Development = dev
|
||||
p.LogConfig.File.MaxSize = p.ParseInt("log.file.maxSize")
|
||||
p.LogConfig.File.MaxBackups = p.ParseInt("log.file.maxBackups")
|
||||
p.LogConfig.File.MaxDays = p.ParseInt("log.file.maxAge")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user