diff --git a/cmd/milvus/mck.go b/cmd/milvus/mck.go index d4c3c626df..73502896d2 100644 --- a/cmd/milvus/mck.go +++ b/cmd/milvus/mck.go @@ -220,7 +220,14 @@ func (c *mck) connectEctd() { if c.etcdIP != "" { etcdCli, err = etcd.GetRemoteEtcdClient([]string{c.etcdIP}) } else { - etcdCli, err = etcd.GetEtcdClient(&c.params.EtcdCfg) + etcdCli, err = etcd.GetEtcdClient( + c.params.EtcdCfg.UseEmbedEtcd, + c.params.EtcdCfg.EtcdUseSSL, + c.params.EtcdCfg.Endpoints, + c.params.EtcdCfg.EtcdTLSCert, + c.params.EtcdCfg.EtcdTLSKey, + c.params.EtcdCfg.EtcdTLSCACert, + c.params.EtcdCfg.EtcdTLSMinVersion) } if err != nil { log.Fatal("failed to connect to etcd", zap.Error(err)) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index d68eb00ac1..167dae3193 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -214,7 +214,12 @@ func (mr *MilvusRoles) Run(local bool, alias string) { if Params.EtcdCfg.UseEmbedEtcd { // Start etcd server. - etcd.InitEtcdServer(&Params.EtcdCfg) + etcd.InitEtcdServer( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.ConfigPath, + Params.EtcdCfg.DataDir, + Params.EtcdCfg.EtcdLogPath, + Params.EtcdCfg.EtcdLogLevel) defer etcd.StopEtcdServer() } } else { diff --git a/cmd/tools/migration/backend/etcd.go b/cmd/tools/migration/backend/etcd.go index 8f6aeaeed0..17216781cf 100644 --- a/cmd/tools/migration/backend/etcd.go +++ b/cmd/tools/migration/backend/etcd.go @@ -19,7 +19,14 @@ func (b etcdBasedBackend) CleanWithPrefix(prefix string) error { } func newEtcdBasedBackend(cfg *configs.MilvusConfig) (*etcdBasedBackend, error) { - etcdCli, err := etcd.GetEtcdClient(cfg.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + cfg.EtcdCfg.UseEmbedEtcd, + cfg.EtcdCfg.EtcdUseSSL, + cfg.EtcdCfg.Endpoints, + cfg.EtcdCfg.EtcdTLSCert, + cfg.EtcdCfg.EtcdTLSKey, + cfg.EtcdCfg.EtcdTLSCACert, + cfg.EtcdCfg.EtcdTLSMinVersion) if err != nil { return nil, err } diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index df106cab20..8d87078f81 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -73,7 +73,14 @@ func (r *Runner) WatchSessions() { } func (r *Runner) initEtcdCli() { - cli, err := etcd.GetEtcdClient(r.cfg.EtcdCfg) + cli, err := etcd.GetEtcdClient( + r.cfg.EtcdCfg.UseEmbedEtcd, + r.cfg.EtcdCfg.EtcdUseSSL, + r.cfg.EtcdCfg.Endpoints, + r.cfg.EtcdCfg.EtcdTLSCert, + r.cfg.EtcdCfg.EtcdTLSKey, + r.cfg.EtcdCfg.EtcdTLSCACert, + r.cfg.EtcdCfg.EtcdTLSMinVersion) console.AbnormalExitIf(err, r.backupFinished.Load()) r.etcdCli = cli } diff --git a/internal/config/etcd_source.go b/internal/config/etcd_source.go index c9b8331845..14dee1c1bf 100644 --- a/internal/config/etcd_source.go +++ b/internal/config/etcd_source.go @@ -24,6 +24,7 @@ import ( "time" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/etcd" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -46,23 +47,28 @@ type EtcdSource struct { eh EventHandler } -func NewEtcdSource(remoteInfo *EtcdInfo) (*EtcdSource, error) { - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: remoteInfo.Endpoints, - DialTimeout: 5 * time.Second, - }) +func NewEtcdSource(etcdInfo *EtcdInfo) (*EtcdSource, error) { + etcdCli, err := etcd.GetEtcdClient( + etcdInfo.UseEmbed, + etcdInfo.UseSSL, + etcdInfo.Endpoints, + etcdInfo.CertFile, + etcdInfo.KeyFile, + etcdInfo.CaCertFile, + etcdInfo.MinVersion) if err != nil { return nil, err } - return &EtcdSource{ + es := &EtcdSource{ etcdCli: etcdCli, ctx: context.Background(), currentConfig: make(map[string]string), - keyPrefix: remoteInfo.KeyPrefix, - refreshMode: remoteInfo.RefreshMode, - refreshInterval: remoteInfo.RefreshInterval, + keyPrefix: etcdInfo.KeyPrefix, + refreshMode: etcdInfo.RefreshMode, + refreshInterval: etcdInfo.RefreshInterval, intervalDone: make(chan bool, 1), - }, nil + } + return es, nil } // GetConfigurationByKey implements ConfigSource diff --git a/internal/config/source.go b/internal/config/source.go index f7c491346e..fad085d072 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -38,8 +38,14 @@ type EventHandler interface { // EtcdInfo has attribute for config center source initialization type EtcdInfo struct { - Endpoints []string - KeyPrefix string + UseEmbed bool + UseSSL bool + Endpoints []string + KeyPrefix string + CertFile string + KeyFile string + CaCertFile string + MinVersion string RefreshMode int //Pull Configuration interval, unit is second diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 30394b77d9..d0d93faaa6 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -48,7 +48,14 @@ func Test_garbageCollector_basic(t *testing.T) { meta, err := newMemoryMeta() assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) segRefer, err := NewSegmentReferenceManager(etcdKV, nil) @@ -110,7 +117,14 @@ func Test_garbageCollector_scan(t *testing.T) { meta, err := newMemoryMeta() assert.Nil(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) segRefer, err := NewSegmentReferenceManager(etcdKV, nil) diff --git a/internal/datacoord/segment_reference_manager_test.go b/internal/datacoord/segment_reference_manager_test.go index ee36b9ccaa..856d914424 100644 --- a/internal/datacoord/segment_reference_manager_test.go +++ b/internal/datacoord/segment_reference_manager_test.go @@ -36,7 +36,14 @@ func Test_SegmentReferenceManager(t *testing.T) { var segRefer *SegmentReferenceManager var err error Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(etcdCli, "unittest") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 18b218a4e0..ec68a0adf3 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -913,7 +913,14 @@ func TestService_WatchServices(t *testing.T) { func TestServer_watchQueryCoord(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) assert.NotNil(t, etcdKV) @@ -3318,7 +3325,14 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) factory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) @@ -3357,7 +3371,14 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts .. Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) factory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) @@ -3405,7 +3426,14 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) factory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) @@ -3599,7 +3627,14 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server { Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int()) factory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 509ed5703c..1aa9a278b4 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -83,7 +83,14 @@ func TestDataNode(t *testing.T) { defer cancel() node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -654,7 +661,14 @@ func TestDataNode(t *testing.T) { chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1" node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -747,7 +761,14 @@ func TestDataNode_AddSegment(t *testing.T) { defer cancel() node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -816,7 +837,14 @@ func TestDataNode_AddSegment(t *testing.T) { func TestWatchChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() node.SetEtcdClient(etcdCli) @@ -1087,7 +1115,14 @@ func TestDataNode_GetComponentStates(t *testing.T) { } func TestDataNode_ResendSegmentStats(t *testing.T) { - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-ResendSegmentStats" diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index bd48f0559c..76bc424e6c 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -33,7 +33,14 @@ func TestFlowGraphManager(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 9fa46e55a0..78c76c3afe 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -134,7 +134,14 @@ func makeNewChannelNames(names []string, suffix string) []string { } func clearEtcd(rootPath string) error { - client, err := etcd.GetEtcdClient(&Params.EtcdCfg) + client, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { return err } diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index 25a997ace5..1590b4999d 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -33,7 +33,14 @@ func Test_NewClient(t *testing.T) { proxy.Params.InitOnce() ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + proxy.Params.EtcdCfg.UseEmbedEtcd, + proxy.Params.EtcdCfg.EtcdUseSSL, + proxy.Params.EtcdCfg.Endpoints, + proxy.Params.EtcdCfg.EtcdTLSCert, + proxy.Params.EtcdCfg.EtcdTLSKey, + proxy.Params.EtcdCfg.EtcdTLSCACert, + proxy.Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index d0c88299b6..88b1842453 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -92,7 +92,14 @@ func (s *Server) init() error { datacoord.Params.DataCoordCfg.Port = Params.Port datacoord.Params.DataCoordCfg.Address = Params.GetAddress() - etcdCli, err := etcd.GetEtcdClient(&datacoord.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("DataCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index cc3d82ff92..e82e2e0825 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -233,7 +233,14 @@ func (s *Server) init() error { dn.Params.DataNodeCfg.Port = Params.Port dn.Params.DataNodeCfg.IP = Params.IP - etcdCli, err := etcd.GetEtcdClient(&dn.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Error("failed to connect to etcd", zap.Error(err)) return err diff --git a/internal/distributed/indexcoord/client/client_test.go b/internal/distributed/indexcoord/client/client_test.go index 696f38310e..bd16821036 100644 --- a/internal/distributed/indexcoord/client/client_test.go +++ b/internal/distributed/indexcoord/client/client_test.go @@ -41,8 +41,17 @@ func TestIndexCoordClient(t *testing.T) { factory := dependency.NewDefaultFactory(true) server, err := grpcindexcoord.NewServer(ctx, factory) assert.NoError(t, err) + + indexcoord.Params.InitOnce() icm := indexcoord.NewIndexCoordMock() - etcdCli, err := etcd.GetEtcdClient(&ClientParams.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + indexcoord.Params.EtcdCfg.UseEmbedEtcd, + indexcoord.Params.EtcdCfg.EtcdUseSSL, + indexcoord.Params.EtcdCfg.Endpoints, + indexcoord.Params.EtcdCfg.EtcdTLSCert, + indexcoord.Params.EtcdCfg.EtcdTLSKey, + indexcoord.Params.EtcdCfg.EtcdTLSCACert, + indexcoord.Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) icm.CallRegister = func() error { session := sessionutil.NewSession(context.Background(), indexcoord.Params.EtcdCfg.MetaRootPath, etcdCli) diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index cb61a027d4..9f2402e2a6 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -100,7 +100,14 @@ func (s *Server) init() error { closer := trace.InitTracing("IndexCoord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&indexcoord.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("IndexCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/indexnode/client/client_test.go b/internal/distributed/indexnode/client/client_test.go index ab7a28d76b..2a25fdc9e9 100644 --- a/internal/distributed/indexnode/client/client_test.go +++ b/internal/distributed/indexnode/client/client_test.go @@ -134,7 +134,14 @@ func TestIndexNodeClient(t *testing.T) { inm := indexnode.NewIndexNodeMock() ParamsGlobal.InitOnce() - etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + ParamsGlobal.EtcdCfg.UseEmbedEtcd, + ParamsGlobal.EtcdCfg.EtcdUseSSL, + ParamsGlobal.EtcdCfg.Endpoints, + ParamsGlobal.EtcdCfg.EtcdTLSCert, + ParamsGlobal.EtcdCfg.EtcdTLSKey, + ParamsGlobal.EtcdCfg.EtcdTLSCACert, + ParamsGlobal.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) err = ins.SetClient(inm) diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 78073a1f52..3dd7188974 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -156,7 +156,14 @@ func (s *Server) init() error { return err } - etcdCli, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("IndexNode connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go index f4551b30db..e82c6c15f6 100644 --- a/internal/distributed/indexnode/service_test.go +++ b/internal/distributed/indexnode/service_test.go @@ -44,7 +44,14 @@ func TestIndexNodeServer(t *testing.T) { inm := indexnode.NewIndexNodeMock() ParamsGlobal.InitOnce() - etcdCli, err := etcd.GetEtcdClient(&ParamsGlobal.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + ParamsGlobal.EtcdCfg.UseEmbedEtcd, + ParamsGlobal.EtcdCfg.EtcdUseSSL, + ParamsGlobal.EtcdCfg.Endpoints, + ParamsGlobal.EtcdCfg.EtcdTLSCert, + ParamsGlobal.EtcdCfg.EtcdTLSKey, + ParamsGlobal.EtcdCfg.EtcdTLSCACert, + ParamsGlobal.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) inm.SetEtcdClient(etcdCli) err = server.SetClient(inm) diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 5636491940..c9fc7258b0 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -324,7 +324,14 @@ func (s *Server) init() error { s.closer = closer log.Info("init Proxy's tracer done", zap.String("service name", serviceName)) - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("Proxy connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/querycoord/client/client_test.go b/internal/distributed/querycoord/client/client_test.go index d212098522..e96c51301d 100644 --- a/internal/distributed/querycoord/client/client_test.go +++ b/internal/distributed/querycoord/client/client_test.go @@ -35,7 +35,14 @@ func Test_NewClient(t *testing.T) { ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + proxy.Params.EtcdCfg.UseEmbedEtcd, + proxy.Params.EtcdCfg.EtcdUseSSL, + proxy.Params.EtcdCfg.Endpoints, + proxy.Params.EtcdCfg.EtcdTLSCert, + proxy.Params.EtcdCfg.EtcdTLSKey, + proxy.Params.EtcdCfg.EtcdTLSCACert, + proxy.Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index b1acc19383..6d806c444e 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -118,7 +118,14 @@ func (s *Server) init() error { closer := trace.InitTracing("querycoord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("QueryCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index c4f47567de..d055375071 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -105,7 +105,14 @@ func (s *Server) init() error { log.Info("QueryNode", zap.Int("port", Params.Port)) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("QueryNode connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index 1240d36519..da836d3658 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -35,7 +35,14 @@ func Test_NewClient(t *testing.T) { proxy.Params.InitOnce() ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient(&proxy.Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + proxy.Params.EtcdCfg.UseEmbedEtcd, + proxy.Params.EtcdCfg.EtcdUseSSL, + proxy.Params.EtcdCfg.Endpoints, + proxy.Params.EtcdCfg.EtcdTLSCert, + proxy.Params.EtcdCfg.EtcdTLSKey, + proxy.Params.EtcdCfg.EtcdTLSCACert, + proxy.Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath, etcdCli) assert.Nil(t, err) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 7b237dfd60..42b4c0ced1 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -162,7 +162,14 @@ func (s *Server) init() error { closer := trace.InitTracing("root_coord") s.closer = closer - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { log.Warn("RootCoord connect to etcd failed", zap.Error(err)) return err diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index a4dadea07e..96645ca51c 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -178,7 +178,14 @@ func TestRun(t *testing.T) { rootcoord.Params.Init() rootcoord.Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) sessKey := path.Join(rootcoord.Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 1903c176f2..a76130adae 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -51,7 +51,14 @@ func TestMockEtcd(t *testing.T) { Params.InitOnce() Params.EtcdCfg.MetaRootPath = "indexcoord-mock" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) @@ -98,7 +105,14 @@ func testIndexCoord(t *testing.T) { // first start an IndexNode inm0 := indexnode.NewIndexNodeMock() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) // start IndexCoord diff --git a/internal/kv/etcd/embed_etcd_config_test.go b/internal/kv/etcd/embed_etcd_config_test.go index 12b1076436..f58bb919d7 100644 --- a/internal/kv/etcd/embed_etcd_config_test.go +++ b/internal/kv/etcd/embed_etcd_config_test.go @@ -33,7 +33,6 @@ func TestEtcdConfigLoad(te *testing.T) { param := new(paramtable.ServiceParam) te.Setenv("etcd.use.embed", "true") - // TODO, not sure if the relative path works for ci environment te.Setenv("etcd.config.path", "../../../configs/advanced/etcd.yaml") te.Setenv("etcd.data.dir", "etcd.test.data.dir") diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 4045f5840f..e26467918e 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -40,7 +40,14 @@ func TestMain(m *testing.M) { } func TestEtcdKV_Load(te *testing.T) { - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) defer etcdCli.Close() assert.NoError(te, err) te.Run("EtcdKV SaveAndLoad", func(t *testing.T) { diff --git a/internal/kv/etcd/metakv_factory.go b/internal/kv/etcd/metakv_factory.go index 670eda34ca..22b3538251 100644 --- a/internal/kv/etcd/metakv_factory.go +++ b/internal/kv/etcd/metakv_factory.go @@ -50,7 +50,14 @@ func NewMetaKvFactory(rootPath string, etcdCfg *paramtable.EtcdConfig) (kv.MetaK } return metaKv, err } - client, err := etcd.GetEtcdClient(etcdCfg) + client, err := etcd.GetEtcdClient( + etcdCfg.UseEmbedEtcd, + etcdCfg.EtcdUseSSL, + etcdCfg.Endpoints, + etcdCfg.EtcdTLSCert, + etcdCfg.EtcdTLSKey, + etcdCfg.EtcdTLSCACert, + etcdCfg.EtcdTLSMinVersion) if err != nil { return nil, err } diff --git a/internal/metastore/kv/rootcoord/meta_snapshot_test.go b/internal/metastore/kv/rootcoord/meta_snapshot_test.go index 0acd446466..1396111df9 100644 --- a/internal/metastore/kv/rootcoord/meta_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/meta_snapshot_test.go @@ -48,7 +48,14 @@ func TestMetaSnapshot(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -188,7 +195,14 @@ func TestGetRevOnEtcd(t *testing.T) { tsKey := "timestamp" key := path.Join(rootPath, tsKey) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -232,7 +246,14 @@ func TestLoad(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -280,7 +301,14 @@ func TestMultiSave(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -344,7 +372,14 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -422,7 +457,14 @@ func TestTsBackward(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() @@ -449,7 +491,14 @@ func TestFix7150(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go index c7bdb423eb..3959fc1bd8 100644 --- a/internal/metastore/kv/rootcoord/suffix_snapshot_test.go +++ b/internal/metastore/kv/rootcoord/suffix_snapshot_test.go @@ -263,7 +263,14 @@ func Test_SuffixSnapshotLoad(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -315,7 +322,14 @@ func Test_SuffixSnapshotMultiSave(t *testing.T) { Params.Init() rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) @@ -391,7 +405,14 @@ func Test_SuffixSnapshotMultiSaveAndRemoveWithPrefix(t *testing.T) { rootPath := fmt.Sprintf("/test/meta/%d", randVal) sep := "_ts" - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) require.Nil(t, err) defer etcdCli.Close() etcdkv := etcdkv.NewEtcdKV(etcdCli, rootPath) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index c3128ff64e..ce96a702f4 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -298,7 +298,14 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno panic(err) } wg.Done() - etcd, err := etcd.GetEtcdClient(&indexnode.Params.EtcdCfg) + etcd, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { panic(err) } @@ -513,7 +520,14 @@ func TestProxy(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, proxy) - etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdcli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) defer etcdcli.Close() assert.NoError(t, err) proxy.SetEtcdClient(etcdcli) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 1fdd153d25..c93dc1edd0 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -44,7 +44,14 @@ func (suite *RowCountBasedBalancerTestSuite) SetupSuite() { func (suite *RowCountBasedBalancerTestSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) suite.broker = meta.NewMockBroker(suite.T()) diff --git a/internal/querycoordv2/checkers/channel_checker_test.go b/internal/querycoordv2/checkers/channel_checker_test.go index ca052fd4b1..000171f24c 100644 --- a/internal/querycoordv2/checkers/channel_checker_test.go +++ b/internal/querycoordv2/checkers/channel_checker_test.go @@ -44,7 +44,14 @@ func (suite *ChannelCheckerTestSuite) SetupSuite() { func (suite *ChannelCheckerTestSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index ef39640202..76007b45ba 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -44,7 +44,14 @@ func (suite *SegmentCheckerTestSuite) SetupSuite() { func (suite *SegmentCheckerTestSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index bb12aba8f5..237b5bcd39 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -117,7 +117,14 @@ func (suite *JobSuite) SetupSuite() { func (suite *JobSuite) SetupTest() { config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index ea767ce313..c372870e05 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -68,7 +68,14 @@ func (suite *CollectionManagerSuite) SetupSuite() { func (suite *CollectionManagerSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) suite.store = NewMetaStore(suite.kv) diff --git a/internal/querycoordv2/meta/replica_manager_test.go b/internal/querycoordv2/meta/replica_manager_test.go index 397e81203f..de8b08cfdf 100644 --- a/internal/querycoordv2/meta/replica_manager_test.go +++ b/internal/querycoordv2/meta/replica_manager_test.go @@ -51,7 +51,14 @@ func (suite *ReplicaManagerSuite) SetupSuite() { func (suite *ReplicaManagerSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) suite.store = NewMetaStore(suite.kv) diff --git a/internal/querycoordv2/observers/collection_observer_test.go b/internal/querycoordv2/observers/collection_observer_test.go index 45475de42b..af00e4aedd 100644 --- a/internal/querycoordv2/observers/collection_observer_test.go +++ b/internal/querycoordv2/observers/collection_observer_test.go @@ -145,7 +145,14 @@ func (suite *CollectionObserverSuite) SetupTest() { suite.idAllocator = RandomIncrementIDAllocator() log.Debug("create embedded etcd KV...") config := GenerateEtcdConfig() - client, err := etcd.GetEtcdClient(&config) + client, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(client, Params.EtcdCfg.MetaRootPath+"-"+RandomMetaRootPath()) suite.Require().NoError(err) diff --git a/internal/querycoordv2/observers/handoff_observer_test.go b/internal/querycoordv2/observers/handoff_observer_test.go index e38448a71f..8867c690ed 100644 --- a/internal/querycoordv2/observers/handoff_observer_test.go +++ b/internal/querycoordv2/observers/handoff_observer_test.go @@ -107,7 +107,14 @@ func (suite *HandoffObserverTestSuit) SetupTest() { suite.idAllocator = RandomIncrementIDAllocator() log.Debug("create embedded etcd KV...") config := GenerateEtcdConfig() - client, err := etcd.GetEtcdClient(&config) + client, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(client, Params.EtcdCfg.MetaRootPath+"-"+RandomMetaRootPath()) suite.Require().NoError(err) diff --git a/internal/querycoordv2/observers/leader_observer_test.go b/internal/querycoordv2/observers/leader_observer_test.go index d7ab056d1f..06b2bb294d 100644 --- a/internal/querycoordv2/observers/leader_observer_test.go +++ b/internal/querycoordv2/observers/leader_observer_test.go @@ -49,7 +49,14 @@ func (suite *LeaderObserverTestSuite) SetupSuite() { func (suite *LeaderObserverTestSuite) SetupTest() { var err error config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index e3f3da75f4..34e69e76f2 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -393,7 +393,14 @@ func newQueryCoord() (*Server, error) { return nil, err } - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { return nil, err } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 0f66846d62..478048ce60 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -111,7 +111,14 @@ func (suite *ServiceSuite) SetupSuite() { func (suite *ServiceSuite) SetupTest() { config := params.GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index f13361d1f3..27910e3e08 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -118,7 +118,14 @@ func (suite *TaskSuite) SetupSuite() { func (suite *TaskSuite) SetupTest() { config := GenerateEtcdConfig() - cli, err := etcd.GetEtcdClient(&config) + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd, + config.EtcdUseSSL, + config.Endpoints, + config.EtcdTLSCert, + config.EtcdTLSKey, + config.EtcdTLSCACert, + config.EtcdTLSMinVersion) suite.Require().NoError(err) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath) diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index 8f5b7cae17..3ed3a00e67 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -444,7 +444,14 @@ func TestImpl_isHealthy(t *testing.T) { func TestImpl_ShowConfigurations(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdCli.Close() @@ -486,7 +493,14 @@ func TestImpl_GetMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdCli.Close() diff --git a/internal/querynode/metrics_info_test.go b/internal/querynode/metrics_info_test.go index 0df11b561d..e5f6899d18 100644 --- a/internal/querynode/metrics_info_test.go +++ b/internal/querynode/metrics_info_test.go @@ -37,7 +37,14 @@ func TestGetSystemInfoMetrics(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdCli.Close() node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli) @@ -64,7 +71,14 @@ func TestGetComponentConfigurationsFailed(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdCli.Close() node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.EtcdCfg.MetaRootPath, etcdCli) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index f570c3237a..6e5626f23f 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -523,7 +523,14 @@ func genCollectionMeta(collectionID UniqueID, schema *schemapb.CollectionSchema) // ---------- unittest util functions ---------- // functions of third-party func genEtcdKV() (*etcdkv.EtcdKV, error) { - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { return nil, err } @@ -1689,7 +1696,14 @@ func saveChangeInfo(key string, value string) error { func genSimpleQueryNodeWithMQFactory(ctx context.Context, fac dependency.Factory) (*QueryNode, error) { node := NewQueryNode(ctx, fac) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { return nil, err } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 5de0d1c6ba..5f0645dc2d 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -82,7 +82,14 @@ func newQueryNodeMock() *QueryNode { cancel() }() } - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { panic(err) } @@ -171,7 +178,14 @@ func TestQueryNode_register(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdcli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdcli.Close() node.SetEtcdClient(etcdcli) @@ -189,7 +203,16 @@ func TestQueryNode_init(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) - etcdcli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + defer node.Stop() + + etcdcli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdcli.Close() node.SetEtcdClient(etcdcli) diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 7c9a932a15..8bc3668ab2 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -106,7 +106,14 @@ func TestProxyClientManager_GetProxyClients(t *testing.T) { core, err := NewCore(context.Background(), nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + cli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) defer cli.Close() assert.Nil(t, err) core.etcdCli = cli @@ -130,7 +137,14 @@ func TestProxyClientManager_AddProxyClient(t *testing.T) { core, err := NewCore(context.Background(), nil) assert.Nil(t, err) - cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + cli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer cli.Close() core.etcdCli = cli diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index 03533ad5aa..d49da6f09d 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -34,7 +34,14 @@ import ( func TestProxyManager(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -103,7 +110,14 @@ func TestProxyManager(t *testing.T) { func TestProxyManager_ErrCompacted(t *testing.T) { Params.Init() - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.Nil(t, err) defer etcdCli.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 4cc0361439..ec637066b7 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1332,7 +1332,14 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { ctx := context.Background() coreFactory := dependency.NewDefaultFactory(true) - etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) assert.NoError(t, err) defer etcdCli.Close() core, err := NewCore(ctx, coreFactory) diff --git a/internal/rootcoord/timestamp_bench_test.go b/internal/rootcoord/timestamp_bench_test.go index 706eaac7db..00ff9054b6 100644 --- a/internal/rootcoord/timestamp_bench_test.go +++ b/internal/rootcoord/timestamp_bench_test.go @@ -21,7 +21,14 @@ import ( func getTestEtcdCli() *clientv3.Client { Params.InitOnce() - cli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + cli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd, + Params.EtcdCfg.EtcdUseSSL, + Params.EtcdCfg.Endpoints, + Params.EtcdCfg.EtcdTLSCert, + Params.EtcdCfg.EtcdTLSKey, + Params.EtcdCfg.EtcdTLSCACert, + Params.EtcdCfg.EtcdTLSMinVersion) if err != nil { panic(err) } diff --git a/internal/util/etcd/etcd_server.go b/internal/util/etcd/etcd_server.go index c38be3f80a..75f81c43e4 100644 --- a/internal/util/etcd/etcd_server.go +++ b/internal/util/etcd/etcd_server.go @@ -4,7 +4,6 @@ import ( "sync" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/paramtable" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" @@ -25,11 +24,17 @@ func GetEmbedEtcdClient() (*clientv3.Client, error) { } // InitEtcdServer initializes embedded etcd server singleton. -func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error { - if etcdCfg.UseEmbedEtcd { +func InitEtcdServer( + useEmbedEtcd bool, + configPath string, + dataDir string, + logPath string, + logLevel string, +) error { + if useEmbedEtcd { var initError error initOnce.Do(func() { - path := etcdCfg.ConfigPath + path := configPath var cfg *embed.Config if len(path) > 0 { cfgFromFile, err := embed.ConfigFromFile(path) @@ -40,22 +45,26 @@ func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error { } else { cfg = embed.NewConfig() } - cfg.Dir = etcdCfg.DataDir - cfg.LogOutputs = []string{etcdCfg.EtcdLogPath} - cfg.LogLevel = etcdCfg.EtcdLogLevel + cfg.Dir = dataDir + cfg.LogOutputs = []string{logPath} + cfg.LogLevel = logLevel e, err := embed.StartEtcd(cfg) if err != nil { log.Error("failed to init embedded Etcd server", zap.Error(err)) initError = err } etcdServer = e - log.Info("finish init Etcd config", zap.String("path", path), zap.String("data", etcdCfg.DataDir)) + log.Info("finish init Etcd config", zap.String("path", path), zap.String("data", dataDir)) }) return initError } return nil } +func HasServer() bool { + return etcdServer != nil +} + // StopEtcdServer stops embedded etcd server singleton. func StopEtcdServer() { if etcdServer != nil { diff --git a/internal/util/etcd/etcd_util.go b/internal/util/etcd/etcd_util.go index f63085c04b..60e0488596 100644 --- a/internal/util/etcd/etcd_util.go +++ b/internal/util/etcd/etcd_util.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" - "github.com/milvus-io/milvus/internal/util/paramtable" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -34,14 +33,21 @@ var ( ) // GetEtcdClient returns etcd client -func GetEtcdClient(cfg *paramtable.EtcdConfig) (*clientv3.Client, error) { - if cfg.UseEmbedEtcd { +func GetEtcdClient( + useEmbedEtcd bool, + useSSL bool, + endpoints []string, + certFile string, + keyFile string, + caCertFile string, + minVersion string) (*clientv3.Client, error) { + if useEmbedEtcd { return GetEmbedEtcdClient() } - if cfg.EtcdUseSSL { - return GetRemoteEtcdSSLClient(cfg.Endpoints, cfg.EtcdTLSCert, cfg.EtcdTLSKey, cfg.EtcdTLSCACert, cfg.EtcdTLSMinVersion) + if useSSL { + return GetRemoteEtcdSSLClient(endpoints, certFile, keyFile, caCertFile, minVersion) } - return GetRemoteEtcdClient(cfg.Endpoints) + return GetRemoteEtcdClient(endpoints) } // GetRemoteEtcdClient returns client of remote etcd by given endpoints diff --git a/internal/util/etcd/etcd_util_test.go b/internal/util/etcd/etcd_util_test.go index 3d5cc7f9d7..f9427744af 100644 --- a/internal/util/etcd/etcd_util_test.go +++ b/internal/util/etcd/etcd_util_test.go @@ -19,26 +19,18 @@ package etcd import ( "context" "errors" - "os" "path" "testing" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" ) -var Params paramtable.ServiceParam - func TestEtcd(t *testing.T) { - Params.Init() - Params.EtcdCfg.UseEmbedEtcd = true - Params.EtcdCfg.DataDir = "/tmp/data" - err := InitEtcdServer(&Params.EtcdCfg) + err := InitEtcdServer(true, "", "/tmp/data", "stdout", "info") assert.NoError(t, err) - defer os.RemoveAll(Params.EtcdCfg.DataDir) defer StopEtcdServer() - etcdCli, err := GetEtcdClient(&Params.EtcdCfg) + etcdCli, err := GetEtcdClient(true, false, []string{}, "", "", "", "") assert.NoError(t, err) key := path.Join("test", "test") @@ -50,26 +42,25 @@ func TestEtcd(t *testing.T) { assert.False(t, resp.Count < 1) assert.Equal(t, string(resp.Kvs[0].Value), "value") - Params.EtcdCfg.UseEmbedEtcd = false - Params.EtcdCfg.EtcdUseSSL = true - Params.EtcdCfg.EtcdTLSMinVersion = "1.3" - Params.EtcdCfg.EtcdTLSCACert = "../../../configs/cert/ca.pem" - Params.EtcdCfg.EtcdTLSCert = "../../../configs/cert/client.pem" - Params.EtcdCfg.EtcdTLSKey = "../../../configs/cert/client.key" - etcdCli, err = GetEtcdClient(&Params.EtcdCfg) - assert.NoError(t, err) - - Params.EtcdCfg.EtcdTLSMinVersion = "some not right word" - etcdCli, err = GetEtcdClient(&Params.EtcdCfg) + etcdCli, err = GetEtcdClient(false, true, []string{}, + "../../../configs/cert/client.pem", + "../../../configs/cert/client.key", + "../../../configs/cert/ca.pem", + "some not right word") assert.NotNil(t, err) - Params.EtcdCfg.EtcdTLSMinVersion = "1.2" - Params.EtcdCfg.EtcdTLSCACert = "wrong/file" - etcdCli, err = GetEtcdClient(&Params.EtcdCfg) + etcdCli, err = GetEtcdClient(false, true, []string{}, + "../../../configs/cert/client.pem", + "../../../configs/cert/client.key", + "wrong/file", + "1.2") assert.NotNil(t, err) - Params.EtcdCfg.EtcdTLSCACert = "../../../configs/cert/ca.pem" - Params.EtcdCfg.EtcdTLSCert = "wrong/file" + etcdCli, err = GetEtcdClient(false, true, []string{}, + "wrong/file", + "../../../configs/cert/client.key", + "../../../configs/cert/ca.pem", + "1.2") assert.NotNil(t, err) } diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 66e542c426..ebb283a118 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -23,6 +23,7 @@ import ( config "github.com/milvus-io/milvus/internal/config" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/logutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" @@ -137,26 +138,38 @@ func (gp *BaseTable) initConfigsFromLocal(formatter func(key string) string) { } func (gp *BaseTable) initConfigsFromRemote(formatter func(key string) string) { - endpoints, err := gp.mgr.GetConfig("etcd.endpoints") + _, err := gp.mgr.GetConfig("etcd.endpoints") if err != nil { log.Info("cannot find etcd.endpoints") return } - rootPath, err := gp.mgr.GetConfig("etcd.rootPath") + _, err = gp.mgr.GetConfig("etcd.rootPath") if err != nil { log.Info("cannot find etcd.rootPath") return } + etcdConfig := EtcdConfig{} + etcdConfig.init(gp) + if etcdConfig.UseEmbedEtcd && !etcd.HasServer() { + return + } + info := &config.EtcdInfo{ + UseEmbed: etcdConfig.UseEmbedEtcd, + UseSSL: etcdConfig.EtcdUseSSL, + Endpoints: etcdConfig.Endpoints, + CertFile: etcdConfig.EtcdTLSCert, + KeyFile: etcdConfig.EtcdTLSKey, + CaCertFile: etcdConfig.EtcdTLSCACert, + MinVersion: etcdConfig.EtcdTLSMinVersion, + KeyPrefix: etcdConfig.MetaRootPath, + RefreshMode: config.ModeInterval, + RefreshInterval: 10 * time.Second, + } configFilePath := gp.configDir + "/" + gp.YamlFile gp.mgr, err = config.Init(config.WithEnvSource(formatter), config.WithFilesSource(configFilePath), - config.WithEtcdSource(&config.EtcdInfo{ - Endpoints: strings.Split(endpoints, ","), - KeyPrefix: rootPath, - RefreshMode: config.ModeInterval, - RefreshInterval: 10 * time.Second, - })) + config.WithEtcdSource(info)) if err != nil { log.Info("init with etcd failed", zap.Error(err)) return