From 39614aa8eb9b8a7a95dcf9b343893e4a9f672301 Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 25 Jun 2021 19:44:11 +0800 Subject: [PATCH] Add rocksmq_path in config and paramtable (#6099) * Add rocksmq_path in config and paramtable Signed-off-by: fishpenguin * Add rocksdbPath in NewRmsFactory Signed-off-by: fishpenguin * Change rdb default path to /vat/lib/milvus/rdb_data Signed-off-by: fishpenguin --- cmd/roles/roles.go | 18 ++++++++++-------- configs/milvus.yaml | 3 +++ docs/developer_guides/chap05_proxy.md | 1 + internal/datacoord/param.go | 12 ++++++++++++ internal/datanode/param_table.go | 13 +++++++++++++ internal/msgstream/mq_factory.go | 6 ++++-- internal/proxy/paramtable.go | 10 ++++++++++ internal/querynode/param_table.go | 10 ++++++++++ internal/rootcoord/param_table.go | 10 ++++++++++ internal/util/paramtable/basetable.go | 13 +++++++++++++ 10 files changed, 86 insertions(+), 10 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index ca8dde78a4..1606c4e801 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -38,9 +38,9 @@ import ( "github.com/milvus-io/milvus/internal/util/trace" ) -func newMsgFactory(localMsg bool) msgstream.Factory { +func newMsgFactory(localMsg bool, rocksmqPath string) msgstream.Factory { if localMsg { - return msgstream.NewRmsFactory() + return msgstream.NewRmsFactory(rocksmqPath) } return msgstream.NewPmsFactory() } @@ -87,7 +87,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone defer log.Sync() } - factory := newMsgFactory(localMsg) + factory := newMsgFactory(localMsg, rootcoord.Params.RocksmqPath) var err error rc, err = components.NewRootCoord(ctx, factory) if err != nil { @@ -116,7 +116,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string defer log.Sync() } - factory := newMsgFactory(localMsg) + factory := newMsgFactory(localMsg, proxy.Params.RocksmqPath) var err error pn, err = components.NewProxy(ctx, factory) if err != nil { @@ -144,7 +144,9 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon defer log.Sync() } - factory := newMsgFactory(localMsg) + // FIXME(yukun): newMsgFactory requires parameter rocksmqPath, but won't be used here + // so hardcode the path to /tmp/invalid_milvus_rdb + factory := newMsgFactory(localMsg, "/tmp/invalid_milvus_rdb") var err error qs, err = components.NewQueryCoord(ctx, factory) if err != nil { @@ -173,7 +175,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st defer log.Sync() } - factory := newMsgFactory(localMsg) + factory := newMsgFactory(localMsg, querynode.Params.RocksmqPath) var err error qn, err = components.NewQueryNode(ctx, factory) if err != nil { @@ -201,7 +203,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone defer log.Sync() } - factory := newMsgFactory(localMsg) + factory := newMsgFactory(localMsg, datacoord.Params.RocksmqPath) var err error ds, err = components.NewDataCoord(ctx, factory) if err != nil { @@ -230,7 +232,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str defer log.Sync() } - factory := newMsgFactory(localMsg) + factory := newMsgFactory(localMsg, datanode.Params.RocksmqPath) var err error dn, err = components.NewDataNode(ctx, factory) if err != nil { diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 1bd91d3ce3..937c4c1a18 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -34,6 +34,9 @@ pulsar: port: 6650 maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes +rocksmq: + path: /var/lib/milvus/rdb_data + rootCoord: address: localhost port: 53100 diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index a52c62dccf..1c6a19271d 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -381,6 +381,7 @@ type GlobalParamsTable struct { MasterAddress string PulsarAddress string + RocksmqPath string ProxyID UniqueID TimeTickInterval time.Duration diff --git a/internal/datacoord/param.go b/internal/datacoord/param.go index c1f13b756c..445c29ac55 100644 --- a/internal/datacoord/param.go +++ b/internal/datacoord/param.go @@ -38,6 +38,9 @@ type ParamTable struct { // --- Pulsar --- PulsarAddress string + // --- Rocksmq --- + RocksmqPath string + FlushStreamPosSubPath string StatsStreamPosSubPath string @@ -75,6 +78,7 @@ func (p *ParamTable) Init() { p.initCollectionBinlogSubPath() p.initPulsarAddress() + p.initRocksmqPath() p.initSegmentMaxSize() p.initSegmentSealProportion() @@ -107,6 +111,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } +func (p *ParamTable) initRocksmqPath() { + path, err := p.Load("_RocksmqPath") + if err != nil { + panic(err) + } + p.RocksmqPath = path +} + func (p *ParamTable) initMetaRootPath() { rootPath, err := p.Load("etcd.rootPath") if err != nil { diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index c264af8955..caf296c7ff 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -40,6 +40,9 @@ type ParamTable struct { // --- Pulsar --- PulsarAddress string + // --- Rocksmq --- + RocksmqPath string + // - seg statistics channel - SegmentStatisticsChannelName string @@ -88,6 +91,8 @@ func (p *ParamTable) Init() { // --- Pulsar --- p.initPulsarAddress() + p.initRocksmqPath() + // - seg statistics channel - p.initSegmentStatisticsChannelName() @@ -148,6 +153,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } +func (p *ParamTable) initRocksmqPath() { + path, err := p.Load("_RocksmqPath") + if err != nil { + panic(err) + } + p.RocksmqPath = path +} + func (p *ParamTable) initSegmentStatisticsChannelName() { path, err := p.Load("msgChannel.chanNamePrefix.dataCoordStatistic") diff --git a/internal/msgstream/mq_factory.go b/internal/msgstream/mq_factory.go index 3ff2b92805..af3063c73e 100644 --- a/internal/msgstream/mq_factory.go +++ b/internal/msgstream/mq_factory.go @@ -15,6 +15,7 @@ import ( "context" "github.com/apache/pulsar-client-go/pulsar" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/mqclient" "github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq" rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" @@ -103,13 +104,14 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { return NewMemMsgStream(ctx, f.ReceiveBufSize) } -func NewRmsFactory() Factory { +func NewRmsFactory(rocksmqPath string) Factory { f := &RmsFactory{ dispatcherFactory: ProtoUDFactory{}, ReceiveBufSize: 1024, RmqBufSize: 1024, } - rocksmqserver.InitRocksMQ("/tmp/milvus_rdb") + log.Debug("RocksmqPath=" + rocksmqPath) + rocksmqserver.InitRocksMQ(rocksmqPath) return f } diff --git a/internal/proxy/paramtable.go b/internal/proxy/paramtable.go index e823175c0a..d285ab357a 100644 --- a/internal/proxy/paramtable.go +++ b/internal/proxy/paramtable.go @@ -40,6 +40,7 @@ type ParamTable struct { MetaRootPath string RootCoordAddress string PulsarAddress string + RocksmqPath string ProxyID UniqueID TimeTickInterval time.Duration @@ -78,6 +79,7 @@ func (pt *ParamTable) initParams() { pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initPulsarAddress() + pt.initRocksmqPath() pt.initTimeTickInterval() pt.initProxySubName() pt.initProxyTimeTickChannelNames() @@ -104,6 +106,14 @@ func (pt *ParamTable) initPulsarAddress() { pt.PulsarAddress = ret } +func (pt *ParamTable) initRocksmqPath() { + path, err := pt.Load("_RocksmqPath") + if err != nil { + panic(err) + } + pt.RocksmqPath = path +} + func (pt *ParamTable) initTimeTickInterval() { intervalStr, err := pt.Load("proxy.timeTickInterval") if err != nil { diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 6b0a9529e4..10985125fc 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -26,6 +26,7 @@ type ParamTable struct { paramtable.BaseTable PulsarAddress string + RocksmqPath string EtcdEndpoints []string MetaRootPath string @@ -94,6 +95,7 @@ func (p *ParamTable) Init() { p.initMinioBucketName() p.initPulsarAddress() + p.initRocksmqPath() p.initEtcdEndpoints() p.initMetaRootPath() @@ -175,6 +177,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } +func (p *ParamTable) initRocksmqPath() { + path, err := p.Load("_RocksmqPath") + if err != nil { + panic(err) + } + p.RocksmqPath = path +} + // advanced params // stats func (p *ParamTable) initStatsPublishInterval() { diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index a0791043c3..ff3b0f15ef 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -30,6 +30,7 @@ type ParamTable struct { Port int PulsarAddress string + RocksmqPath string EtcdEndpoints []string MetaRootPath string KvRootPath string @@ -61,6 +62,7 @@ func (p *ParamTable) Init() { } p.initPulsarAddress() + p.initRocksmqPath() p.initEtcdEndpoints() p.initMetaRootPath() p.initKvRootPath() @@ -91,6 +93,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } +func (p *ParamTable) initRocksmqPath() { + path, err := p.Load("_RocksmqPath") + if err != nil { + panic(err) + } + p.RocksmqPath = path +} + func (p *ParamTable) initEtcdEndpoints() { endpoints, err := p.Load("_EtcdEndpoints") if err != nil { diff --git a/internal/util/paramtable/basetable.go b/internal/util/paramtable/basetable.go index b183b23500..7c55af6fab 100644 --- a/internal/util/paramtable/basetable.go +++ b/internal/util/paramtable/basetable.go @@ -119,6 +119,19 @@ func (gp *BaseTable) tryloadFromEnv() { panic(err) } + rocksmqPath := os.Getenv("ROCKSMQ_PATH") + if rocksmqPath == "" { + path, err := gp.Load("rocksmq.path") + if err != nil { + panic(err) + } + rocksmqPath = path + } + err = gp.Save("_RocksmqPath", rocksmqPath) + if err != nil { + panic(err) + } + rootCoordAddress := os.Getenv("ROOT_COORD_ADDRESS") if rootCoordAddress == "" { rootCoordHost, err := gp.Load("rootCoord.address")