diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 63dd28f739..c9e0ef86bd 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -67,7 +67,10 @@ func init() { func newMsgFactory(localMsg bool) msgstream.Factory { if localMsg { - return msgstream.NewRmsFactory() + if Params.RocksmqEnable() { + return msgstream.NewRmsFactory() + } + return msgstream.NewPmsFactory() } return msgstream.NewPmsFactory() } diff --git a/docs/developer_guides/chap04_message_stream.md b/docs/developer_guides/chap04_message_stream.md index 7fdac8e920..32d65db5e6 100644 --- a/docs/developer_guides/chap04_message_stream.md +++ b/docs/developer_guides/chap04_message_stream.md @@ -213,7 +213,7 @@ type MsgStream interface { } type Factory interface { - SetParams(params map[string]interface{}) error + Init(params *paramtable.ComponentParam) error NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) NewQueryMsgStream(ctx context.Context) (MsgStream, error) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index bbf620e687..9896e338e8 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -259,11 +259,7 @@ func (s *Server) Init() error { // 4. set server state to Healthy func (s *Server) Start() error { var err error - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err = s.msFactory.SetParams(m) + err = s.msFactory.Init(&Params) if err != nil { return err } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 200337197f..d3611051ae 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2320,12 +2320,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) var err error factory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarCfg.Address, - "receiveBufSize": 1024, - "pulsarBufSize": 1024, - } - err = factory.SetParams(m) + err = factory.Init(&Params) assert.Nil(t, err) etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index d13f2859e8..7354c5a87f 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -217,13 +217,7 @@ func (node *DataNode) Init() error { return err } - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024, - } - - if err := node.msFactory.SetParams(m); err != nil { + if err := node.msFactory.Init(&Params); err != nil { log.Warn("DataNode Init msFactory SetParams failed, use default", zap.Error(err)) return err diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index f94d4a37dd..69ead01bca 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -209,11 +209,7 @@ func TestDataNode(t *testing.T) { // pulsar produce msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarCfg.Address, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.NoError(t, err) insertStream, err := msFactory.NewMsgStream(node1.ctx) assert.NoError(t, err) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index a66e09d253..ca0082a02e 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -181,7 +181,6 @@ func TestDataSyncService_Start(t *testing.T) { defer cancel() // init data node - pulsarURL := Params.PulsarCfg.Address Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") @@ -196,11 +195,7 @@ func TestDataSyncService_Start(t *testing.T) { allocFactory := NewAllocatorFactory(1) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "pulsarAddress": pulsarURL, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) insertChannelName := "data_sync_service_test_dml" diff --git a/internal/datanode/flow_graph_dmstream_input_node_test.go b/internal/datanode/flow_graph_dmstream_input_node_test.go index 340fc4747f..93eed80b0a 100644 --- a/internal/datanode/flow_graph_dmstream_input_node_test.go +++ b/internal/datanode/flow_graph_dmstream_input_node_test.go @@ -21,6 +21,8 @@ import ( "errors" "testing" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -28,17 +30,16 @@ import ( ) type mockMsgStreamFactory struct { - SetParamsReturnNil bool + InitReturnNil bool NewMsgStreamNoError bool } var _ msgstream.Factory = &mockMsgStreamFactory{} -func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error { - if !mm.SetParamsReturnNil { - return errors.New("Set Params Error") +func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error { + if !mm.InitReturnNil { + return errors.New("Init Error") } - return nil } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 3767a7867f..1c96363d58 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -78,11 +78,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { require.NoError(t, err) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) @@ -168,11 +164,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { require.NoError(t, err) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) @@ -372,11 +364,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { colRep.metaService = newMetaService(mockRootCoord, collMeta.ID) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) flushPacks := []*segmentFlushPack{} @@ -649,11 +637,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { require.NoError(t, err) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) fm := NewRendezvousFlushManager(&allocator{}, cm, replica, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index f0e34b2086..86791a469e 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -19,12 +19,13 @@ package msgstream import ( "context" + "github.com/milvus-io/milvus/internal/util/paramtable" + rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/apache/pulsar-client-go/pulsar" puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar" rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" - "github.com/mitchellh/mapstructure" ) // PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go) @@ -36,12 +37,11 @@ type PmsFactory struct { PulsarBufSize int64 } -// SetParams is used to set parameters for PmsFactory -func (f *PmsFactory) SetParams(params map[string]interface{}) error { - err := mapstructure.Decode(params, f) - if err != nil { - return err - } +// Init is used to set parameters for PmsFactory +func (f *PmsFactory) Init(params *paramtable.ComponentParam) error { + f.PulsarBufSize = 1024 + f.ReceiveBufSize = 1024 + f.PulsarAddress = params.PulsarCfg.Address return nil } @@ -86,12 +86,10 @@ type RmsFactory struct { RmqBufSize int64 } -// SetParams is used to set parameters for RmsFactory -func (f *RmsFactory) SetParams(params map[string]interface{}) error { - err := mapstructure.Decode(params, f) - if err != nil { - return err - } +// Init is used to set parameters for RmsFactory +func (f *RmsFactory) Init(params *paramtable.ComponentParam) error { + f.RmqBufSize = 1024 + f.ReceiveBufSize = 1024 return nil } diff --git a/internal/mq/msgstream/mq_factory_test.go b/internal/mq/msgstream/mq_factory_test.go index 1ade7b2a8f..1022dab7f2 100644 --- a/internal/mq/msgstream/mq_factory_test.go +++ b/internal/mq/msgstream/mq_factory_test.go @@ -27,13 +27,7 @@ import ( func TestPmsFactory(t *testing.T) { pmsFactory := NewPmsFactory() - pulsarAddress, _ := Params.Load("_PulsarAddress") - m := map[string]interface{}{ - "PulsarAddress": pulsarAddress, - "receiveBufSize": 1024, - "pulsarBufSize": 1024, - } - pmsFactory.SetParams(m) + pmsFactory.Init(&Params) ctx := context.Background() _, err := pmsFactory.NewMsgStream(ctx) @@ -46,17 +40,10 @@ func TestPmsFactory(t *testing.T) { assert.Nil(t, err) } -func TestPmsFactory_SetParams(t *testing.T) { - pmsFactory := (*PmsFactory)(nil) - - pulsarAddress, _ := Params.Load("_PulsarAddress") - m := map[string]interface{}{ - "PulsarAddress": pulsarAddress, - "receiveBufSize": 1024, - "pulsarBufSize": 1024, - } - err := pmsFactory.SetParams(m) - assert.NotNil(t, err) +func TestPmsFactory_Init(t *testing.T) { + rmsFactory := NewRmsFactory() + err := rmsFactory.Init(&Params) + assert.Nil(t, err) } func TestRmsFactory(t *testing.T) { @@ -65,11 +52,7 @@ func TestRmsFactory(t *testing.T) { rmsFactory := NewRmsFactory() - m := map[string]interface{}{ - "ReceiveBufSize": 1024, - "RmqBufSize": 1024, - } - rmsFactory.SetParams(m) + rmsFactory.Init(&Params) ctx := context.Background() _, err := rmsFactory.NewMsgStream(ctx) @@ -82,13 +65,8 @@ func TestRmsFactory(t *testing.T) { assert.Nil(t, err) } -func TestRmsFactory_SetParams(t *testing.T) { - rmsFactory := (*RmsFactory)(nil) - - m := map[string]interface{}{ - "ReceiveBufSize": 1024, - "RmqBufSize": 1024, - } - err := rmsFactory.SetParams(m) - assert.NotNil(t, err) +func TestRmsFactory_Init(t *testing.T) { + rmsFactory := NewRmsFactory() + err := rmsFactory.Init(&Params) + assert.Nil(t, err) } diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index 725356aec8..5bc5616a2a 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -48,7 +48,7 @@ import ( "github.com/milvus-io/milvus/internal/util/paramtable" ) -var Params paramtable.BaseTable +var Params paramtable.ComponentParam func TestMain(m *testing.M) { Params.Init() diff --git a/internal/mq/msgstream/msgstream.go b/internal/mq/msgstream/msgstream.go index 985114f8f7..19bfc1b0ce 100644 --- a/internal/mq/msgstream/msgstream.go +++ b/internal/mq/msgstream/msgstream.go @@ -19,6 +19,8 @@ package msgstream import ( "context" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -75,7 +77,7 @@ type MsgStream interface { // Factory is an interface that can be used to generate a new msgstream object type Factory interface { - SetParams(params map[string]interface{}) error + Init(params *paramtable.ComponentParam) error NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) NewQueryMsgStream(ctx context.Context) (MsgStream, error) diff --git a/internal/proxy/mock_test.go b/internal/proxy/mock_test.go index 63c03263dc..7047e833ed 100644 --- a/internal/proxy/mock_test.go +++ b/internal/proxy/mock_test.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -373,7 +375,7 @@ func newSimpleMockMsgStream() *simpleMockMsgStream { type simpleMockMsgStreamFactory struct { } -func (factory *simpleMockMsgStreamFactory) SetParams(params map[string]interface{}) error { +func (factory *simpleMockMsgStreamFactory) Init(param *paramtable.ComponentParam) error { return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index aa91a773c7..05699d1e01 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -182,18 +182,15 @@ func (node *Proxy) Init() error { log.Debug("create query channel for Proxy done", zap.String("QueryResultChannel", resp.QueryResultChannel)) } - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "PulsarBufSize": 1024} - log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m)) - if err := node.msFactory.SetParams(m); err != nil { + log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.ServiceParam)) + if err := node.msFactory.Init(&Params); err != nil { log.Warn("failed to set parameters for ms factory", zap.Error(err), zap.String("role", typeutil.ProxyRole), - zap.Any("parameters", m)) + zap.Any("parameters", Params.ServiceParam)) return err } - log.Debug("set parameters for ms factory done", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m)) + log.Debug("set parameters for ms factory done", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.ServiceParam)) log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID)) idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.ProxyID) diff --git a/internal/querycoord/channel_unsubscribe_test.go b/internal/querycoord/channel_unsubscribe_test.go index ef90821141..5a0d0b8ea1 100644 --- a/internal/querycoord/channel_unsubscribe_test.go +++ b/internal/querycoord/channel_unsubscribe_test.go @@ -96,11 +96,7 @@ func Test_HandleChannelUnsubscribeLoop(t *testing.T) { defer etcdCli.Close() kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) factory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - factory.SetParams(m) + factory.Init(&Params) handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) assert.Nil(t, err) diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 659cd8ed36..36c0fbf0de 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -457,11 +457,7 @@ func TestGrpcRequest(t *testing.T) { clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() factory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err = factory.SetParams(m) + err = factory.Init(&Params) assert.Nil(t, err) idAllocator := func() (UniqueID, error) { return 0, nil @@ -652,11 +648,7 @@ func TestSetNodeState(t *testing.T) { clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() factory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err = factory.SetParams(m) + err = factory.Init(&Params) assert.Nil(t, err) idAllocator := func() (UniqueID, error) { return 0, nil diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index c6421cb33b..a4fc71e81d 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -222,11 +222,7 @@ func (qc *QueryCoord) Init() error { // Start function starts the goroutines to watch the meta and node updates func (qc *QueryCoord) Start() error { - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err := qc.msFactory.SetParams(m) + err := qc.msFactory.Init(&Params) if err != nil { return err } diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index eb1cc4c6a4..4316fc682f 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -24,6 +24,8 @@ import ( "math/rand" "strconv" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/indexcgowrapper" "github.com/golang/protobuf/proto" @@ -623,13 +625,8 @@ func genEtcdKV() (*etcdkv.EtcdKV, error) { func genFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 - pulsarURL := Params.PulsarCfg.Address msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": receiveBufSize, - "pulsarAddress": pulsarURL, - "pulsarBufSize": 1024} - err := msFactory.SetParams(m) + err := msFactory.Init(&Params) if err != nil { return nil, err } @@ -640,11 +637,7 @@ func genInvalidFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": receiveBufSize, - "pulsarAddress": "", - "pulsarBufSize": 1024} - err := msFactory.SetParams(m) + err := msFactory.Init(&Params) if err != nil { return nil, err } @@ -1835,7 +1828,7 @@ type mockMsgStreamFactory struct { var _ msgstream.Factory = &mockMsgStreamFactory{} -func (mm *mockMsgStreamFactory) SetParams(params map[string]interface{}) error { +func (mm *mockMsgStreamFactory) Init(params *paramtable.ComponentParam) error { return nil } diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index 45cf4bbb33..ca7a824896 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -125,12 +125,8 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) error { func TestQueryCollection_withoutVChannel(t *testing.T) { ctx := context.Background() - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} factory := msgstream.NewPmsFactory() - err := factory.SetParams(m) + err := factory.Init(&Params) assert.Nil(t, err) etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 9d23350880..27c9d46d32 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -331,12 +331,7 @@ func (node *QueryNode) Init() error { // Start mainly start QueryNode's query service. func (node *QueryNode) Start() error { - var err error - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err = node.msFactory.SetParams(m) + err := node.msFactory.Init(&Params) if err != nil { return err } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index e638f69d79..72bfafa256 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -227,13 +227,8 @@ func makeNewChannelNames(names []string, suffix string) []string { func newMessageStreamFactory() (msgstream.Factory, error) { const receiveBufSize = 1024 - pulsarURL := Params.PulsarCfg.Address msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": receiveBufSize, - "pulsarAddress": pulsarURL, - "pulsarBufSize": 1024} - err := msFactory.SetParams(m) + err := msFactory.Init(&Params) return msFactory, err } diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index c1c458c8d1..9ac1aeaf71 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -30,11 +30,7 @@ func TestStatsService_start(t *testing.T) { initTestMeta(t, node, 0, 0) msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - msFactory.SetParams(m) + msFactory.Init(&Params) node.statsService = newStatsService(node.queryNodeLoopCtx, node.historical.replica, msFactory) node.statsService.start() node.Stop() @@ -53,11 +49,7 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { producerChannels := []string{Params.CommonCfg.QueryNodeStats} msFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "receiveBufSize": receiveBufSize, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx) diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 78b6884340..dde02fa513 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -41,11 +41,7 @@ func TestDmlChannels(t *testing.T) { factory := msgstream.NewPmsFactory() Params.Init() - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarCfg.Address, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err := factory.SetParams(m) + err := factory.Init(&Params) assert.Nil(t, err) dml := newDmlChannels(ctx, factory, dmlChanPrefix, totalDmlChannelNum) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 42daa5b633..df127967c9 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1055,11 +1055,7 @@ func (c *Core) Init() error { return tsoAllocator.GetLastSavedTime() } - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarCfg.Address, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - if initError = c.msFactory.SetParams(m); initError != nil { + if initError = c.msFactory.Init(&Params); initError != nil { return } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index a3e76869e7..7d3f75aab9 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -628,11 +628,7 @@ func TestRootCoord(t *testing.T) { tmpFactory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarCfg.Address, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err = tmpFactory.SetParams(m) + err = tmpFactory.Init(&Params) assert.Nil(t, err) timeTickStream, _ := tmpFactory.NewMsgStream(ctx) @@ -2420,11 +2416,7 @@ func TestRootCoord2(t *testing.T) { err = core.Register() assert.Nil(t, err) - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) @@ -2708,11 +2700,7 @@ func TestCheckFlushedSegments(t *testing.T) { err = core.Register() assert.Nil(t, err) - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) @@ -2875,11 +2863,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { err = core.Register() assert.Nil(t, err) - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(t, err) timeTickStream, _ := msFactory.NewMsgStream(ctx) diff --git a/internal/rootcoord/timestamp_test.go b/internal/rootcoord/timestamp_test.go index e5f7a728d9..454e1788f2 100644 --- a/internal/rootcoord/timestamp_test.go +++ b/internal/rootcoord/timestamp_test.go @@ -120,11 +120,7 @@ func BenchmarkAllocTimestamp(b *testing.B) { err = core.Start() assert.Nil(b, err) - m := map[string]interface{}{ - "receiveBufSize": 1024, - "pulsarAddress": Params.PulsarCfg.Address, - "pulsarBufSize": 1024} - err = msFactory.SetParams(m) + err = msFactory.Init(&Params) assert.Nil(b, err) b.ResetTimer() diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index 21da320ea6..c45674c38a 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -33,11 +33,7 @@ func TestTimetickSync(t *testing.T) { sourceID := int64(100) factory := msgstream.NewPmsFactory() - m := map[string]interface{}{ - "pulsarAddress": Params.PulsarCfg.Address, - "receiveBufSize": 1024, - "pulsarBufSize": 1024} - err := factory.SetParams(m) + err := factory.Init(&Params) assert.Nil(t, err) //chanMap := map[typeutil.UniqueID][]string{ diff --git a/internal/util/flowgraph/input_node_test.go b/internal/util/flowgraph/input_node_test.go index f1747af3a9..706d2f5ae7 100644 --- a/internal/util/flowgraph/input_node_test.go +++ b/internal/util/flowgraph/input_node_test.go @@ -21,6 +21,8 @@ import ( "os" "testing" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/stretchr/testify/assert" ) @@ -28,8 +30,8 @@ import ( func TestInputNode(t *testing.T) { os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestInputNode") msFactory := msgstream.NewRmsFactory() - m := map[string]interface{}{} - err := msFactory.SetParams(m) + var Params paramtable.ComponentParam + err := msFactory.Init(&Params) assert.Nil(t, err) msgStream, _ := msFactory.NewMsgStream(context.TODO()) diff --git a/internal/util/flowgraph/node_test.go b/internal/util/flowgraph/node_test.go index 4984c9d3ba..6bd290b146 100644 --- a/internal/util/flowgraph/node_test.go +++ b/internal/util/flowgraph/node_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -57,8 +59,8 @@ func generateMsgPack() msgstream.MsgPack { func TestNodeCtx_Start(t *testing.T) { os.Setenv("ROCKSMQ_PATH", "/tmp/MilvusTest/FlowGraph/TestNodeStart") msFactory := msgstream.NewRmsFactory() - m := map[string]interface{}{} - err := msFactory.SetParams(m) + var Params paramtable.ComponentParam + err := msFactory.Init(&Params) assert.Nil(t, err) msgStream, _ := msFactory.NewMsgStream(context.TODO()) diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index 48bd4d3762..fdc4a44ca7 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -75,6 +75,14 @@ func (p *ComponentParam) SetLogConfig(role string) { p.BaseTable.SetLogConfig() } +func (p *ComponentParam) RocksmqEnable() bool { + return p.RocksmqCfg.Path != "" +} + +func (p *ComponentParam) PulsarEnable() bool { + return p.PulsarCfg.Address != "" +} + /////////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct {