From f1ea9613ae198b7abfcae1d0a504e3425c84126a Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Mon, 7 Feb 2022 10:09:45 +0800 Subject: [PATCH] Add EtcdConfig and move PulsarConfig/RocksdbConfig/MinioConfig into BaseParamTable (#15434) Signed-off-by: yudong.cai --- cmd/roles/roles.go | 4 +- configs/milvus.yaml | 1 - internal/allocator/global_id_test.go | 2 +- internal/datacoord/server.go | 8 +- internal/datacoord/server_test.go | 4 +- internal/datanode/data_node.go | 6 +- internal/datanode/data_node_test.go | 6 +- .../datanode/flow_graph_delete_node_test.go | 6 +- .../flow_graph_insert_buffer_node_test.go | 8 +- internal/datanode/flow_graph_manager_test.go | 2 +- internal/datanode/mock_test.go | 2 +- .../datacoord/client/client_test.go | 4 +- internal/distributed/datacoord/service.go | 2 +- internal/distributed/datanode/service.go | 6 +- .../indexcoord/client/client_test.go | 4 +- internal/distributed/indexcoord/service.go | 2 +- .../distributed/indexcoord/service_test.go | 2 +- .../indexnode/client/client_test.go | 2 +- internal/distributed/indexnode/service.go | 2 +- .../distributed/indexnode/service_test.go | 2 +- internal/distributed/proxy/service.go | 10 +- .../querycoord/client/client_test.go | 4 +- internal/distributed/querycoord/service.go | 8 +- internal/distributed/querynode/service.go | 6 +- .../rootcoord/client/client_test.go | 4 +- internal/distributed/rootcoord/service.go | 8 +- .../distributed/rootcoord/service_test.go | 16 +- internal/indexcoord/index_coord.go | 8 +- internal/indexcoord/index_coord_mock.go | 4 +- internal/indexcoord/index_coord_mock_test.go | 2 +- internal/indexcoord/index_coord_test.go | 2 +- internal/indexcoord/meta_table_test.go | 8 +- internal/indexcoord/metrics_info_test.go | 2 +- internal/indexnode/indexnode.go | 6 +- internal/indexnode/indexnode_mock.go | 4 +- internal/indexnode/indexnode_mock_test.go | 4 +- internal/indexnode/indexnode_test.go | 6 +- internal/kv/etcd/embed_etcd_config_test.go | 4 +- internal/kv/etcd/embed_etcd_kv_test.go | 16 +- internal/kv/etcd/embed_etcd_restart_test.go | 6 +- internal/kv/etcd/etcd_kv_test.go | 2 +- internal/kv/etcd/metakv_factory.go | 12 +- internal/proxy/proxy.go | 4 +- internal/proxy/proxy_test.go | 26 +- internal/querycoord/channel_allocator_test.go | 6 +- .../querycoord/channel_unsubscribe_test.go | 12 +- internal/querycoord/cluster_test.go | 22 +- internal/querycoord/index_checker_test.go | 18 +- internal/querycoord/meta_test.go | 12 +- .../querycoord/mock_querynode_server_test.go | 6 +- internal/querycoord/query_coord.go | 10 +- internal/querycoord/query_coord_test.go | 10 +- internal/querycoord/querynode_test.go | 20 +- internal/querycoord/segment_allocator_test.go | 6 +- internal/querycoord/task_scheduler_test.go | 8 +- internal/querynode/impl_test.go | 4 +- internal/querynode/metrics_info_test.go | 4 +- internal/querynode/mock_test.go | 12 +- internal/querynode/query_collection_test.go | 4 +- internal/querynode/query_node.go | 10 +- internal/querynode/query_node_test.go | 10 +- internal/querynode/query_service.go | 4 +- internal/rootcoord/meta_snapshot_test.go | 14 +- internal/rootcoord/meta_table_test.go | 6 +- .../rootcoord/proxy_client_manager_test.go | 8 +- internal/rootcoord/proxy_manager.go | 6 +- internal/rootcoord/proxy_manager_test.go | 4 +- internal/rootcoord/root_coord.go | 16 +- internal/rootcoord/root_coord_test.go | 50 ++-- internal/rootcoord/suffix_snapshot_test.go | 6 +- internal/rootcoord/timestamp_test.go | 4 +- internal/util/etcd/etcd_util.go | 16 +- internal/util/etcd/etcd_util_test.go | 14 +- internal/util/paramtable/base_param.go | 234 ++++++++++++++---- internal/util/paramtable/base_param_test.go | 67 +++-- internal/util/paramtable/global_param.go | 178 ++----------- internal/util/paramtable/global_param_test.go | 38 --- 77 files changed, 542 insertions(+), 544 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index a2b16e7d6c..e5856e1aa4 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -364,9 +364,9 @@ func (mr *MilvusRoles) Run(local bool, alias string) { } defer stopRocksmq() - if Params.BaseParams.UseEmbedEtcd { + if Params.EtcdCfg.UseEmbedEtcd { // start etcd server - etcd.InitEtcdServer(&Params.BaseParams) + etcd.InitEtcdServer(&Params.EtcdCfg) defer etcd.StopEtcdServer() } } else { diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 157cf79d1a..832c941536 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -218,7 +218,6 @@ msgChannel: queryNodeStats: "query-node-stats" # Cmd for loadIndex, flush, etc... cmd: "cmd" - dataCoordInsertChannel: "insert-channel-" dataCoordStatistic: "datacoord-statistics-channel" dataCoordTimeTick: "datacoord-timetick-channel" dataCoordSegmentInfo: "segment-info-channel" diff --git a/internal/allocator/global_id_test.go b/internal/allocator/global_id_test.go index 602142b7dc..a5afa01d7d 100644 --- a/internal/allocator/global_id_test.go +++ b/internal/allocator/global_id_test.go @@ -31,7 +31,7 @@ var Params paramtable.GlobalParamTable func TestGlobalTSOAllocator_All(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() etcdKV := tsoutil.NewTSOKVBase(etcdCli, "/test/root/kv", "gidTest") diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 283d4b5a83..fdeb6783f2 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -230,13 +230,13 @@ func (s *Server) Register() error { } func (s *Server) initSession() error { - s.session = sessionutil.NewSession(s.ctx, Params.BaseParams.MetaRootPath, s.etcdCli) + s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli) if s.session == nil { return errors.New("failed to initialize session") } s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true, true) Params.DataCoordCfg.NodeID = s.session.ServerID - Params.BaseParams.SetLogger(Params.DataCoordCfg.NodeID) + Params.SetLogger(Params.DataCoordCfg.NodeID) return nil } @@ -417,7 +417,7 @@ func (s *Server) startSegmentManager() { } func (s *Server) initMeta() error { - etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath) s.kvClient = etcdKV reloadEtcdFn := func() error { var err error @@ -731,7 +731,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) { func (s *Server) initRootCoordClient() error { var err error - if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.BaseParams.MetaRootPath, s.etcdCli); err != nil { + if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli); err != nil { return err } if err = s.rootCoordClient.Init(); err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index f8768ca1f1..563150bc94 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2243,9 +2243,9 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se err = factory.SetParams(m) assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) - sessKey := path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot) + sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) assert.Nil(t, err) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 1b9e6fa290..ebc644d9ae 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -193,14 +193,14 @@ func (node *DataNode) Register() error { } func (node *DataNode) initSession() error { - node.session = sessionutil.NewSession(node.ctx, Params.BaseParams.MetaRootPath, node.etcdCli) + node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath, node.etcdCli) if node.session == nil { return errors.New("failed to initialize session") } node.session.Init(typeutil.DataNodeRole, Params.DataNodeCfg.IP+":"+strconv.Itoa(Params.DataNodeCfg.Port), false, true) Params.DataNodeCfg.NodeID = node.session.ServerID node.NodeID = node.session.ServerID - Params.BaseParams.SetLogger(Params.DataNodeCfg.NodeID) + Params.SetLogger(Params.DataNodeCfg.NodeID) return nil } @@ -438,7 +438,7 @@ func (node *DataNode) Start() error { } connectEtcdFn := func() error { - etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath) node.watchKv = etcdKV return nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index ec3d376da1..9ec512894a 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -64,7 +64,7 @@ func TestDataNode(t *testing.T) { defer cancel() node := newIDLEDataNodeMock(ctx) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -346,7 +346,7 @@ func TestDataNode(t *testing.T) { func TestWatchChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) node := newIDLEDataNodeMock(ctx) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -361,7 +361,7 @@ func TestWatchChannel(t *testing.T) { t.Run("test watch channel", func(t *testing.T) { // GOOSE TODO - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) oldInvalidCh := "datanode-etcd-test-by-dev-rootcoord-dml-channel-invalid" path := fmt.Sprintf("%s/%d/%s", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID, oldInvalidCh) err = kv.Save(path, string([]byte{23})) diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 51d7724a1b..42cd8bc627 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -246,7 +246,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Params.DataNodeCfg.DeleteBinlogRootPath = testPath c := &nodeConfig{ @@ -270,7 +270,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Params.DataNodeCfg.DeleteBinlogRootPath = testPath c := &nodeConfig{ @@ -299,7 +299,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { chanName := "datanode-test-FlowGraphDeletenode-operate" testPath := "/test/datanode/root/meta" assert.NoError(t, clearEtcd(testPath)) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Params.DataNodeCfg.DeleteBinlogRootPath = testPath c := &nodeConfig{ diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 0c55823e36..73c4d2dc50 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -61,7 +61,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") @@ -151,7 +151,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") @@ -352,7 +352,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") @@ -626,7 +626,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) { testPath := "/test/datanode/root/meta" err := clearEtcd(testPath) require.NoError(t, err) - Params.BaseParams.MetaRootPath = testPath + Params.EtcdCfg.MetaRootPath = testPath Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1") diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 778c81ca41..841280438c 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -32,7 +32,7 @@ func TestFlowGraphManager(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index b77e0cb665..baea8cf53b 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -107,7 +107,7 @@ func makeNewChannelNames(names []string, suffix string) []string { } func clearEtcd(rootPath string) error { - client, err := etcd.GetEtcdClient(&Params.BaseParams) + client, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return err } diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index 73f32de295..bdef90a161 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -32,9 +32,9 @@ func Test_NewClient(t *testing.T) { proxy.Params.InitOnce() ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) assert.Nil(t, err) - client, err := NewClient(ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 449532b2d1..16641db867 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -89,7 +89,7 @@ func (s *Server) init() error { datacoord.Params.DataCoordCfg.Port = Params.Port datacoord.Params.DataCoordCfg.Address = Params.GetAddress() - etcdCli, err := etcd.GetEtcdClient(&datacoord.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&datacoord.Params.EtcdCfg) if err != nil { log.Debug("DataCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2578554fba..f0ed2275a7 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -225,7 +225,7 @@ func (s *Server) init() error { dn.Params.DataNodeCfg.Port = Params.Port dn.Params.DataNodeCfg.IP = Params.IP - etcdCli, err := etcd.GetEtcdClient(&dn.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&dn.Params.EtcdCfg) if err != nil { log.Debug("DataNode connect to etcd failed", zap.Error(err)) return err @@ -245,7 +245,7 @@ func (s *Server) init() error { // --- RootCoord Client --- if s.newRootCoordClient != nil { log.Debug("Init root coord client ...") - rootCoordClient, err := s.newRootCoordClient(dn.Params.BaseParams.MetaRootPath, s.etcdCli) + rootCoordClient, err := s.newRootCoordClient(dn.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("DataNode newRootCoordClient failed", zap.Error(err)) panic(err) @@ -272,7 +272,7 @@ func (s *Server) init() error { // --- Data Server Client --- if s.newDataCoordClient != nil { log.Debug("DataNode Init data service client ...") - dataCoordClient, err := s.newDataCoordClient(dn.Params.BaseParams.MetaRootPath, s.etcdCli) + dataCoordClient, err := s.newDataCoordClient(dn.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("DataNode newDataCoordClient failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/indexcoord/client/client_test.go b/internal/distributed/indexcoord/client/client_test.go index 819d58769b..b6c9e359ad 100644 --- a/internal/distributed/indexcoord/client/client_test.go +++ b/internal/distributed/indexcoord/client/client_test.go @@ -43,9 +43,9 @@ func TestIndexCoordClient(t *testing.T) { err = server.Run() assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.EtcdCfg) assert.Nil(t, err) - icc, err := NewClient(ctx, indexcoord.Params.BaseParams.MetaRootPath, etcdCli) + icc, err := NewClient(ctx, indexcoord.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) assert.NotNil(t, icc) diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index 1b3f192db2..0e44ad4012 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -91,7 +91,7 @@ func (s *Server) init() error { closer := trace.InitTracing("IndexCoord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.EtcdCfg) if err != nil { log.Debug("IndexCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/indexcoord/service_test.go b/internal/distributed/indexcoord/service_test.go index 2d2cf7f0e7..35aec23c51 100644 --- a/internal/distributed/indexcoord/service_test.go +++ b/internal/distributed/indexcoord/service_test.go @@ -35,7 +35,7 @@ func TestIndexCoordinateServer(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, server) Params.Init() - etcd, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcd, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) indexCoordClient := &indexcoord.Mock{} indexCoordClient.SetEtcdClient(etcd) diff --git a/internal/distributed/indexnode/client/client_test.go b/internal/distributed/indexnode/client/client_test.go index d47a91a7af..7204dc54fd 100644 --- a/internal/distributed/indexnode/client/client_test.go +++ b/internal/distributed/indexnode/client/client_test.go @@ -128,7 +128,7 @@ func TestIndexNodeClient(t *testing.T) { inm := &indexnode.Mock{} ParamsGlobal.InitOnce() - etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) err = ins.SetClient(inm) diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index c07c63df82..322b589381 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -149,7 +149,7 @@ func (s *Server) init() error { return err } - etcdCli, err := etcd.GetEtcdClient(&indexnode.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg) if err != nil { log.Debug("IndexNode connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go index 0756284429..346776ca29 100644 --- a/internal/distributed/indexnode/service_test.go +++ b/internal/distributed/indexnode/service_test.go @@ -40,7 +40,7 @@ func TestIndexNodeServer(t *testing.T) { inm := &indexnode.Mock{} ParamsGlobal.InitOnce() - etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) err = server.SetClient(inm) diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 95b30fdfa1..bbf599b695 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -174,7 +174,7 @@ func (s *Server) init() error { s.closer = closer log.Debug("init Proxy's tracer done", zap.String("service name", serviceName)) - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) if err != nil { log.Debug("Proxy connect to etcd failed", zap.Error(err)) return err @@ -193,7 +193,7 @@ func (s *Server) init() error { if s.rootCoordClient == nil { var err error log.Debug("create RootCoord client for Proxy") - s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) if err != nil { log.Warn("failed to create RootCoord client for Proxy", zap.Error(err)) return err @@ -222,7 +222,7 @@ func (s *Server) init() error { if s.dataCoordClient == nil { var err error log.Debug("create DataCoord client for Proxy") - s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) if err != nil { log.Warn("failed to create DataCoord client for Proxy", zap.Error(err)) return err @@ -251,7 +251,7 @@ func (s *Server) init() error { if s.indexCoordClient == nil { var err error log.Debug("create IndexCoord client for Proxy") - s.indexCoordClient, err = icc.NewClient(s.ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + s.indexCoordClient, err = icc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) if err != nil { log.Warn("failed to create IndexCoord client for Proxy", zap.Error(err)) return err @@ -280,7 +280,7 @@ func (s *Server) init() error { if s.queryCoordClient == nil { var err error log.Debug("create QueryCoord client for Proxy") - s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) if err != nil { log.Warn("failed to create QueryCoord client for Proxy", zap.Error(err)) return err diff --git a/internal/distributed/querycoord/client/client_test.go b/internal/distributed/querycoord/client/client_test.go index 1172945780..6ef8048fb5 100644 --- a/internal/distributed/querycoord/client/client_test.go +++ b/internal/distributed/querycoord/client/client_test.go @@ -34,9 +34,9 @@ func Test_NewClient(t *testing.T) { ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) assert.NoError(t, err) - client, err := NewClient(ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index ca15643beb..152aa9423e 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -115,7 +115,7 @@ func (s *Server) init() error { closer := trace.InitTracing("querycoord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { log.Debug("QueryCoord connect to etcd failed", zap.Error(err)) return err @@ -133,7 +133,7 @@ func (s *Server) init() error { // --- Master Server Client --- if s.rootCoord == nil { - s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.BaseParams.MetaRootPath, s.etcdCli) + s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("QueryCoord try to new RootCoord client failed", zap.Error(err)) panic(err) @@ -164,7 +164,7 @@ func (s *Server) init() error { // --- Data service client --- if s.dataCoord == nil { - s.dataCoord, err = dcc.NewClient(s.loopCtx, qc.Params.BaseParams.MetaRootPath, s.etcdCli) + s.dataCoord, err = dcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("QueryCoord try to new DataCoord client failed", zap.Error(err)) panic(err) @@ -192,7 +192,7 @@ func (s *Server) init() error { // --- IndexCoord --- if s.indexCoord == nil { - s.indexCoord, err = icc.NewClient(s.loopCtx, qc.Params.BaseParams.MetaRootPath, s.etcdCli) + s.indexCoord, err = icc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("QueryCoord try to new IndexCoord client failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 507c950f1f..b28955dd27 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -102,7 +102,7 @@ func (s *Server) init() error { log.Debug("QueryNode", zap.Int("port", Params.Port)) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { log.Debug("QueryNode connect to etcd failed", zap.Error(err)) return err @@ -120,7 +120,7 @@ func (s *Server) init() error { // --- RootCoord Client --- if s.rootCoord == nil { - s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.BaseParams.MetaRootPath, s.etcdCli) + s.rootCoord, err = rcc.NewClient(s.ctx, qn.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("QueryNode new RootCoordClient failed", zap.Error(err)) panic(err) @@ -150,7 +150,7 @@ func (s *Server) init() error { // --- IndexCoord --- if s.indexCoord == nil { - s.indexCoord, err = icc.NewClient(s.ctx, qn.Params.BaseParams.MetaRootPath, s.etcdCli) + s.indexCoord, err = icc.NewClient(s.ctx, qn.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err != nil { log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index 8a7b0430a0..222e1a95d4 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -33,9 +33,9 @@ func Test_NewClient(t *testing.T) { proxy.Params.InitOnce() ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) assert.NoError(t, err) - client, err := NewClient(ctx, proxy.Params.BaseParams.MetaRootPath, etcdCli) + client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 9171386bf4..1bbdf16e70 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -158,7 +158,7 @@ func (s *Server) init() error { closer := trace.InitTracing("root_coord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { log.Debug("RootCoord connect to etcd failed", zap.Error(err)) return err @@ -193,7 +193,7 @@ func (s *Server) init() error { if s.newDataCoordClient != nil { log.Debug("RootCoord start to create DataCoord client") - dataCoord := s.newDataCoordClient(rootcoord.Params.BaseParams.MetaRootPath, s.etcdCli) + dataCoord := s.newDataCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err := s.rootCoord.SetDataCoord(s.ctx, dataCoord); err != nil { panic(err) } @@ -201,7 +201,7 @@ func (s *Server) init() error { } if s.newIndexCoordClient != nil { log.Debug("RootCoord start to create IndexCoord client") - indexCoord := s.newIndexCoordClient(rootcoord.Params.BaseParams.MetaRootPath, s.etcdCli) + indexCoord := s.newIndexCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err := s.rootCoord.SetIndexCoord(indexCoord); err != nil { panic(err) } @@ -209,7 +209,7 @@ func (s *Server) init() error { } if s.newQueryCoordClient != nil { log.Debug("RootCoord start to create QueryCoord client") - queryCoord := s.newQueryCoordClient(rootcoord.Params.BaseParams.MetaRootPath, s.etcdCli) + queryCoord := s.newQueryCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath, s.etcdCli) if err := s.rootCoord.SetQueryCoord(queryCoord); err != nil { panic(err) } diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index f23ec14ff5..48ac14dd15 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -80,8 +80,8 @@ func TestGrpcService(t *testing.T) { assert.Nil(t, err) rootcoord.Params.Init() - rootcoord.Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) - rootcoord.Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) + rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) + rootcoord.Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) rootcoord.Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("msgChannel%d", randVal) rootcoord.Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("timeTick%d", randVal) rootcoord.Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("stateChannel%d", randVal) @@ -99,9 +99,9 @@ func TestGrpcService(t *testing.T) { assert.Nil(t, err) svr.rootCoord.UpdateStateCode(internalpb.StateCode_Initializing) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) - sessKey := path.Join(rootcoord.Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot) + sessKey := path.Join(rootcoord.Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) assert.Nil(t, err) @@ -216,7 +216,7 @@ func TestGrpcService(t *testing.T) { svr.rootCoord.UpdateStateCode(internalpb.StateCode_Healthy) - cli, err := rcc.NewClient(context.Background(), rootcoord.Params.BaseParams.MetaRootPath, etcdCli) + cli, err := rcc.NewClient(context.Background(), rootcoord.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) err = cli.Init() @@ -915,11 +915,11 @@ func TestRun(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() rootcoord.Params.Init() - rootcoord.Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) + rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParamTable) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) - sessKey := path.Join(rootcoord.Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot) + sessKey := path.Join(rootcoord.Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) assert.Nil(t, err) err = svr.Run() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 4a802a45cd..8825860772 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -139,12 +139,12 @@ func (i *IndexCoord) Register() error { } func (i *IndexCoord) initSession() error { - i.session = sessionutil.NewSession(i.loopCtx, Params.BaseParams.MetaRootPath, i.etcdCli) + i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath, i.etcdCli) if i.session == nil { return errors.New("failed to initialize session") } i.session.Init(typeutil.IndexCoordRole, Params.IndexCoordCfg.Address, true, true) - Params.BaseParams.SetLogger(i.session.ServerID) + Params.SetLogger(i.session.ServerID) return nil } @@ -164,7 +164,7 @@ func (i *IndexCoord) Init() error { } connectEtcdFn := func() error { - etcdKV := etcdkv.NewEtcdKV(i.etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath) metakv, err := NewMetaTable(etcdKV) if err != nil { return err @@ -207,7 +207,7 @@ func (i *IndexCoord) Init() error { } //init idAllocator - kvRootPath := Params.BaseParams.KvRootPath + kvRootPath := Params.EtcdCfg.KvRootPath etcdKV := tsoutil.NewTSOKVBase(i.etcdCli, kvRootPath, "index_gid") i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", etcdKV) diff --git a/internal/indexcoord/index_coord_mock.go b/internal/indexcoord/index_coord_mock.go index f38045cb17..b6197bcb87 100644 --- a/internal/indexcoord/index_coord_mock.go +++ b/internal/indexcoord/index_coord_mock.go @@ -68,12 +68,12 @@ func (icm *Mock) Register() error { if icm.Failure { return errors.New("IndexCoordinate register failed") } - icm.etcdKV = etcdkv.NewEtcdKV(icm.etcdCli, Params.BaseParams.MetaRootPath) + icm.etcdKV = etcdkv.NewEtcdKV(icm.etcdCli, Params.EtcdCfg.MetaRootPath) err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole) if err != nil { return err } - session := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, icm.etcdCli) + session := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, icm.etcdCli) session.Init(typeutil.IndexCoordRole, Params.IndexCoordCfg.Address, true, false) session.Register() return err diff --git a/internal/indexcoord/index_coord_mock_test.go b/internal/indexcoord/index_coord_mock_test.go index 8b77951e3d..7a9953e65b 100644 --- a/internal/indexcoord/index_coord_mock_test.go +++ b/internal/indexcoord/index_coord_mock_test.go @@ -32,7 +32,7 @@ func TestIndexCoordMock(t *testing.T) { Params.Init() icm := Mock{} - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) icm.SetEtcdClient(etcdCli) defer etcdCli.Close() diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 158993b684..eea740d32d 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -43,7 +43,7 @@ func TestIndexCoord(t *testing.T) { ctx := context.Background() inm0 := &indexnode.Mock{} Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) inm0.SetEtcdClient(etcdCli) err = inm0.Init() diff --git a/internal/indexcoord/meta_table_test.go b/internal/indexcoord/meta_table_test.go index f446698804..eaf26c2612 100644 --- a/internal/indexcoord/meta_table_test.go +++ b/internal/indexcoord/meta_table_test.go @@ -31,11 +31,11 @@ import ( func TestMetaTable(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.NoError(t, err) assert.Nil(t, err) - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) req := &indexpb.BuildIndexRequest{ IndexBuildID: 1, @@ -314,11 +314,11 @@ func TestMetaTable(t *testing.T) { func TestMetaTable_Error(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.NoError(t, err) - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) t.Run("reloadFromKV error", func(t *testing.T) { value := "indexMeta-1" diff --git a/internal/indexcoord/metrics_info_test.go b/internal/indexcoord/metrics_info_test.go index 373ff521c0..b6b58652e9 100644 --- a/internal/indexcoord/metrics_info_test.go +++ b/internal/indexcoord/metrics_info_test.go @@ -32,7 +32,7 @@ func TestGetSystemInfoMetrics(t *testing.T) { assert.Nil(t, err) Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.NoError(t, err) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 6b605410e5..7986643c13 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -146,13 +146,13 @@ func (i *IndexNode) initKnowhere() { } func (i *IndexNode) initSession() error { - i.session = sessionutil.NewSession(i.loopCtx, Params.BaseParams.MetaRootPath, i.etcdCli) + i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath, i.etcdCli) if i.session == nil { return errors.New("failed to initialize session") } i.session.Init(typeutil.IndexNodeRole, Params.IndexNodeCfg.IP+":"+strconv.Itoa(Params.IndexNodeCfg.Port), false, true) Params.IndexNodeCfg.NodeID = i.session.ServerID - Params.BaseParams.SetLogger(Params.IndexNodeCfg.NodeID) + Params.SetLogger(Params.IndexNodeCfg.NodeID) return nil } @@ -172,7 +172,7 @@ func (i *IndexNode) Init() error { } log.Debug("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID)) - etcdKV := etcdkv.NewEtcdKV(i.etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath) i.etcdKV = etcdKV option := &miniokv.Option{ diff --git a/internal/indexnode/indexnode_mock.go b/internal/indexnode/indexnode_mock.go index ddcabdb1bf..4dba97157b 100644 --- a/internal/indexnode/indexnode_mock.go +++ b/internal/indexnode/indexnode_mock.go @@ -184,11 +184,11 @@ func (inm *Mock) Register() error { return errors.New("IndexNode register failed") } Params.Init() - inm.etcdKV = etcdkv.NewEtcdKV(inm.etcdCli, Params.BaseParams.MetaRootPath) + inm.etcdKV = etcdkv.NewEtcdKV(inm.etcdCli, Params.EtcdCfg.MetaRootPath) if err := inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole); err != nil { return err } - session := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, inm.etcdCli) + session := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, inm.etcdCli) session.Init(typeutil.IndexNodeRole, "localhost:21121", false, false) session.Register() return nil diff --git a/internal/indexnode/indexnode_mock_test.go b/internal/indexnode/indexnode_mock_test.go index 3f1c9a78fe..036033e267 100644 --- a/internal/indexnode/indexnode_mock_test.go +++ b/internal/indexnode/indexnode_mock_test.go @@ -36,7 +36,7 @@ func TestIndexNodeMock(t *testing.T) { inm := Mock{ Build: true, } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) defer etcdCli.Close() @@ -154,7 +154,7 @@ func TestIndexNodeMockFiled(t *testing.T) { Build: true, Err: false, } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) defer etcdCli.Close() diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index 334c071a28..39875d9004 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -64,7 +64,7 @@ func TestIndexNode(t *testing.T) { assert.Nil(t, err) Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) in.SetEtcdClient(etcdCli) defer etcdCli.Close() @@ -477,7 +477,7 @@ func TestCreateIndexFailed(t *testing.T) { assert.Nil(t, err) Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) in.SetEtcdClient(etcdCli) defer etcdCli.Close() @@ -749,7 +749,7 @@ func TestIndexNode_Error(t *testing.T) { assert.Nil(t, err) Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) in.SetEtcdClient(etcdCli) defer etcdCli.Close() diff --git a/internal/kv/etcd/embed_etcd_config_test.go b/internal/kv/etcd/embed_etcd_config_test.go index 6c238b3586..9d4033314b 100644 --- a/internal/kv/etcd/embed_etcd_config_test.go +++ b/internal/kv/etcd/embed_etcd_config_test.go @@ -36,14 +36,14 @@ func TestEtcdConfigLoad(te *testing.T) { // TODO, not sure if the relative path works for ci environment param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") param.BaseTable.Save("etcd.data.dir", "etcd.test.data.dir") - param.LoadCfgToMemory() + param.EtcdCfg.LoadCfgToMemory() //clean up data defer func() { os.RemoveAll("etcd.test.data.dir") }() te.Run("Etcd Config", func(t *testing.T) { rootPath := "/test" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) require.NoError(te, err) assert.NotNil(te, metaKv) require.NoError(t, err) diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index 9691892ded..e0ea7d12db 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -36,14 +36,14 @@ func TestEmbedEtcd(te *testing.T) { param.BaseTable.Save("etcd.use.embed", "true") param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") param.BaseTable.Save("etcd.data.dir", "etcd.test.data.dir") - param.LoadCfgToMemory() + param.EtcdCfg.LoadCfgToMemory() //clean up data defer func() { os.RemoveAll("etcd.test.data.dir") }() te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { rootPath := "/etcd/test/root/saveandload" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) require.NoError(te, err) assert.NotNil(te, metaKv) require.NoError(t, err) @@ -153,7 +153,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("EtcdKV LoadWithRevision", func(t *testing.T) { rootPath := "/etcd/test/root/LoadWithRevision" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) assert.Nil(t, err) defer metaKv.Close() @@ -198,7 +198,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("EtcdKV MultiSaveAndMultiLoad", func(t *testing.T) { rootPath := "/etcd/test/root/multi_save_and_multi_load" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) assert.Nil(t, err) defer metaKv.Close() @@ -307,7 +307,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("EtcdKV MultiRemoveWithPrefix", func(t *testing.T) { rootPath := "/etcd/test/root/multi_remove_with_prefix" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) require.NoError(t, err) defer metaKv.Close() @@ -395,7 +395,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("EtcdKV Watch", func(t *testing.T) { rootPath := "/etcd/test/root/watch" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) assert.Nil(t, err) defer metaKv.Close() @@ -412,7 +412,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("Etcd Revision", func(t *testing.T) { rootPath := "/etcd/test/root/watch" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) assert.Nil(t, err) defer metaKv.Close() @@ -463,7 +463,7 @@ func TestEmbedEtcd(te *testing.T) { te.Run("Etcd Lease", func(t *testing.T) { rootPath := "/etcd/test/root/lease" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) assert.Nil(t, err) defer metaKv.Close() diff --git a/internal/kv/etcd/embed_etcd_restart_test.go b/internal/kv/etcd/embed_etcd_restart_test.go index 9f484b4e47..5be728dbc3 100644 --- a/internal/kv/etcd/embed_etcd_restart_test.go +++ b/internal/kv/etcd/embed_etcd_restart_test.go @@ -40,10 +40,10 @@ func TestEtcdRestartLoad(te *testing.T) { defer func() { os.RemoveAll("etcd.test.data.dir") }() - param.LoadCfgToMemory() + param.EtcdCfg.LoadCfgToMemory() te.Run("EtcdKV SaveRestartAndLoad", func(t *testing.T) { rootPath := "/etcd/test/root/saveRestartAndLoad" - metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, err := embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) require.NoError(te, err) assert.NotNil(te, metaKv) require.NoError(t, err) @@ -80,7 +80,7 @@ func TestEtcdRestartLoad(te *testing.T) { embed.Close() //restart and check test result - metaKv, _ = embed_etcd_kv.NewMetaKvFactory(rootPath, param) + metaKv, _ = embed_etcd_kv.NewMetaKvFactory(rootPath, ¶m.EtcdCfg) for _, test := range saveAndLoadTests { val, err := metaKv.Load(test.key) diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 76d3639c6a..2b5937fe3d 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -38,7 +38,7 @@ func TestMain(m *testing.M) { } func TestEtcdKV_Load(te *testing.T) { - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.NoError(te, err) te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { diff --git a/internal/kv/etcd/metakv_factory.go b/internal/kv/etcd/metakv_factory.go index ae5ec7226f..670eda34ca 100644 --- a/internal/kv/etcd/metakv_factory.go +++ b/internal/kv/etcd/metakv_factory.go @@ -27,12 +27,12 @@ import ( // NewMetaKvFactory returns an object that implements the kv.MetaKv interface using etcd. // The UseEmbedEtcd in the param is used to determine whether the etcd service is external or embedded. -func NewMetaKvFactory(rootPath string, param *paramtable.BaseParamTable) (kv.MetaKv, error) { +func NewMetaKvFactory(rootPath string, etcdCfg *paramtable.EtcdConfig) (kv.MetaKv, error) { log.Info("start etcd with rootPath", zap.String("rootpath", rootPath), - zap.Bool("isEmbed", param.UseEmbedEtcd)) - if param.UseEmbedEtcd { - path := param.EtcdConfigPath + zap.Bool("isEmbed", etcdCfg.UseEmbedEtcd)) + if etcdCfg.UseEmbedEtcd { + path := etcdCfg.ConfigPath var cfg *embed.Config if len(path) > 0 { cfgFromFile, err := embed.ConfigFromFile(path) @@ -43,14 +43,14 @@ func NewMetaKvFactory(rootPath string, param *paramtable.BaseParamTable) (kv.Met } else { cfg = embed.NewConfig() } - cfg.Dir = param.EtcdDataDir + cfg.Dir = etcdCfg.DataDir metaKv, err := NewEmbededEtcdKV(cfg, rootPath) if err != nil { return nil, err } return metaKv, err } - client, err := etcd.GetEtcdClient(param) + client, err := etcd.GetEtcdClient(etcdCfg) if err != nil { return nil, err } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 525d2e44c1..2d7b5d81d6 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -135,13 +135,13 @@ func (node *Proxy) Register() error { // initSession initialize the session of Proxy. func (node *Proxy) initSession() error { - node.session = sessionutil.NewSession(node.ctx, Params.BaseParams.MetaRootPath, node.etcdCli) + node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath, node.etcdCli) if node.session == nil { return errors.New("new session failed, maybe etcd cannot be connected") } node.session.Init(typeutil.ProxyRole, Params.ProxyCfg.NetworkAddress, false, true) Params.ProxyCfg.ProxyID = node.session.ServerID - Params.BaseParams.SetLogger(Params.ProxyCfg.ProxyID) + Params.SetLogger(Params.ProxyCfg.ProxyID) return nil } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index be0cc1eabe..2da66f87d1 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -123,7 +123,7 @@ func runRootCoord(ctx context.Context, localMsg bool) *grpcrootcoord.Server { go func() { rootcoord.Params.Init() if !localMsg { - logutil.SetupLogger(&rootcoord.Params.BaseParams.Log) + logutil.SetupLogger(&rootcoord.Params.Log) defer log.Sync() } @@ -154,7 +154,7 @@ func runQueryCoord(ctx context.Context, localMsg bool) *grpcquerycoord.Server { querycoord.Params.Init() if !localMsg { - logutil.SetupLogger(&querycoord.Params.BaseParams.Log) + logutil.SetupLogger(&querycoord.Params.Log) defer log.Sync() } @@ -186,7 +186,7 @@ func runQueryNode(ctx context.Context, localMsg bool, alias string) *grpcqueryno querynode.Params.Init() if !localMsg { - logutil.SetupLogger(&querynode.Params.BaseParams.Log) + logutil.SetupLogger(&querynode.Params.Log) defer log.Sync() } @@ -217,7 +217,7 @@ func runDataCoord(ctx context.Context, localMsg bool) *grpcdatacoordclient.Serve datacoord.Params.Init() if !localMsg { - logutil.SetupLogger(&datacoord.Params.BaseParams.Log) + logutil.SetupLogger(&datacoord.Params.Log) defer log.Sync() } @@ -245,7 +245,7 @@ func runDataNode(ctx context.Context, localMsg bool, alias string) *grpcdatanode datanode.Params.Init() if !localMsg { - logutil.SetupLogger(&datanode.Params.BaseParams.Log) + logutil.SetupLogger(&datanode.Params.Log) defer log.Sync() } @@ -276,7 +276,7 @@ func runIndexCoord(ctx context.Context, localMsg bool) *grpcindexcoord.Server { indexcoord.Params.Init() if !localMsg { - logutil.SetupLogger(&indexcoord.Params.BaseParams.Log) + logutil.SetupLogger(&indexcoord.Params.Log) defer log.Sync() } @@ -307,7 +307,7 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno indexnode.Params.Init() if !localMsg { - logutil.SetupLogger(&indexnode.Params.BaseParams.Log) + logutil.SetupLogger(&indexnode.Params.Log) defer log.Sync() } @@ -317,7 +317,7 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno panic(err) } wg.Done() - etcd, err := etcd.GetEtcdClient(&indexnode.Params.BaseParams) + etcd, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg) if err != nil { panic(err) } @@ -521,7 +521,7 @@ func TestProxy(t *testing.T) { Params.Init() log.Info("Initialize parameter table of Proxy") - etcdcli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdcli.Close() assert.NoError(t, err) proxy.SetEtcdClient(etcdcli) @@ -531,7 +531,7 @@ func TestProxy(t *testing.T) { go testServer.startGrpc(ctx, &wg) assert.NoError(t, testServer.waitForGrpcReady()) - rootCoordClient, err := rcc.NewClient(ctx, Params.BaseParams.MetaRootPath, etcdcli) + rootCoordClient, err := rcc.NewClient(ctx, Params.EtcdCfg.MetaRootPath, etcdcli) assert.NoError(t, err) err = rootCoordClient.Init() assert.NoError(t, err) @@ -540,7 +540,7 @@ func TestProxy(t *testing.T) { proxy.SetRootCoordClient(rootCoordClient) log.Info("Proxy set root coordinator client") - dataCoordClient, err := grpcdatacoordclient2.NewClient(ctx, Params.BaseParams.MetaRootPath, etcdcli) + dataCoordClient, err := grpcdatacoordclient2.NewClient(ctx, Params.EtcdCfg.MetaRootPath, etcdcli) assert.NoError(t, err) err = dataCoordClient.Init() assert.NoError(t, err) @@ -549,7 +549,7 @@ func TestProxy(t *testing.T) { proxy.SetDataCoordClient(dataCoordClient) log.Info("Proxy set data coordinator client") - queryCoordClient, err := grpcquerycoordclient.NewClient(ctx, Params.BaseParams.MetaRootPath, etcdcli) + queryCoordClient, err := grpcquerycoordclient.NewClient(ctx, Params.EtcdCfg.MetaRootPath, etcdcli) assert.NoError(t, err) err = queryCoordClient.Init() assert.NoError(t, err) @@ -558,7 +558,7 @@ func TestProxy(t *testing.T) { proxy.SetQueryCoordClient(queryCoordClient) log.Info("Proxy set query coordinator client") - indexCoordClient, err := grpcindexcoordclient.NewClient(ctx, Params.BaseParams.MetaRootPath, etcdcli) + indexCoordClient, err := grpcindexcoordclient.NewClient(ctx, Params.EtcdCfg.MetaRootPath, etcdcli) assert.NoError(t, err) err = indexCoordClient.Init() assert.NoError(t, err) diff --git a/internal/querycoord/channel_allocator_test.go b/internal/querycoord/channel_allocator_test.go index cb7bce042f..36b0a2024a 100644 --- a/internal/querycoord/channel_allocator_test.go +++ b/internal/querycoord/channel_allocator_test.go @@ -34,11 +34,11 @@ func TestShuffleChannelsToQueryNode(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() meta, err := newMeta(baseCtx, kv, nil, nil) diff --git a/internal/querycoord/channel_unsubscribe_test.go b/internal/querycoord/channel_unsubscribe_test.go index a81b2babd1..4d7dd13a5c 100644 --- a/internal/querycoord/channel_unsubscribe_test.go +++ b/internal/querycoord/channel_unsubscribe_test.go @@ -33,10 +33,10 @@ import ( func Test_HandlerReloadFromKV(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) channelInfoKey := fmt.Sprintf("%s/%d", unsubscribeChannelInfoPrefix, defaultQueryNodeID) unsubscribeChannelInfo := &querypb.UnsubscribeChannelInfo{ @@ -59,10 +59,10 @@ func Test_HandlerReloadFromKV(t *testing.T) { func Test_AddUnsubscribeChannelInfo(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) factory := msgstream.NewPmsFactory() handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory) assert.Nil(t, err) @@ -91,10 +91,10 @@ func Test_AddUnsubscribeChannelInfo(t *testing.T) { func Test_HandleChannelUnsubscribeLoop(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) factory := msgstream.NewPmsFactory() m := map[string]interface{}{ "PulsarAddress": Params.PulsarCfg.Address, diff --git a/internal/querycoord/cluster_test.go b/internal/querycoord/cluster_test.go index 7a0fbbedc4..9f17b56f6d 100644 --- a/internal/querycoord/cluster_test.go +++ b/internal/querycoord/cluster_test.go @@ -390,14 +390,14 @@ func TestQueryNodeCluster_getMetrics(t *testing.T) { } func TestReloadClusterFromKV(t *testing.T) { - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) t.Run("Test LoadOnlineNodes", func(t *testing.T) { refreshParams() baseCtx := context.Background() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() cluster := &queryNodeCluster{ @@ -425,8 +425,8 @@ func TestReloadClusterFromKV(t *testing.T) { t.Run("Test LoadOfflineNodes", func(t *testing.T) { refreshParams() ctx, cancel := context.WithCancel(context.Background()) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() factory := msgstream.NewPmsFactory() @@ -470,11 +470,11 @@ func TestReloadClusterFromKV(t *testing.T) { func TestGrpcRequest(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() factory := msgstream.NewPmsFactory() @@ -704,11 +704,11 @@ func TestEstimateSegmentSize(t *testing.T) { func TestSetNodeState(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) clusterSession.Register() factory := msgstream.NewPmsFactory() diff --git a/internal/querycoord/index_checker_test.go b/internal/querycoord/index_checker_test.go index c77fd2cbd8..d047356481 100644 --- a/internal/querycoord/index_checker_test.go +++ b/internal/querycoord/index_checker_test.go @@ -35,10 +35,10 @@ import ( func TestReloadFromKV(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta, err := newMeta(baseCtx, kv, nil, nil) assert.Nil(t, err) @@ -91,10 +91,10 @@ func TestReloadFromKV(t *testing.T) { func TestCheckIndexLoop(t *testing.T) { refreshParams() ctx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta, err := newMeta(ctx, kv, nil, nil) assert.Nil(t, err) @@ -157,10 +157,10 @@ func TestCheckIndexLoop(t *testing.T) { func TestHandoffNotExistSegment(t *testing.T) { refreshParams() ctx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta, err := newMeta(ctx, kv, nil, nil) assert.Nil(t, err) @@ -206,11 +206,11 @@ func TestHandoffNotExistSegment(t *testing.T) { func TestProcessHandoffAfterIndexDone(t *testing.T) { refreshParams() ctx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta, err := newMeta(ctx, kv, nil, nil) assert.Nil(t, err) taskScheduler := &TaskScheduler{ @@ -219,7 +219,7 @@ func TestProcessHandoffAfterIndexDone(t *testing.T) { client: kv, triggerTaskQueue: NewTaskQueue(), } - idAllocatorKV := tsoutil.NewTSOKVBase(etcdCli, Params.BaseParams.KvRootPath, "queryCoordTaskID") + idAllocatorKV := tsoutil.NewTSOKVBase(etcdCli, Params.EtcdCfg.KvRootPath, "queryCoordTaskID") idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV) err = idAllocator.Initialize() assert.Nil(t, err) diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index e5cc18f53d..95bfe04fd4 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -64,10 +64,10 @@ func (tk *testKv) Load(key string) (string, error) { func TestReplica_Release(t *testing.T) { refreshParams() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta, err := newMeta(context.Background(), etcdKV, nil, nil) assert.Nil(t, err) err = meta.addCollection(1, querypb.LoadType_loadCollection, nil) @@ -96,10 +96,10 @@ func TestReplica_Release(t *testing.T) { func TestMetaFunc(t *testing.T) { refreshParams() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) nodeID := defaultQueryNodeID segmentInfos := make(map[UniqueID]*querypb.SegmentInfo) @@ -290,10 +290,10 @@ func TestMetaFunc(t *testing.T) { func TestReloadMetaFromKV(t *testing.T) { refreshParams() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) meta := &MetaReplica{ client: kv, collectionInfos: map[UniqueID]*querypb.CollectionInfo{}, diff --git a/internal/querycoord/mock_querynode_server_test.go b/internal/querycoord/mock_querynode_server_test.go index 741db55113..f7b8b69359 100644 --- a/internal/querycoord/mock_querynode_server_test.go +++ b/internal/querycoord/mock_querynode_server_test.go @@ -101,12 +101,12 @@ func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock { } func (qs *queryNodeServerMock) Register() error { - log.Debug("query node session info", zap.String("metaPath", Params.BaseParams.MetaRootPath)) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + log.Debug("query node session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath)) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return err } - qs.session = sessionutil.NewSession(qs.ctx, Params.BaseParams.MetaRootPath, etcdCli) + qs.session = sessionutil.NewSession(qs.ctx, Params.EtcdCfg.MetaRootPath, etcdCli) qs.session.Init(typeutil.QueryNodeRole, qs.queryNodeIP+":"+strconv.FormatInt(qs.queryNodePort, 10), false, false) qs.queryNodeID = qs.session.ServerID log.Debug("query nodeID", zap.Int64("nodeID", qs.queryNodeID)) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 2335843545..0741733e4d 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -117,19 +117,19 @@ func (qc *QueryCoord) Register() error { } func (qc *QueryCoord) initSession() error { - qc.session = sessionutil.NewSession(qc.loopCtx, Params.BaseParams.MetaRootPath, qc.etcdCli) + qc.session = sessionutil.NewSession(qc.loopCtx, Params.EtcdCfg.MetaRootPath, qc.etcdCli) if qc.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } qc.session.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, true) Params.QueryCoordCfg.NodeID = uint64(qc.session.ServerID) - Params.BaseParams.SetLogger(qc.session.ServerID) + Params.SetLogger(qc.session.ServerID) return nil } // Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler func (qc *QueryCoord) Init() error { - log.Debug("query coordinator start init, session info", zap.String("metaPath", Params.BaseParams.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address)) + log.Debug("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address)) var initError error qc.initOnce.Do(func() { err := qc.initSession() @@ -139,12 +139,12 @@ func (qc *QueryCoord) Init() error { return } log.Debug("queryCoord try to connect etcd") - etcdKV := etcdkv.NewEtcdKV(qc.etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(qc.etcdCli, Params.EtcdCfg.MetaRootPath) qc.kvClient = etcdKV log.Debug("query coordinator try to connect etcd success") // init id allocator - idAllocatorKV := tsoutil.NewTSOKVBase(qc.etcdCli, Params.BaseParams.KvRootPath, "queryCoordTaskID") + idAllocatorKV := tsoutil.NewTSOKVBase(qc.etcdCli, Params.EtcdCfg.KvRootPath, "queryCoordTaskID") idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV) initError = idAllocator.Initialize() if initError != nil { diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go index 0ad7d775e7..82bf2a0b14 100644 --- a/internal/querycoord/query_coord_test.go +++ b/internal/querycoord/query_coord_test.go @@ -48,7 +48,7 @@ func refreshParams() { suffix := "-test-query-Coord" + strconv.FormatInt(rand.Int63(), 10) Params.MsgChannelCfg.QueryNodeStats = Params.MsgChannelCfg.QueryNodeStats + suffix Params.MsgChannelCfg.QueryCoordTimeTick = Params.MsgChannelCfg.QueryCoordTimeTick + suffix - Params.BaseParams.MetaRootPath = Params.BaseParams.MetaRootPath + suffix + Params.EtcdCfg.MetaRootPath = Params.EtcdCfg.MetaRootPath + suffix Params.MsgChannelCfg.RootCoordDml = "Dml" Params.MsgChannelCfg.RootCoordDelta = "delta" GlobalSegmentInfos = make(map[UniqueID]*querypb.SegmentInfo) @@ -92,7 +92,7 @@ func startQueryCoord(ctx context.Context) (*QueryCoord, error) { coord.SetRootCoord(rootCoord) coord.SetDataCoord(dataCoord) coord.SetIndexCoord(indexCoord) - etcd, err := etcd.GetEtcdClient(&Params.BaseParams) + etcd, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return nil, err } @@ -137,7 +137,7 @@ func startUnHealthyQueryCoord(ctx context.Context) (*QueryCoord, error) { coord.SetRootCoord(rootCoord) coord.SetDataCoord(dataCoord) - etcd, err := etcd.GetEtcdClient(&Params.BaseParams) + etcd, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return nil, err } @@ -156,12 +156,12 @@ func startUnHealthyQueryCoord(ctx context.Context) (*QueryCoord, error) { func TestWatchNodeLoop(t *testing.T) { baseCtx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) t.Run("Test OfflineNodes", func(t *testing.T) { refreshParams() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) kvs := make(map[string]string) session := &sessionutil.Session{ diff --git a/internal/querycoord/querynode_test.go b/internal/querycoord/querynode_test.go index 3179e5840d..47c0f656cc 100644 --- a/internal/querycoord/querynode_test.go +++ b/internal/querycoord/querynode_test.go @@ -37,12 +37,12 @@ import ( //func waitQueryNodeOnline(cluster *queryNodeCluster, nodeID int64) func removeNodeSession(id int64) error { - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() if err != nil { return err } - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) err = kv.Remove(fmt.Sprintf("session/"+typeutil.QueryNodeRole+"-%d", id)) if err != nil { @@ -52,12 +52,12 @@ func removeNodeSession(id int64) error { } func removeAllSession() error { - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() if err != nil { return err } - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) err = kv.RemoveWithPrefix("session") if err != nil { return err @@ -193,10 +193,10 @@ func TestQueryNode_getMetrics(t *testing.T) { func TestNewQueryNode(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) queryNode1, err := startQueryNodeServer(baseCtx) assert.Nil(t, err) @@ -219,10 +219,10 @@ func TestNewQueryNode(t *testing.T) { func TestReleaseCollectionOnOfflineNode(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) node, err := newQueryNode(baseCtx, "test", 100, kv) assert.Nil(t, err) @@ -290,10 +290,10 @@ func TestSealedSegmentChangeAfterQueryNodeStop(t *testing.T) { func TestGrpcRequestWithNodeOffline(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) nodeServer, err := startQueryNodeServer(baseCtx) assert.Nil(t, err) address := nodeServer.queryNodeIP diff --git a/internal/querycoord/segment_allocator_test.go b/internal/querycoord/segment_allocator_test.go index ece6749036..e952459780 100644 --- a/internal/querycoord/segment_allocator_test.go +++ b/internal/querycoord/segment_allocator_test.go @@ -34,11 +34,11 @@ import ( func TestShuffleSegmentsToQueryNode(t *testing.T) { refreshParams() baseCtx, cancel := context.WithCancel(context.Background()) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer etcdCli.Close() assert.Nil(t, err) - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) - clusterSession := sessionutil.NewSession(context.Background(), Params.BaseParams.MetaRootPath, etcdCli) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + clusterSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli) clusterSession.Init(typeutil.QueryCoordRole, Params.QueryCoordCfg.Address, true, false) factory := msgstream.NewPmsFactory() meta, err := newMeta(baseCtx, kv, factory, nil) diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index 9ec39b4ea9..9ffb605a9e 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -209,10 +209,10 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) { func TestUnMarshalTask(t *testing.T) { refreshParams() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) baseCtx, cancel := context.WithCancel(context.Background()) taskScheduler := &TaskScheduler{ ctx: baseCtx, @@ -457,10 +457,10 @@ func TestUnMarshalTask(t *testing.T) { func TestReloadTaskFromKV(t *testing.T) { refreshParams() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - kv := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + kv := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) assert.Nil(t, err) baseCtx, cancel := context.WithCancel(context.Background()) taskScheduler := &TaskScheduler{ diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index 4f8f1fc557..3b0ed7e2d2 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -480,7 +480,7 @@ func TestImpl_GetMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() @@ -490,7 +490,7 @@ func TestImpl_GetMetrics(t *testing.T) { defer wg.Done() node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.BaseParams.MetaRootPath, etcdCli) + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli) metricReq := make(map[string]string) metricReq[metricsinfo.MetricTypeKey] = "system_info" diff --git a/internal/querynode/metrics_info_test.go b/internal/querynode/metrics_info_test.go index 34c172e0ce..0c547c0cb7 100644 --- a/internal/querynode/metrics_info_test.go +++ b/internal/querynode/metrics_info_test.go @@ -35,10 +35,10 @@ func TestGetSystemInfoMetrics(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() - node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.BaseParams.MetaRootPath, etcdCli) + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli) req := &milvuspb.GetMetricsRequest{ Base: genCommonMsgBase(commonpb.MsgType_WatchQueryChannels), diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 8709c7309b..dc94b3470b 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -369,11 +369,11 @@ func genMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) { } func genEtcdKV() (*etcdkv.EtcdKV, error) { - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return nil, err } - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) return etcdKV, nil } @@ -406,7 +406,7 @@ func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { } func genLocalChunkManager() (storage.ChunkManager, error) { - p, err := Params.BaseParams.Load("storage.path") + p, err := Params.Load("storage.path") if err != nil { return nil, err } @@ -426,7 +426,7 @@ func genRemoteChunkManager(ctx context.Context) (storage.ChunkManager, error) { } func genVectorChunkManager(ctx context.Context) (storage.ChunkManager, error) { - p, err := Params.BaseParams.Load("storage.path") + p, err := Params.Load("storage.path") if err != nil { return nil, err } @@ -1287,14 +1287,14 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) { return nil, err } node := NewQueryNode(ctx, fac) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { return nil, err } node.etcdCli = etcdCli node.initSession() - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) node.etcdKV = etcdKV node.tSafeReplica = newTSafeReplica() diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index 16e6a76a00..a1fe1976b8 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -132,10 +132,10 @@ func TestQueryCollection_withoutVChannel(t *testing.T) { factory := msgstream.NewPmsFactory() err := factory.SetParams(m) assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) schema := genTestCollectionSchema(0, false, 2) historicalReplica := newCollectionReplica(etcdKV) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 2ac65161fe..b4dd066f10 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -137,13 +137,13 @@ func NewQueryNode(ctx context.Context, factory msgstream.Factory) *QueryNode { } func (node *QueryNode) initSession() error { - node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.BaseParams.MetaRootPath, node.etcdCli) + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, node.etcdCli) if node.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeCfg.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodeCfg.QueryNodePort, 10), false, true) Params.QueryNodeCfg.QueryNodeID = node.session.ServerID - Params.BaseParams.SetLogger(Params.QueryNodeCfg.QueryNodeID) + Params.SetLogger(Params.QueryNodeCfg.QueryNodeID) log.Debug("QueryNode", zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID), zap.String("node address", node.session.Address)) return nil } @@ -255,7 +255,7 @@ func (node *QueryNode) Init() error { var initError error = nil node.initOnce.Do(func() { //ctx := context.Background() - log.Debug("QueryNode session info", zap.String("metaPath", Params.BaseParams.MetaRootPath)) + log.Debug("QueryNode session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath)) err := node.initSession() if err != nil { log.Error("QueryNode init session failed", zap.Error(err)) @@ -264,8 +264,8 @@ func (node *QueryNode) Init() error { } Params.QueryNodeCfg.Refresh() - node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.BaseParams.MetaRootPath) - log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.BaseParams.MetaRootPath)) + node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath) + log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) node.tSafeReplica = newTSafeReplica() streamingReplica := newCollectionReplica(node.etcdKV) diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index abe75b44ca..2c63bca954 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -46,7 +46,7 @@ type queryCoordMock struct { func setup() { os.Setenv("QUERY_NODE_ID", "1") Params.Init() - Params.BaseParams.MetaRootPath = "/etcd/test/root/querynode" + Params.EtcdCfg.MetaRootPath = "/etcd/test/root/querynode" } func genTestCollectionSchema(collectionID UniqueID, isBinary bool, dim int) *schemapb.CollectionSchema { @@ -187,11 +187,11 @@ func newQueryNodeMock() *QueryNode { cancel() }() } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) if err != nil { panic(err) } - etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.BaseParams.MetaRootPath) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) msFactory, err := newMessageStreamFactory() if err != nil { @@ -271,7 +271,7 @@ func TestQueryNode_register(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdcli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdcli.Close() node.SetEtcdClient(etcdcli) @@ -289,7 +289,7 @@ func TestQueryNode_init(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdcli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdcli.Close() node.SetEtcdClient(etcdcli) diff --git a/internal/querynode/query_service.go b/internal/querynode/query_service.go index 9452b6c9fb..7e44e54fd2 100644 --- a/internal/querynode/query_service.go +++ b/internal/querynode/query_service.go @@ -69,11 +69,11 @@ func newQueryService(ctx context.Context, queryServiceCtx, queryServiceCancel := context.WithCancel(ctx) //TODO godchen: change this to configuration - path, err := Params.BaseParams.Load("localStorage.Path") + path, err := Params.Load("localStorage.Path") if err != nil { path = "/tmp/milvus/data" } - enabled, _ := Params.BaseParams.Load("localStorage.enabled") + enabled, _ := Params.Load("localStorage.enabled") localCacheEnabled, _ := strconv.ParseBool(enabled) localChunkManager := storage.NewLocalChunkManager(path) diff --git a/internal/rootcoord/meta_snapshot_test.go b/internal/rootcoord/meta_snapshot_test.go index 07ae180210..e4ec59649f 100644 --- a/internal/rootcoord/meta_snapshot_test.go +++ b/internal/rootcoord/meta_snapshot_test.go @@ -37,7 +37,7 @@ func TestMetaSnapshot(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -177,7 +177,7 @@ func TestGetRevOnEtcd(t *testing.T) { tsKey := "timestamp" key := path.Join(rootPath, tsKey) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -221,7 +221,7 @@ func TestLoad(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -269,7 +269,7 @@ func TestMultiSave(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -333,7 +333,7 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -411,7 +411,7 @@ func TestTsBackward(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -438,7 +438,7 @@ func TestFix7150(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 9ec8b17a82..dcbf56fde1 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -222,7 +222,7 @@ func TestMetaTable(t *testing.T) { return vtso } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) require.Nil(t, err) defer etcdCli.Close() @@ -1142,7 +1142,7 @@ func TestMetaWithTimestamp(t *testing.T) { vtso++ return vtso } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -1299,7 +1299,7 @@ func TestFixIssue10540(t *testing.T) { Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 34ec69017f..8c1296f9b9 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -32,7 +32,7 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) { core, err := NewCore(context.Background(), nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.BaseParams) + cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) defer cli.Close() assert.Nil(t, err) core.etcdCli = cli @@ -59,7 +59,7 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) { core, err := NewCore(context.Background(), nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.BaseParams) + cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer cli.Close() core.etcdCli = cli @@ -86,7 +86,7 @@ func TestProxyClientManager_InvalidateCollectionMetaCache(t *testing.T) { core, err := NewCore(ctx, nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.BaseParams) + cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer cli.Close() core.etcdCli = cli @@ -117,7 +117,7 @@ func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) { core, err := NewCore(ctx, nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.BaseParams) + cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer cli.Close() core.etcdCli = cli diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index d9c2fe1a78..aba08922eb 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -88,7 +88,7 @@ func (p *proxyManager) WatchProxy() error { eventCh := p.etcdCli.Watch( p.ctx, - path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), + path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), clientv3.WithPrefix(), clientv3.WithCreatedNotify(), clientv3.WithPrevKV(), @@ -168,7 +168,7 @@ func (p *proxyManager) parseSession(value []byte) (*sessionutil.Session, error) func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Session, int64, error) { resp, err := p.etcdCli.Get( ctx, - path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), + path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), ) @@ -200,7 +200,7 @@ func listProxyInEtcd(ctx context.Context, cli *clientv3.Client) (map[int64]*sess defer cancel() resp, err := cli.Get( ctx2, - path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), + path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), ) diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index f7a580ed5f..bd574b5851 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -33,13 +33,13 @@ import ( func TestProxyManager(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - sessKey := path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot) + sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) defer etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) s1 := sessionutil.Session{ diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 0cba1c3ca3..f62b6e2705 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -935,12 +935,12 @@ func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) { } func (c *Core) initSession() error { - c.session = sessionutil.NewSession(c.ctx, Params.BaseParams.MetaRootPath, c.etcdCli) + c.session = sessionutil.NewSession(c.ctx, Params.EtcdCfg.MetaRootPath, c.etcdCli) if c.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } c.session.Init(typeutil.RootCoordRole, Params.RootCoordCfg.Address, true, true) - Params.BaseParams.SetLogger(c.session.ServerID) + Params.SetLogger(c.session.ServerID) return nil } @@ -959,18 +959,18 @@ func (c *Core) Init() error { return } connectEtcdFn := func() error { - if c.kvBase, initError = c.kvBaseCreate(Params.BaseParams.KvRootPath); initError != nil { + if c.kvBase, initError = c.kvBaseCreate(Params.EtcdCfg.KvRootPath); initError != nil { log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError)) return initError } var metaKV kv.TxnKV - metaKV, initError = c.kvBaseCreate(Params.BaseParams.MetaRootPath) + metaKV, initError = c.kvBaseCreate(Params.EtcdCfg.MetaRootPath) if initError != nil { log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError)) return initError } var ss *suffixSnapshot - if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.BaseParams.MetaRootPath, "snapshots"); initError != nil { + if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil { log.Error("RootCoord failed to new suffixSnapshot", zap.Error(initError)) return initError } @@ -981,14 +981,14 @@ func (c *Core) Init() error { return nil } - log.Debug("RootCoord, Connecting to Etcd", zap.String("kv root", Params.BaseParams.KvRootPath), zap.String("meta root", Params.BaseParams.MetaRootPath)) + log.Debug("RootCoord, Connecting to Etcd", zap.String("kv root", Params.EtcdCfg.KvRootPath), zap.String("meta root", Params.EtcdCfg.MetaRootPath)) err := retry.Do(c.ctx, connectEtcdFn, retry.Attempts(300)) if err != nil { return } log.Debug("RootCoord, Setting TSO and ID Allocator") - kv := tsoutil.NewTSOKVBase(c.etcdCli, Params.BaseParams.KvRootPath, "gid") + kv := tsoutil.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath, "gid") idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", kv) if initError = idAllocator.Initialize(); initError != nil { return @@ -1000,7 +1000,7 @@ func (c *Core) Init() error { return idAllocator.UpdateID() } - kv = tsoutil.NewTSOKVBase(c.etcdCli, Params.BaseParams.KvRootPath, "tso") + kv = tsoutil.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath, "tso") tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", kv) if initError = tsoAllocator.Initialize(); initError != nil { return diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 54933af8f7..3f0fc53fee 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -435,7 +435,7 @@ func TestRootCoordInit(t *testing.T) { Params.Init() Params.RootCoordCfg.DmlChannelNum = TestDMLChannelNum - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() @@ -445,8 +445,8 @@ func TestRootCoordInit(t *testing.T) { core.SetEtcdClient(etcdCli) randVal := rand.Int() - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) err = core.Init() assert.Nil(t, err) @@ -461,8 +461,8 @@ func TestRootCoordInit(t *testing.T) { assert.Nil(t, err) randVal = rand.Int() - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) core.kvBaseCreate = func(string) (kv.TxnKV, error) { return nil, retry.Unrecoverable(errors.New("injected")) @@ -481,11 +481,11 @@ func TestRootCoordInit(t *testing.T) { assert.Nil(t, err) randVal = rand.Int() - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) core.kvBaseCreate = func(root string) (kv.TxnKV, error) { - if root == Params.BaseParams.MetaRootPath { + if root == Params.EtcdCfg.MetaRootPath { return nil, retry.Unrecoverable(errors.New("injected")) } return memkv.NewMemoryKV(), nil @@ -504,8 +504,8 @@ func TestRootCoordInit(t *testing.T) { assert.Nil(t, err) randVal = rand.Int() - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) core.kvBaseCreate = func(string) (kv.TxnKV, error) { return nil, nil @@ -524,8 +524,8 @@ func TestRootCoordInit(t *testing.T) { assert.Nil(t, err) randVal = rand.Int() - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) core.kvBaseCreate = func(string) (kv.TxnKV, error) { kv := memkv.NewMemoryKV() @@ -561,17 +561,17 @@ func TestRootCoord(t *testing.T) { randVal := rand.Int() Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) Params.MsgChannelCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal) Params.MsgChannelCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() - sessKey := path.Join(Params.BaseParams.MetaRootPath, sessionutil.DefaultServiceRoot) + sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) assert.Nil(t, err) defer func() { @@ -2305,7 +2305,7 @@ func TestRootCoord2(t *testing.T) { core, err := NewCore(ctx, msFactory) assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() @@ -2313,8 +2313,8 @@ func TestRootCoord2(t *testing.T) { Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} @@ -2591,8 +2591,8 @@ func TestCheckFlushedSegments(t *testing.T) { Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} @@ -2620,7 +2620,7 @@ func TestCheckFlushedSegments(t *testing.T) { return nil, nil } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.Nil(t, err) defer etcdCli.Close() core.SetEtcdClient(etcdCli) @@ -2757,8 +2757,8 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { randVal := rand.Int() Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal) Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal) - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) dm := &dataMock{randVal: randVal} @@ -2786,7 +2786,7 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) { return nil, nil } - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) defer etcdCli.Close() diff --git a/internal/rootcoord/suffix_snapshot_test.go b/internal/rootcoord/suffix_snapshot_test.go index 93cd846a11..5847c226fc 100644 --- a/internal/rootcoord/suffix_snapshot_test.go +++ b/internal/rootcoord/suffix_snapshot_test.go @@ -263,7 +263,7 @@ func Test_SuffixSnapshotLoad(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -315,7 +315,7 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -391,7 +391,7 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.BaseParams) + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) diff --git a/internal/rootcoord/timestamp_test.go b/internal/rootcoord/timestamp_test.go index 6124f45224..dc3816e9e7 100644 --- a/internal/rootcoord/timestamp_test.go +++ b/internal/rootcoord/timestamp_test.go @@ -90,8 +90,8 @@ func BenchmarkAllocTimestamp(b *testing.B) { Params.MsgChannelCfg.RootCoordTimeTick = fmt.Sprintf("master-time-tick-%d", randVal) Params.MsgChannelCfg.RootCoordStatistics = fmt.Sprintf("master-statistics-%d", randVal) - Params.BaseParams.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.MetaRootPath) - Params.BaseParams.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.BaseParams.KvRootPath) + Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath) + Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath) Params.MsgChannelCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal) err = core.SetDataCoord(ctx, &tbd{}) diff --git a/internal/util/etcd/etcd_util.go b/internal/util/etcd/etcd_util.go index 8812bb2910..07c88abe67 100644 --- a/internal/util/etcd/etcd_util.go +++ b/internal/util/etcd/etcd_util.go @@ -31,10 +31,10 @@ import ( var EtcdServer *embed.Etcd // InitEtcdServer initializes embedded etcd server singleton. -func InitEtcdServer(pt *paramtable.BaseParamTable) error { - if pt.UseEmbedEtcd { - path := pt.EtcdConfigPath - fmt.Println("path", path, "data", pt.EtcdDataDir) +func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error { + if etcdCfg.UseEmbedEtcd { + path := etcdCfg.ConfigPath + fmt.Println("path", path, "data", etcdCfg.DataDir) var cfg *embed.Config if len(path) > 0 { cfgFromFile, err := embed.ConfigFromFile(path) @@ -45,7 +45,7 @@ func InitEtcdServer(pt *paramtable.BaseParamTable) error { } else { cfg = embed.NewConfig() } - cfg.Dir = pt.EtcdDataDir + cfg.Dir = etcdCfg.DataDir e, err := embed.StartEtcd(cfg) if err != nil { return err @@ -64,11 +64,11 @@ func StopEtcdServer() { } // GetEtcdClient returns etcd client -func GetEtcdClient(pt *paramtable.BaseParamTable) (*clientv3.Client, error) { - if pt.UseEmbedEtcd { +func GetEtcdClient(cfg *paramtable.EtcdConfig) (*clientv3.Client, error) { + if cfg.UseEmbedEtcd { return GetEmbedEtcdClient() } - return GetRemoteEtcdClient(pt.EtcdEndpoints) + return GetRemoteEtcdClient(cfg.Endpoints) } // GetEmbedEtcdClient returns client of embed etcd server diff --git a/internal/util/etcd/etcd_util_test.go b/internal/util/etcd/etcd_util_test.go index f3fbb7955d..4671431d52 100644 --- a/internal/util/etcd/etcd_util_test.go +++ b/internal/util/etcd/etcd_util_test.go @@ -26,22 +26,22 @@ import ( "github.com/stretchr/testify/assert" ) -var Params paramtable.GlobalParamTable +var Params paramtable.BaseParamTable func TestEtcd(t *testing.T) { Params.Init() - Params.BaseParams.UseEmbedEtcd = true - Params.BaseParams.EtcdDataDir = "/tmp/data" - err := InitEtcdServer(&Params.BaseParams) + Params.EtcdCfg.UseEmbedEtcd = true + Params.EtcdCfg.DataDir = "/tmp/data" + err := InitEtcdServer(&Params.EtcdCfg) assert.NoError(t, err) - defer os.RemoveAll(Params.BaseParams.EtcdDataDir) + defer os.RemoveAll(Params.EtcdCfg.DataDir) defer StopEtcdServer() // port is binded - err = InitEtcdServer(&Params.BaseParams) + err = InitEtcdServer(&Params.EtcdCfg) assert.Error(t, err) - etcdCli, err := GetEtcdClient(&Params.BaseParams) + etcdCli, err := GetEtcdClient(&Params.EtcdCfg) assert.NoError(t, err) key := path.Join("test", "test") diff --git a/internal/util/paramtable/base_param.go b/internal/util/paramtable/base_param.go index 45b497375d..70e7ac934d 100644 --- a/internal/util/paramtable/base_param.go +++ b/internal/util/paramtable/base_param.go @@ -19,8 +19,8 @@ package paramtable import ( "os" "path" + "strconv" "strings" - "sync" "github.com/milvus-io/milvus/internal/util/metricsinfo" ) @@ -29,90 +29,234 @@ import ( // embedding BaseTable. It is used to quickly and easily access the system configuration. type BaseParamTable struct { BaseTable - once sync.Once - // --- ETCD --- - EtcdEndpoints []string - MetaRootPath string - KvRootPath string - - // --- Embed ETCD --- - UseEmbedEtcd bool - EtcdConfigPath string - EtcdDataDir string + EtcdCfg EtcdConfig + PulsarCfg PulsarConfig + RocksmqCfg RocksmqConfig + MinioCfg MinioConfig } // Init is an override method of BaseTable's Init. It mainly calls the // Init of BaseTable and do some other initialization. func (p *BaseParamTable) Init() { - p.once.Do(func() { - p.BaseTable.Init() - p.LoadCfgToMemory() - }) + p.BaseTable.Init() + + p.EtcdCfg.init(&p.BaseTable) + p.PulsarCfg.init(&p.BaseTable) + p.RocksmqCfg.init(&p.BaseTable) + p.MinioCfg.init(&p.BaseTable) } -// LoadCfgToMemory loads configurations from file into memory. -func (p *BaseParamTable) LoadCfgToMemory() { - p.initEtcdConf() +/////////////////////////////////////////////////////////////////////////////// +// --- etcd --- +type EtcdConfig struct { + Base *BaseTable + + // --- ETCD --- + Endpoints []string + MetaRootPath string + KvRootPath string + + // --- Embed ETCD --- + UseEmbedEtcd bool + ConfigPath string + DataDir string +} + +func (p *EtcdConfig) init(base *BaseTable) { + p.Base = base + p.LoadCfgToMemory() +} + +func (p *EtcdConfig) LoadCfgToMemory() { + p.initUseEmbedEtcd() + if p.UseEmbedEtcd { + p.initConfigPath() + p.initDataDir() + } else { + p.initEndpoints() + } p.initMetaRootPath() p.initKvRootPath() } -func (p *BaseParamTable) initEtcdConf() { - p.initUseEmbedEtcd() - if p.UseEmbedEtcd { - p.initConfigPath() - p.initEtcdDataDir() - } else { - p.initEtcdEndpoints() - } -} - -func (p *BaseParamTable) initUseEmbedEtcd() { - p.UseEmbedEtcd = p.ParseBool("etcd.use.embed", false) +func (p *EtcdConfig) initUseEmbedEtcd() { + p.UseEmbedEtcd = p.Base.ParseBool("etcd.use.embed", false) if p.UseEmbedEtcd && (os.Getenv(metricsinfo.DeployModeEnvKey) != metricsinfo.StandaloneDeployMode) { panic("embedded etcd can not be used under distributed mode") } } -func (p *BaseParamTable) initConfigPath() { - addr := p.LoadWithDefault("etcd.config.path", "") - p.EtcdConfigPath = addr +func (p *EtcdConfig) initConfigPath() { + addr := p.Base.LoadWithDefault("etcd.config.path", "") + p.ConfigPath = addr } -func (p *BaseParamTable) initEtcdDataDir() { - addr := p.LoadWithDefault("etcd.data.dir", "default.etcd") - p.EtcdDataDir = addr +func (p *EtcdConfig) initDataDir() { + addr := p.Base.LoadWithDefault("etcd.data.dir", "default.etcd") + p.DataDir = addr } -func (p *BaseParamTable) initEtcdEndpoints() { - endpoints, err := p.Load("_EtcdEndpoints") +func (p *EtcdConfig) initEndpoints() { + endpoints, err := p.Base.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdEndpoints = strings.Split(endpoints, ",") + p.Endpoints = strings.Split(endpoints, ",") } -func (p *BaseParamTable) initMetaRootPath() { - rootPath, err := p.Load("etcd.rootPath") +func (p *EtcdConfig) initMetaRootPath() { + rootPath, err := p.Base.Load("etcd.rootPath") if err != nil { panic(err) } - subPath, err := p.Load("etcd.metaSubPath") + subPath, err := p.Base.Load("etcd.metaSubPath") if err != nil { panic(err) } p.MetaRootPath = path.Join(rootPath, subPath) } -func (p *BaseParamTable) initKvRootPath() { - rootPath, err := p.Load("etcd.rootPath") +func (p *EtcdConfig) initKvRootPath() { + rootPath, err := p.Base.Load("etcd.rootPath") if err != nil { panic(err) } - subPath, err := p.Load("etcd.kvSubPath") + subPath, err := p.Base.Load("etcd.kvSubPath") if err != nil { panic(err) } p.KvRootPath = path.Join(rootPath, subPath) } + +/////////////////////////////////////////////////////////////////////////////// +// --- pulsar --- +type PulsarConfig struct { + Base *BaseTable + + Address string + MaxMessageSize int +} + +func (p *PulsarConfig) init(base *BaseTable) { + p.Base = base + + p.initAddress() + p.initMaxMessageSize() +} + +func (p *PulsarConfig) initAddress() { + addr, err := p.Base.Load("_PulsarAddress") + if err != nil { + panic(err) + } + p.Address = addr +} + +func (p *PulsarConfig) initMaxMessageSize() { + maxMessageSizeStr, err := p.Base.Load("pulsar.maxMessageSize") + if err != nil { + p.MaxMessageSize = SuggestPulsarMaxMessageSize + } else { + maxMessageSize, err := strconv.Atoi(maxMessageSizeStr) + if err != nil { + p.MaxMessageSize = SuggestPulsarMaxMessageSize + } else { + p.MaxMessageSize = maxMessageSize + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +// --- rocksmq --- +type RocksmqConfig struct { + Base *BaseTable + + Path string +} + +func (p *RocksmqConfig) init(base *BaseTable) { + p.Base = base + + p.initPath() +} + +func (p *RocksmqConfig) initPath() { + path, err := p.Base.Load("_RocksmqPath") + if err != nil { + panic(err) + } + p.Path = path +} + +/////////////////////////////////////////////////////////////////////////////// +// --- minio --- +type MinioConfig struct { + Base *BaseTable + + Address string + AccessKeyID string + SecretAccessKey string + UseSSL bool + BucketName string + RootPath string +} + +func (p *MinioConfig) init(base *BaseTable) { + p.Base = base + + p.initAddress() + p.initAccessKeyID() + p.initSecretAccessKey() + p.initUseSSL() + p.initBucketName() + p.initRootPath() +} + +func (p *MinioConfig) initAddress() { + endpoint, err := p.Base.Load("_MinioAddress") + if err != nil { + panic(err) + } + p.Address = endpoint +} + +func (p *MinioConfig) initAccessKeyID() { + keyID, err := p.Base.Load("_MinioAccessKeyID") + if err != nil { + panic(err) + } + p.AccessKeyID = keyID +} + +func (p *MinioConfig) initSecretAccessKey() { + key, err := p.Base.Load("_MinioSecretAccessKey") + if err != nil { + panic(err) + } + p.SecretAccessKey = key +} + +func (p *MinioConfig) initUseSSL() { + usessl, err := p.Base.Load("_MinioUseSSL") + if err != nil { + panic(err) + } + p.UseSSL, _ = strconv.ParseBool(usessl) +} + +func (p *MinioConfig) initBucketName() { + bucketName, err := p.Base.Load("_MinioBucketName") + if err != nil { + panic(err) + } + p.BucketName = bucketName +} + +func (p *MinioConfig) initRootPath() { + rootPath, err := p.Base.Load("minio.rootPath") + if err != nil { + panic(err) + } + p.RootPath = rootPath +} diff --git a/internal/util/paramtable/base_param_test.go b/internal/util/paramtable/base_param_test.go index 090bdab88a..6bb72d8e49 100644 --- a/internal/util/paramtable/base_param_test.go +++ b/internal/util/paramtable/base_param_test.go @@ -20,23 +20,62 @@ import ( ) func TestBaseParamTable(t *testing.T) { - var Params BaseParamTable - Params.Init() + var BaseParams BaseParamTable + BaseParams.Init() - assert.NotZero(t, len(Params.EtcdEndpoints)) - t.Logf("etcd endpoints = %s", Params.EtcdEndpoints) + t.Run("test etcdConfig", func(t *testing.T) { + Params := BaseParams.EtcdCfg - assert.NotEqual(t, Params.MetaRootPath, "") - t.Logf("meta root path = %s", Params.MetaRootPath) + assert.NotZero(t, len(Params.Endpoints)) + t.Logf("etcd endpoints = %s", Params.Endpoints) - assert.NotEqual(t, Params.KvRootPath, "") - t.Logf("kv root path = %s", Params.KvRootPath) + assert.NotEqual(t, Params.MetaRootPath, "") + t.Logf("meta root path = %s", Params.MetaRootPath) - // test UseEmbedEtcd - Params.Save("etcd.use.embed", "true") - assert.Nil(t, os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode)) - assert.Panics(t, func() { Params.initUseEmbedEtcd() }) + assert.NotEqual(t, Params.KvRootPath, "") + t.Logf("kv root path = %s", Params.KvRootPath) - assert.Nil(t, os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode)) - Params.LoadCfgToMemory() + // test UseEmbedEtcd + Params.Base.Save("etcd.use.embed", "true") + assert.Nil(t, os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.ClusterDeployMode)) + assert.Panics(t, func() { Params.initUseEmbedEtcd() }) + + assert.Nil(t, os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode)) + Params.LoadCfgToMemory() + }) + + t.Run("test pulsarConfig", func(t *testing.T) { + Params := BaseParams.PulsarCfg + + assert.NotEqual(t, Params.Address, "") + t.Logf("pulsar address = %s", Params.Address) + + assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize) + }) + + t.Run("test rocksmqConfig", func(t *testing.T) { + Params := BaseParams.RocksmqCfg + + assert.NotEqual(t, Params.Path, "") + t.Logf("rocksmq path = %s", Params.Path) + }) + + t.Run("test minioConfig", func(t *testing.T) { + Params := BaseParams.MinioCfg + + addr := Params.Address + equal := addr == "localhost:9000" || addr == "minio:9000" + assert.Equal(t, equal, true) + t.Logf("minio address = %s", Params.Address) + + assert.Equal(t, Params.AccessKeyID, "minioadmin") + + assert.Equal(t, Params.SecretAccessKey, "minioadmin") + + assert.Equal(t, Params.UseSSL, false) + + t.Logf("Minio BucketName = %s", Params.BucketName) + + t.Logf("Minio rootpath = %s", Params.RootPath) + }) } diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 29d9ca7a80..cea0b20157 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -48,12 +48,8 @@ const ( // GlobalParamTable is a derived struct of BaseParamTable. // It is used to quickly and easily access global system configuration. type GlobalParamTable struct { - once sync.Once - BaseParams BaseParamTable - - PulsarCfg pulsarConfig - RocksmqCfg rocksmqConfig - MinioCfg minioConfig + BaseParamTable + once sync.Once CommonCfg commonConfig KnowhereCfg knowhereConfig @@ -78,162 +74,26 @@ func (p *GlobalParamTable) InitOnce() { // Init initialize the global param table func (p *GlobalParamTable) Init() { - p.BaseParams.Init() + p.BaseParamTable.Init() - p.PulsarCfg.init(&p.BaseParams) - p.RocksmqCfg.init(&p.BaseParams) - p.MinioCfg.init(&p.BaseParams) + p.CommonCfg.init(&p.BaseParamTable) + p.KnowhereCfg.init(&p.BaseParamTable) + p.MsgChannelCfg.init(&p.BaseParamTable) - p.CommonCfg.init(&p.BaseParams) - p.KnowhereCfg.init(&p.BaseParams) - p.MsgChannelCfg.init(&p.BaseParams) - - p.RootCoordCfg.init(&p.BaseParams) - p.ProxyCfg.init(&p.BaseParams) - p.QueryCoordCfg.init(&p.BaseParams) - p.QueryNodeCfg.init(&p.BaseParams) - p.DataCoordCfg.init(&p.BaseParams) - p.DataNodeCfg.init(&p.BaseParams) - p.IndexCoordCfg.init(&p.BaseParams) - p.IndexNodeCfg.init(&p.BaseParams) + p.RootCoordCfg.init(&p.BaseParamTable) + p.ProxyCfg.init(&p.BaseParamTable) + p.QueryCoordCfg.init(&p.BaseParamTable) + p.QueryNodeCfg.init(&p.BaseParamTable) + p.DataCoordCfg.init(&p.BaseParamTable) + p.DataNodeCfg.init(&p.BaseParamTable) + p.IndexCoordCfg.init(&p.BaseParamTable) + p.IndexNodeCfg.init(&p.BaseParamTable) } // SetLogConfig set log config with given role func (p *GlobalParamTable) SetLogConfig(role string) { - p.BaseParams.RoleName = role - p.BaseParams.SetLogConfig() -} - -/////////////////////////////////////////////////////////////////////////////// -// --- pulsar --- -type pulsarConfig struct { - BaseParams *BaseParamTable - - Address string - MaxMessageSize int -} - -func (p *pulsarConfig) init(bp *BaseParamTable) { - p.BaseParams = bp - - p.initAddress() - p.initMaxMessageSize() -} - -func (p *pulsarConfig) initAddress() { - addr, err := p.BaseParams.Load("_PulsarAddress") - if err != nil { - panic(err) - } - p.Address = addr -} - -func (p *pulsarConfig) initMaxMessageSize() { - maxMessageSizeStr, err := p.BaseParams.Load("pulsar.maxMessageSize") - if err != nil { - p.MaxMessageSize = SuggestPulsarMaxMessageSize - } else { - maxMessageSize, err := strconv.Atoi(maxMessageSizeStr) - if err != nil { - p.MaxMessageSize = SuggestPulsarMaxMessageSize - } else { - p.MaxMessageSize = maxMessageSize - } - } -} - -/////////////////////////////////////////////////////////////////////////////// -// --- rocksmq --- -type rocksmqConfig struct { - BaseParams *BaseParamTable - - Path string -} - -func (p *rocksmqConfig) init(bp *BaseParamTable) { - p.BaseParams = bp - - p.initPath() -} - -func (p *rocksmqConfig) initPath() { - path, err := p.BaseParams.Load("_RocksmqPath") - if err != nil { - panic(err) - } - p.Path = path -} - -/////////////////////////////////////////////////////////////////////////////// -// --- minio --- -type minioConfig struct { - BaseParams *BaseParamTable - - Address string - AccessKeyID string - SecretAccessKey string - UseSSL bool - BucketName string - RootPath string -} - -func (p *minioConfig) init(bp *BaseParamTable) { - p.BaseParams = bp - - p.initAddress() - p.initAccessKeyID() - p.initSecretAccessKey() - p.initUseSSL() - p.initBucketName() - p.initRootPath() -} - -func (p *minioConfig) initAddress() { - endpoint, err := p.BaseParams.Load("_MinioAddress") - if err != nil { - panic(err) - } - p.Address = endpoint -} - -func (p *minioConfig) initAccessKeyID() { - keyID, err := p.BaseParams.Load("_MinioAccessKeyID") - if err != nil { - panic(err) - } - p.AccessKeyID = keyID -} - -func (p *minioConfig) initSecretAccessKey() { - key, err := p.BaseParams.Load("_MinioSecretAccessKey") - if err != nil { - panic(err) - } - p.SecretAccessKey = key -} - -func (p *minioConfig) initUseSSL() { - usessl, err := p.BaseParams.Load("_MinioUseSSL") - if err != nil { - panic(err) - } - p.UseSSL, _ = strconv.ParseBool(usessl) -} - -func (p *minioConfig) initBucketName() { - bucketName, err := p.BaseParams.Load("_MinioBucketName") - if err != nil { - panic(err) - } - p.BucketName = bucketName -} - -func (p *minioConfig) initRootPath() { - rootPath, err := p.BaseParams.Load("minio.rootPath") - if err != nil { - panic(err) - } - p.RootPath = rootPath + p.BaseTable.RoleName = role + p.BaseTable.SetLogConfig() } /////////////////////////////////////////////////////////////////////////////// @@ -302,7 +162,6 @@ type msgChannelConfig struct { QueryCoordTimeTick string QueryNodeStats string - DataCoordInsert string DataCoordStatistic string DataCoordTimeTick string DataCoordSegmentInfo string @@ -326,7 +185,6 @@ func (p *msgChannelConfig) init(bp *BaseParamTable) { p.initQueryCoordTimeTick() p.initQueryNodeStats() - p.initDataCoordInsert() p.initDataCoordStatistic() p.initDataCoordTimeTick() p.initDataCoordSegmentInfo() @@ -390,10 +248,6 @@ func (p *msgChannelConfig) initQueryNodeStats() { } // --- datacoord --- -func (p *msgChannelConfig) initDataCoordInsert() { - p.DataCoordInsert = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordInsertChannel") -} - func (p *msgChannelConfig) initDataCoordStatistic() { p.DataCoordStatistic = p.initChanNamePrefix("msgChannel.chanNamePrefix.dataCoordStatistic") } diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 186046fa3e..c34cae40e8 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -32,41 +32,6 @@ func TestGlobalParamTable(t *testing.T) { var GlobalParams GlobalParamTable GlobalParams.Init() - t.Run("test pulsarConfig", func(t *testing.T) { - Params := GlobalParams.PulsarCfg - - assert.NotEqual(t, Params.Address, "") - t.Logf("pulsar address = %s", Params.Address) - - assert.Equal(t, Params.MaxMessageSize, SuggestPulsarMaxMessageSize) - }) - - t.Run("test rocksmqConfig", func(t *testing.T) { - Params := GlobalParams.RocksmqCfg - - assert.NotEqual(t, Params.Path, "") - t.Logf("rocksmq path = %s", Params.Path) - }) - - t.Run("test minioConfig", func(t *testing.T) { - Params := GlobalParams.MinioCfg - - addr := Params.Address - equal := addr == "localhost:9000" || addr == "minio:9000" - assert.Equal(t, equal, true) - t.Logf("minio address = %s", Params.Address) - - assert.Equal(t, Params.AccessKeyID, "minioadmin") - - assert.Equal(t, Params.SecretAccessKey, "minioadmin") - - assert.Equal(t, Params.UseSSL, false) - - t.Logf("Minio BucketName = %s", Params.BucketName) - - t.Logf("Minio rootpath = %s", Params.RootPath) - }) - t.Run("test commonConfig", func(t *testing.T) { Params := GlobalParams.CommonCfg @@ -120,9 +85,6 @@ func TestGlobalParamTable(t *testing.T) { t.Logf("querynode stats channel = %s", Params.QueryNodeStats) // -- datacoord -- - assert.Equal(t, Params.DataCoordInsert, "by-dev-insert-channel-") - t.Logf("datacoord insert channel = %s", Params.DataCoordInsert) - assert.Equal(t, Params.DataCoordTimeTick, "by-dev-datacoord-timetick-channel") t.Logf("datacoord timetick channel = %s", Params.DataCoordTimeTick)