From 0f4bd50ce367053e4c69cc6091e2caecb6740589 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 11 Jun 2021 22:04:41 +0800 Subject: [PATCH] Support ETCD endpoints array (#5755) --- .env | 2 +- configs/milvus.yaml | 4 +- .../docker/distributed/docker-compose.yml | 10 +-- .../docker/standalone/docker-compose.yml | 2 +- docker-compose.yml | 2 +- internal/allocator/global_id_test.go | 14 ++-- internal/allocator/id.go | 17 +++-- internal/datanode/data_node.go | 2 +- internal/datanode/mock_test.go | 6 +- internal/datanode/param_table.go | 13 ++-- internal/datanode/param_table_test.go | 6 +- internal/dataservice/param.go | 11 +-- internal/dataservice/server.go | 9 +-- internal/dataservice/server_test.go | 6 +- .../distributed/datanode/datanode_test.go | 2 +- internal/distributed/datanode/service.go | 10 +-- .../distributed/dataservice/client/client.go | 4 +- internal/distributed/indexnode/service.go | 2 +- .../distributed/indexservice/client/client.go | 4 +- .../masterservice/client/client.go | 4 +- .../masterservice/masterservice_test.go | 16 ++--- internal/distributed/masterservice/server.go | 24 +++---- internal/distributed/proxynode/service.go | 8 +-- internal/distributed/querynode/service.go | 8 +-- .../distributed/queryservice/client/client.go | 4 +- internal/distributed/queryservice/service.go | 4 +- internal/indexnode/indexnode.go | 4 +- internal/indexnode/paramtable.go | 13 ++-- internal/indexservice/indexservice.go | 8 +-- internal/indexservice/paramtable.go | 15 ++-- internal/kv/etcd/etcd_kv_test.go | 71 +++++-------------- internal/kv/etcd/etcd_stats_watcher_test.go | 12 ++-- internal/masterservice/master_service.go | 10 +-- internal/masterservice/master_service_test.go | 2 +- internal/masterservice/meta_snapshot_test.go | 15 ++-- internal/masterservice/meta_table_test.go | 6 +- internal/masterservice/param_table.go | 11 +-- internal/masterservice/param_table_test.go | 4 +- internal/masterservice/proxy_node_manager.go | 4 +- .../masterservice/proxy_node_manager_test.go | 4 +- internal/msgstream/mq_msgstream_test.go | 10 +-- internal/proxynode/paramtable.go | 10 +-- internal/proxynode/proxy_node.go | 4 +- internal/querynode/param_table.go | 11 +-- internal/querynode/query_node.go | 2 +- internal/queryservice/param_table.go | 13 ++-- internal/queryservice/queryservice.go | 2 +- internal/tso/global_allocator_test.go | 13 ++-- internal/util/paramtable/paramtable.go | 17 ++--- .../server/rocksmq/rocksmq_impl_test.go | 42 +++++------ internal/util/sessionutil/session_util.go | 8 +-- .../util/sessionutil/session_util_test.go | 26 +++---- internal/util/tsoutil/tso.go | 4 +- 53 files changed, 238 insertions(+), 297 deletions(-) diff --git a/.env b/.env index 61403a8945..9f0f94f075 100644 --- a/.env +++ b/.env @@ -5,4 +5,4 @@ DATE_VERSION=20210428-144501 LATEST_DATE_VERSION=20210428-144501 MINIO_ADDRESS=minio:9000 PULSAR_ADDRESS=pulsar://pulsar:6650 -ETCD_ADDRESS=etcd:2379 +ETCD_ENDPOINTS=etcd:2379 diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8b1cd5281d..7637b58b50 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -14,8 +14,8 @@ nodeID: # will be deprecated later queryNodeIDList: [1] etcd: - address: localhost - port: 2379 + endpoints: + - localhost:2379 rootPath: by-dev metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath diff --git a/deployments/docker/distributed/docker-compose.yml b/deployments/docker/distributed/docker-compose.yml index d835276387..92eedd1550 100644 --- a/deployments/docker/distributed/docker-compose.yml +++ b/deployments/docker/distributed/docker-compose.yml @@ -31,7 +31,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/milvus", "run", "master"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 PULSAR_ADDRESS: pulsar://pulsar:6650 DATA_SERVICE_ADDRESS: dataservice:13333 INDEX_SERVICE_ADDRESS: indexservice:31000 @@ -47,7 +47,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/milvus", "run", "proxynode"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 PULSAR_ADDRESS: pulsar://pulsar:6650 MASTER_ADDRESS: master:53100 DATA_SERVICE_ADDRESS: dataservice:13333 @@ -91,7 +91,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/milvus", "run", "indexservice"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 MASTER_ADDRESS: master:53100 depends_on: @@ -116,7 +116,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/milvus", "run", "dataservice"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 PULSAR_ADDRESS: pulsar://pulsar:6650 MASTER_ADDRESS: master:53100 @@ -131,7 +131,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/milvus", "run", "datanode"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 PULSAR_ADDRESS: pulsar://pulsar:6650 MASTER_ADDRESS: master:53100 diff --git a/deployments/docker/standalone/docker-compose.yml b/deployments/docker/standalone/docker-compose.yml index 0924780f7c..5ef4c392b7 100644 --- a/deployments/docker/standalone/docker-compose.yml +++ b/deployments/docker/standalone/docker-compose.yml @@ -25,7 +25,7 @@ services: image: registry.zilliz.com/milvus/milvus:master-release command: ["/milvus/bin/standalone"] environment: - ETCD_ADDRESS: etcd:2379 + ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 ports: - "19530:19530" diff --git a/docker-compose.yml b/docker-compose.yml index aa8869c34d..8765186edd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,7 +21,7 @@ services: environment: <<: *ccache PULSAR_ADDRESS: ${PULSAR_ADDRESS} - ETCD_ADDRESS: ${ETCD_ADDRESS} + ETCD_ENDPOINTS: ${ETCD_ENDPOINTS} MINIO_ADDRESS: ${MINIO_ADDRESS} volumes: &ubuntu-volumes - .:/go/src/github.com/milvus-io/milvus:delegated diff --git a/internal/allocator/global_id_test.go b/internal/allocator/global_id_test.go index 9e994d406c..65ac8d18ee 100644 --- a/internal/allocator/global_id_test.go +++ b/internal/allocator/global_id_test.go @@ -13,10 +13,9 @@ package allocator import ( "os" + "strings" "testing" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/stretchr/testify/assert" ) @@ -24,12 +23,12 @@ import ( var gTestIDAllocator *GlobalIDAllocator func TestGlobalTSOAllocator_All(t *testing.T) { - etcdAddress := os.Getenv("ETCD_ADDRESS") - if etcdAddress == "" { - ip := funcutil.GetLocalIP() - etcdAddress = ip + ":2379" + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" } - gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "gidTest")) + etcdEndpoints := strings.Split(endpoints, ",") + gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "gidTest")) t.Run("Initialize", func(t *testing.T) { err := gTestIDAllocator.Initialize() @@ -50,5 +49,4 @@ func TestGlobalTSOAllocator_All(t *testing.T) { assert.Nil(t, err) assert.Equal(t, count, uint32(idEnd-idStart)) }) - } diff --git a/internal/allocator/id.go b/internal/allocator/id.go index f70e8cebfc..7d88ce1d43 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -35,9 +35,9 @@ type UniqueID = typeutil.UniqueID type IDAllocator struct { Allocator - etcdAddr []string - metaRoot string - masterClient types.MasterService + etcdEndpoints []string + metaRoot string + masterClient types.MasterService countPerRPC uint32 @@ -47,8 +47,7 @@ type IDAllocator struct { PeerID UniqueID } -func NewIDAllocator(ctx context.Context, metaRoot string, etcdAddr []string) (*IDAllocator, error) { - +func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string) (*IDAllocator, error) { ctx1, cancel := context.WithCancel(ctx) a := &IDAllocator{ Allocator: Allocator{ @@ -56,9 +55,9 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdAddr []string) (*I CancelFunc: cancel, Role: "IDAllocator", }, - countPerRPC: IDCountPerRPC, - metaRoot: metaRoot, - etcdAddr: etcdAddr, + countPerRPC: IDCountPerRPC, + metaRoot: metaRoot, + etcdEndpoints: etcdEndpoints, } a.TChan = &EmptyTicker{} a.Allocator.SyncFunc = a.syncID @@ -72,7 +71,7 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdAddr []string) (*I func (ia *IDAllocator) Start() error { var err error - ia.masterClient, err = msc.NewClient(ia.metaRoot, ia.etcdAddr, 3*time.Second) + ia.masterClient, err = msc.NewClient(ia.metaRoot, ia.etcdEndpoints, 3*time.Second) if err != nil { panic(err) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 1c4c684b02..daee98bf37 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -127,7 +127,7 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { // Register register data node at etcd func (node *DataNode) Register() error { - node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints) node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = node.session.ServerID return nil diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 5bd9784af3..54fc567936 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -117,9 +117,9 @@ func refreshChannelNames() { } func clearEtcd(rootPath string) error { - etcdAddr := Params.EtcdAddress - log.Info("etcd tests address", zap.String("address", etcdAddr)) - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := Params.EtcdEndpoints + log.Info("etcd tests address", zap.Any("endpoints", etcdEndpoints)) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) if err != nil { return err } diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 3c2ad57bde..6e0cfd4d09 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -15,6 +15,7 @@ import ( "os" "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -49,8 +50,8 @@ type ParamTable struct { MsgChannelSubName string // --- ETCD --- - EtcdAddress string - MetaRootPath string + EtcdEndpoints []string + MetaRootPath string // --- MinIO --- MinioAddress string @@ -94,7 +95,7 @@ func (p *ParamTable) Init() { p.initMsgChannelSubName() // --- ETCD --- - p.initEtcdAddress() + p.initEtcdEndpoints() p.initMetaRootPath() // --- MinIO --- @@ -188,12 +189,12 @@ func (p *ParamTable) initMsgChannelSubName() { } // --- ETCD --- -func (p *ParamTable) initEtcdAddress() { - addr, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdEndpoints() { + endpoints, err := p.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdAddress = addr + p.EtcdEndpoints = strings.Split(endpoints, ",") } func (p *ParamTable) initMetaRootPath() { diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index 0e1abeff6c..128181888b 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -65,9 +65,9 @@ func TestParamTable_DataNode(t *testing.T) { log.Println("MsgChannelSubName:", name) }) - t.Run("Test EtcdAddress", func(t *testing.T) { - addr := Params.EtcdAddress - log.Println("EtcdAddress:", addr) + t.Run("Test EtcdEndpoints", func(t *testing.T) { + endpoints := Params.EtcdEndpoints + log.Println("EtcdEndpoints:", endpoints) }) t.Run("Test MetaRootPath", func(t *testing.T) { diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index a5374d659d..25408a3923 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -12,6 +12,7 @@ package dataservice import ( "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -28,7 +29,7 @@ type ParamTable struct { Port int // --- ETCD --- - EtcdAddress string + EtcdEndpoints []string MetaRootPath string KvRootPath string SegmentBinlogSubPath string @@ -76,7 +77,7 @@ func (p *ParamTable) Init() { // set members p.initNodeID() - p.initEtcdAddress() + p.initEtcdEndpoints() p.initMetaRootPath() p.initKvRootPath() p.initSegmentBinlogSubPath() @@ -109,12 +110,12 @@ func (p *ParamTable) initNodeID() { p.NodeID = p.ParseInt64("dataservice.nodeID") } -func (p *ParamTable) initEtcdAddress() { - addr, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdEndpoints() { + endpoints, err := p.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdAddress = addr + p.EtcdEndpoints = strings.Split(endpoints, ",") } func (p *ParamTable) initPulsarAddress() { diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index f2b545be03..bbef29945d 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -84,8 +84,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro return datanodeclient.NewClient(addr, 3*time.Second) } s.masterClientCreator = func(addr string) (types.MasterService, error) { - return masterclient.NewClient(Params.MetaRootPath, - []string{Params.EtcdAddress}, masterClientTimout) + return masterclient.NewClient(Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout) } return s, nil @@ -93,7 +92,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro // Register register data service at etcd func (s *Server) Register() error { - s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints) s.activeCh = s.session.Init(typeutil.DataServiceRole, Params.IP, true) Params.NodeID = s.session.ServerID return nil @@ -203,9 +202,7 @@ func (s *Server) initSegmentInfoChannel() error { func (s *Server) initMeta() error { connectEtcdFn := func() error { - etcdClient, err := clientv3.New(clientv3.Config{ - Endpoints: []string{Params.EtcdAddress}, - }) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) if err != nil { return err } diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index e0875082d0..333033396d 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -846,7 +846,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server { err = factory.SetParams(m) assert.Nil(t, err) - etcdCli, err := initEtcd(Params.EtcdAddress) + etcdCli, err := initEtcd(Params.EtcdEndpoints) assert.Nil(t, err) sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) @@ -877,10 +877,10 @@ func closeTestServer(t *testing.T, svr *Server) { assert.Nil(t, err) } -func initEtcd(etcdAddress string) (*clientv3.Client, error) { +func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) { var etcdCli *clientv3.Client connectEtcdFn := func() error { - etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + etcd, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second}) if err != nil { return err } diff --git a/internal/distributed/datanode/datanode_test.go b/internal/distributed/datanode/datanode_test.go index 3de7546e0e..44548ed18d 100644 --- a/internal/distributed/datanode/datanode_test.go +++ b/internal/distributed/datanode/datanode_test.go @@ -125,7 +125,7 @@ func TestRun(t *testing.T) { dnServer.newMasterServiceClient = func() (types.MasterService, error) { return &mockMaster{}, nil } - dnServer.newDataServiceClient = func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { + dnServer.newDataServiceClient = func(string, []string, time.Duration) types.DataService { return &mockDataService{} } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 07869c6455..15ea7e2797 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -56,7 +56,7 @@ type Server struct { dataService types.DataService newMasterServiceClient func() (types.MasterService, error) - newDataServiceClient func(string, string, time.Duration) types.DataService + newDataServiceClient func(string, []string, time.Duration) types.DataService closer io.Closer } @@ -70,10 +70,10 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) msFactory: factory, grpcErrChan: make(chan error), newMasterServiceClient: func() (types.MasterService, error) { - return msc.NewClient(dn.Params.MetaRootPath, []string{dn.Params.EtcdAddress}, 3*time.Second) + return msc.NewClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second) }, - newDataServiceClient: func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { - return dsc.NewClient(etcdMetaRoot, []string{etcdAddress}, timeout) + newDataServiceClient: func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService { + return dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout) }, } @@ -206,7 +206,7 @@ func (s *Server) init() error { if s.newDataServiceClient != nil { log.Debug("Data service address", zap.String("address", Params.DataServiceAddress)) log.Debug("DataNode Init data service client ...") - dataServiceClient := s.newDataServiceClient(dn.Params.MetaRootPath, dn.Params.EtcdAddress, 10*time.Second) + dataServiceClient := s.newDataServiceClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 10*time.Second) if err = dataServiceClient.Init(); err != nil { log.Debug("DataNode newDataServiceClient failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 6c692bf6ef..608d2e3b2b 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -59,8 +59,8 @@ func getDataServiceAddress(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) *Client { - sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr) +func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) *Client { + sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints) return &Client{ ctx: context.Background(), sess: sess, diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 86b9484da6..90448a397f 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -139,7 +139,7 @@ func (s *Server) init() error { return err } - s.indexServiceClient = grpcindexserviceclient.NewClient(indexnode.Params.MetaRootPath, []string{indexnode.Params.EtcdAddress}, 3*time.Second) + s.indexServiceClient = grpcindexserviceclient.NewClient(indexnode.Params.MetaRootPath, indexnode.Params.EtcdEndpoints, 3*time.Second) err = s.indexServiceClient.Init() if err != nil { log.Debug("IndexNode indexSerticeClient init failed", zap.Error(err)) diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index 948417f774..b178888ffc 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -62,8 +62,8 @@ func getIndexServiceaddr(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) *Client { - sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr) +func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) *Client { + sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints) return &Client{ ctx: context.Background(), sess: sess, diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 7c3574d8c5..5de8df8b9f 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -61,8 +61,8 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(metaRoot string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) { - sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr) +func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) { + sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") log.Debug("MasterServiceClient NewClient failed", zap.Error(err)) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 6f8b18eb6a..0f98ff5eaf 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -148,7 +148,7 @@ func TestGrpcService(t *testing.T) { assert.Nil(t, err) svr.masterService.UpdateStateCode(internalpb.StateCode_Initializing) - etcdCli, err := initEtcd(cms.Params.EtcdAddress) + etcdCli, err := initEtcd(cms.Params.EtcdEndpoints) assert.Nil(t, err) sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) @@ -255,7 +255,7 @@ func TestGrpcService(t *testing.T) { svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy) - cli, err := grpcmasterserviceclient.NewClient(cms.Params.MetaRootPath, []string{cms.Params.EtcdAddress}, 3*time.Second) + cli, err := grpcmasterserviceclient.NewClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) assert.Nil(t, err) err = cli.Init() @@ -921,13 +921,13 @@ func TestRun(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "listen tcp: address 1000000: invalid port") - svr.newDataServiceClient = func(metaRoot, address string, timeout time.Duration) types.DataService { + svr.newDataServiceClient = func(string, []string, time.Duration) types.DataService { return &mockDataService{} } - svr.newIndexServiceClient = func(etcdAddress, metaRootPath string, timeout time.Duration) types.IndexService { + svr.newIndexServiceClient = func(string, []string, time.Duration) types.IndexService { return &mockIndex{} } - svr.newQueryServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.QueryService { + svr.newQueryServiceClient = func(string, []string, time.Duration) types.QueryService { return &mockQuery{} } @@ -938,7 +938,7 @@ func TestRun(t *testing.T) { cms.Params.Init() cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) - etcdCli, err := initEtcd(cms.Params.EtcdAddress) + etcdCli, err := initEtcd(cms.Params.EtcdEndpoints) assert.Nil(t, err) sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) @@ -951,10 +951,10 @@ func TestRun(t *testing.T) { } -func initEtcd(etcdAddress string) (*clientv3.Client, error) { +func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) { var etcdCli *clientv3.Client connectEtcdFn := func() error { - etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + etcd, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second}) if err != nil { return err } diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 0c3edc2b52..edbbc6a7d5 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -58,9 +58,9 @@ type Server struct { indexService types.IndexService queryService types.QueryService - newIndexServiceClient func(string, string, time.Duration) types.IndexService - newDataServiceClient func(string, string, time.Duration) types.DataService - newQueryServiceClient func(string, string, time.Duration) types.QueryService + newIndexServiceClient func(string, []string, time.Duration) types.IndexService + newDataServiceClient func(string, []string, time.Duration) types.DataService + newQueryServiceClient func(string, []string, time.Duration) types.QueryService closer io.Closer } @@ -84,8 +84,8 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) setClient() { ctx := context.Background() - s.newDataServiceClient = func(etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { - dsClient := dsc.NewClient(etcdMetaRoot, []string{etcdAddress}, timeout) + s.newDataServiceClient = func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService { + dsClient := dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout) if err := dsClient.Init(); err != nil { panic(err) } @@ -97,8 +97,8 @@ func (s *Server) setClient() { } return dsClient } - s.newIndexServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.IndexService { - isClient := isc.NewClient(metaRootPath, []string{etcdAddress}, timeout) + s.newIndexServiceClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexService { + isClient := isc.NewClient(metaRootPath, etcdEndpoints, timeout) if err := isClient.Init(); err != nil { panic(err) } @@ -107,8 +107,8 @@ func (s *Server) setClient() { } return isClient } - s.newQueryServiceClient = func(metaRootPath, etcdAddress string, timeout time.Duration) types.QueryService { - qsClient, err := qsc.NewClient(metaRootPath, []string{etcdAddress}, timeout) + s.newQueryServiceClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.QueryService { + qsClient, err := qsc.NewClient(metaRootPath, etcdEndpoints, timeout) if err != nil { panic(err) } @@ -174,7 +174,7 @@ func (s *Server) init() error { if s.newDataServiceClient != nil { log.Debug("MasterService start to create DataService client") - dataService := s.newDataServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second) + dataService := s.newDataServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) if err := s.masterService.SetDataService(ctx, dataService); err != nil { panic(err) } @@ -182,7 +182,7 @@ func (s *Server) init() error { } if s.newIndexServiceClient != nil { log.Debug("MasterService start to create IndexService client") - indexService := s.newIndexServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second) + indexService := s.newIndexServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) if err := s.masterService.SetIndexService(indexService); err != nil { panic(err) } @@ -190,7 +190,7 @@ func (s *Server) init() error { } if s.newQueryServiceClient != nil { log.Debug("MasterService start to create QueryService client") - queryService := s.newQueryServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdAddress, 3*time.Second) + queryService := s.newQueryServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) if err := s.masterService.SetQueryService(queryService); err != nil { panic(err) } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 1966b1cd6d..571a0fa288 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -171,7 +171,7 @@ func (s *Server) init() error { masterServiceAddr := Params.MasterAddress log.Debug("ProxyNode", zap.String("master address", masterServiceAddr)) timeout := 3 * time.Second - s.masterServiceClient, err = grpcmasterserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout) + s.masterServiceClient, err = grpcmasterserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) if err != nil { log.Debug("ProxyNode new masterServiceClient failed ", zap.Error(err)) return err @@ -192,7 +192,7 @@ func (s *Server) init() error { dataServiceAddr := Params.DataServiceAddress log.Debug("ProxyNode", zap.String("data service address", dataServiceAddr)) - s.dataServiceClient = grpcdataserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout) + s.dataServiceClient = grpcdataserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) err = s.dataServiceClient.Init() if err != nil { log.Debug("ProxyNode dataServiceClient init failed ", zap.Error(err)) @@ -203,7 +203,7 @@ func (s *Server) init() error { indexServiceAddr := Params.IndexServerAddress log.Debug("ProxyNode", zap.String("index server address", indexServiceAddr)) - s.indexServiceClient = grpcindexserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout) + s.indexServiceClient = grpcindexserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) err = s.indexServiceClient.Init() if err != nil { log.Debug("ProxyNode indexServiceClient init failed ", zap.Error(err)) @@ -214,7 +214,7 @@ func (s *Server) init() error { queryServiceAddr := Params.QueryServiceAddress log.Debug("ProxyNode", zap.String("query server address", queryServiceAddr)) - s.queryServiceClient, err = grpcqueryserviceclient.NewClient(proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout) + s.queryServiceClient, err = grpcqueryserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) if err != nil { return err } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 8ad28174a6..bfdc9949bc 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -105,7 +105,7 @@ func (s *Server) init() error { } // --- QueryService --- log.Debug("QueryNode start to new QueryServiceClient", zap.Any("QueryServiceAddress", Params.QueryServiceAddress)) - queryService, err := qsc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second) + queryService, err := qsc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) if err != nil { log.Debug("QueryNode new QueryServiceClient failed", zap.Error(err)) panic(err) @@ -138,7 +138,7 @@ func (s *Server) init() error { addr := Params.MasterAddress log.Debug("QueryNode start to new MasterServiceClient", zap.Any("QueryServiceAddress", addr)) - masterService, err := msc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second) + masterService, err := msc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) if err != nil { log.Debug("QueryNode new MasterServiceClient failed", zap.Error(err)) panic(err) @@ -167,7 +167,7 @@ func (s *Server) init() error { // --- IndexService --- log.Debug("Index service", zap.String("address", Params.IndexServiceAddress)) - indexService := isc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second) + indexService := isc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) if err := indexService.Init(); err != nil { log.Debug("QueryNode IndexServiceClient Init failed", zap.Error(err)) @@ -193,7 +193,7 @@ func (s *Server) init() error { // --- DataService --- log.Debug("QueryNode start to new DataServiceClient", zap.Any("DataServiceAddress", Params.DataServiceAddress)) - dataService := dsc.NewClient(qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 3*time.Second) + dataService := dsc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) if err = dataService.Init(); err != nil { log.Debug("QueryNode DataServiceClient Init failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index ef615524d8..6d5fdff6cf 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -59,8 +59,8 @@ func getQueryServiceAddress(sess *sessionutil.Session) (string, error) { } // NewClient creates a client for QueryService grpc call. -func NewClient(metaRootPath string, etcdAddr []string, timeout time.Duration) (*Client, error) { - sess := sessionutil.NewSession(context.Background(), metaRootPath, etcdAddr) +func NewClient(metaRootPath string, etcdEndpoints []string, timeout time.Duration) (*Client, error) { + sess := sessionutil.NewSession(context.Background(), metaRootPath, etcdEndpoints) return &Client{ ctx: context.Background(), diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 45ab143906..985f230a67 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -109,7 +109,7 @@ func (s *Server) init() error { // --- Master Server Client --- log.Debug("QueryService try to new MasterService client", zap.Any("MasterServiceAddress", Params.MasterAddress)) - masterService, err := msc.NewClient(qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 3*time.Second) + masterService, err := msc.NewClient(qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second) if err != nil { log.Debug("QueryService try to new MasterService client failed", zap.Error(err)) panic(err) @@ -140,7 +140,7 @@ func (s *Server) init() error { // --- Data service client --- log.Debug("QueryService try to new DataService client", zap.Any("DataServiceAddress", Params.DataServiceAddress)) - dataService := dsc.NewClient(qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 3*time.Second) + dataService := dsc.NewClient(qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second) if err = dataService.Init(); err != nil { log.Debug("QueryService DataServiceClient Init failed", zap.Error(err)) panic(err) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 456769ea68..20a9083ad4 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -84,7 +84,7 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) { // Register register index node at etcd func (i *IndexNode) Register() error { - i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) + i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = i.session.ServerID return nil @@ -94,7 +94,7 @@ func (i *IndexNode) Init() error { ctx := context.Background() connectEtcdFn := func() error { - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) return err } diff --git a/internal/indexnode/paramtable.go b/internal/indexnode/paramtable.go index afe0631ed4..c50ef8e7ad 100644 --- a/internal/indexnode/paramtable.go +++ b/internal/indexnode/paramtable.go @@ -16,6 +16,7 @@ import ( "fmt" "path" "strconv" + "strings" "sync" "go.uber.org/zap" @@ -42,8 +43,8 @@ type ParamTable struct { MasterAddress string - EtcdAddress string - MetaRootPath string + EtcdEndpoints []string + MetaRootPath string MinIOAddress string MinIOAccessKeyID string @@ -71,7 +72,7 @@ func (pt *ParamTable) initParams() { pt.initMinIOSecretAccessKey() pt.initMinIOUseSSL() pt.initMinioBucketName() - pt.initEtcdAddress() + pt.initEtcdEndpoints() pt.initMetaRootPath() } @@ -159,12 +160,12 @@ func (pt *ParamTable) initMinIOUseSSL() { } } -func (pt *ParamTable) initEtcdAddress() { - addr, err := pt.Load("_EtcdAddress") +func (pt *ParamTable) initEtcdEndpoints() { + endpoints, err := pt.Load("_EtcdEndpoints") if err != nil { panic(err) } - pt.EtcdAddress = addr + pt.EtcdEndpoints = strings.Split(endpoints, ",") } func (pt *ParamTable) initMetaRootPath() { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 0e17faf9c2..0eb0cc3898 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -99,18 +99,18 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { // Register register index service at etcd func (i *IndexService) Register() error { - i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) + i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) i.session.Init(typeutil.IndexServiceRole, Params.Address, true) i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0) return nil } func (i *IndexService) Init() error { - log.Debug("IndexService", zap.String("etcd address", Params.EtcdAddress)) + log.Debug("IndexService", zap.Any("etcd endpoints", Params.EtcdEndpoints)) i.assignChan = make(chan []UniqueID, 1024) connectEtcdFn := func() error { - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) if err != nil { return err } @@ -132,7 +132,7 @@ func (i *IndexService) Init() error { //init idAllocator kvRootPath := Params.KvRootPath - i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, kvRootPath, "index_gid")) + i.idAllocator = allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(Params.EtcdEndpoints, kvRootPath, "index_gid")) if err := i.idAllocator.Initialize(); err != nil { log.Debug("IndexService idAllocator initialize failed", zap.Error(err)) return err diff --git a/internal/indexservice/paramtable.go b/internal/indexservice/paramtable.go index 828a7748fd..279ac92afa 100644 --- a/internal/indexservice/paramtable.go +++ b/internal/indexservice/paramtable.go @@ -14,6 +14,7 @@ package indexservice import ( "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -28,9 +29,9 @@ type ParamTable struct { MasterAddress string - EtcdAddress string - KvRootPath string - MetaRootPath string + EtcdEndpoints []string + KvRootPath string + MetaRootPath string MinIOAddress string MinIOAccessKeyID string @@ -48,7 +49,7 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initLogCfg() - pt.initEtcdAddress() + pt.initEtcdEndpoints() pt.initMasterAddress() pt.initMetaRootPath() pt.initKvRootPath() @@ -60,12 +61,12 @@ func (pt *ParamTable) Init() { }) } -func (pt *ParamTable) initEtcdAddress() { - addr, err := pt.Load("_EtcdAddress") +func (pt *ParamTable) initEtcdEndpoints() { + endpoints, err := pt.Load("_EtcdEndpoints") if err != nil { panic(err) } - pt.EtcdAddress = addr + pt.EtcdEndpoints = strings.Split(endpoints, ",") } func (pt *ParamTable) initMetaRootPath() { diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index b40dc87a9d..6159eb4f7b 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -13,6 +13,7 @@ package etcdkv_test import ( "os" + "strings" "testing" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -29,14 +30,17 @@ func TestMain(m *testing.M) { os.Exit(code) } -func TestEtcdKV_Load(t *testing.T) { - - etcdAddr, err := Params.Load("_EtcdAddress") +func newEtcdClient() (*clientv3.Client, error) { + endpoints, err := Params.Load("_EtcdEndpoints") if err != nil { panic(err) } + etcdEndpoints := strings.Split(endpoints, ",") + return clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) +} - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) +func TestEtcdKV_Load(t *testing.T) { + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -88,13 +92,7 @@ func TestEtcdKV_Load(t *testing.T) { } func TestEtcdKV_MultiSave(t *testing.T) { - - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -119,13 +117,7 @@ func TestEtcdKV_MultiSave(t *testing.T) { } func TestEtcdKV_Remove(t *testing.T) { - - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -190,13 +182,7 @@ func TestEtcdKV_Remove(t *testing.T) { } func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { - - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -232,12 +218,7 @@ func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { } func TestEtcdKV_MultiRemoveWithPrefix(t *testing.T) { - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -263,12 +244,7 @@ func TestEtcdKV_MultiRemoveWithPrefix(t *testing.T) { } func TestEtcdKV_MultiSaveAndRemoveWithPrefix(t *testing.T) { - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -303,12 +279,7 @@ func TestEtcdKV_MultiSaveAndRemoveWithPrefix(t *testing.T) { } func TestEtcdKV_Watch(t *testing.T) { - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -323,12 +294,7 @@ func TestEtcdKV_Watch(t *testing.T) { } func TestEtcdKV_WatchPrefix(t *testing.T) { - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) @@ -343,12 +309,7 @@ func TestEtcdKV_WatchPrefix(t *testing.T) { } func TestEtcdKV_CompareAndSwap(t *testing.T) { - etcdAddr, err := Params.Load("_EtcdAddress") - if err != nil { - panic(err) - } - - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) rootPath := "/etcd/test/root" etcdKV := etcdkv.NewEtcdKV(cli, rootPath) diff --git a/internal/kv/etcd/etcd_stats_watcher_test.go b/internal/kv/etcd/etcd_stats_watcher_test.go index 38d70baef8..a4335766e6 100644 --- a/internal/kv/etcd/etcd_stats_watcher_test.go +++ b/internal/kv/etcd/etcd_stats_watcher_test.go @@ -14,6 +14,7 @@ package etcdkv import ( "context" "math/rand" + "strings" "testing" "time" @@ -26,9 +27,10 @@ func TestEtcdStatsWatcher(t *testing.T) { rand.Seed(time.Now().UnixNano()) var p paramtable.BaseTable p.Init() - addr, err := p.Load("_EtcdAddress") + endpoints, err := p.Load("_EtcdEndpoints") assert.Nil(t, err) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{addr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) assert.Nil(t, err) defer cli.Close() w := NewEtcdStatsWatcher(cli) @@ -55,15 +57,15 @@ func TestEtcdStatsWatcher(t *testing.T) { <-receiveCh size := w.GetSize() assert.EqualValues(t, 4, size) - } func TestEtcdStatsWatcherDone(t *testing.T) { var p paramtable.BaseTable p.Init() - addr, err := p.Load("_EtcdAddress") + endpoints, err := p.Load("_EtcdEndpoints") assert.Nil(t, err) - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{addr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) assert.Nil(t, err) defer cli.Close() w := NewEtcdStatsWatcher(cli) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 59ea2ab481..4ccbe221a3 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -908,7 +908,7 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, // Register register master service at etcd func (c *Core) Register() error { - c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, Params.EtcdEndpoints) if c.session == nil { return fmt.Errorf("session is nil, maybe the etcd client connection fails") } @@ -920,7 +920,7 @@ func (c *Core) Init() error { var initError error = nil c.initOnce.Do(func() { connectEtcdFn := func() error { - if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil { + if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil { return initError } tsAlloc := func() typeutil.Timestamp { @@ -950,7 +950,7 @@ func (c *Core) Init() error { return } - idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "gid")) + idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "gid")) if initError = idAllocator.Initialize(); initError != nil { return } @@ -961,7 +961,7 @@ func (c *Core) Init() error { return idAllocator.UpdateID() } - tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{Params.EtcdAddress}, Params.KvRootPath, "tso")) + tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "tso")) if initError = tsoAllocator.Initialize(); initError != nil { return } @@ -990,7 +990,7 @@ func (c *Core) Init() error { c.proxyNodeManager, initError = newProxyNodeManager( c.ctx, - []string{Params.EtcdAddress}, + Params.EtcdEndpoints, c.chanTimeTick.GetProxyNodes, c.proxyClientManager.GetProxyClients, ) diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index a247ad9c0b..4177482366 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -280,7 +280,7 @@ func TestMasterService(t *testing.T) { err = core.Register() assert.Nil(t, err) - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}) assert.Nil(t, err) sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) diff --git a/internal/masterservice/meta_snapshot_test.go b/internal/masterservice/meta_snapshot_test.go index dce55e1f6a..94f3a46210 100644 --- a/internal/masterservice/meta_snapshot_test.go +++ b/internal/masterservice/meta_snapshot_test.go @@ -29,11 +29,10 @@ func TestMetaSnapshot(t *testing.T) { randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() @@ -168,12 +167,11 @@ func TestGetRevOnEtcd(t *testing.T) { randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" key := path.Join(rootPath, tsKey) - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() @@ -214,11 +212,10 @@ func TestLoad(t *testing.T) { randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() @@ -262,11 +259,10 @@ func TestMultiSave(t *testing.T) { randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() @@ -326,11 +322,10 @@ func TestMultiSaveAndRemoveWithPrefix(t *testing.T) { randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) tsKey := "timestamp" - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) var vtso typeutil.Timestamp diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index b93c923189..b5da6c6c19 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -163,7 +163,6 @@ func TestMetaTable(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) var vtso typeutil.Timestamp @@ -172,7 +171,7 @@ func TestMetaTable(t *testing.T) { return vtso } - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() skv, err := newMetaSnapshot(etcdCli, rootPath, TimestampPrefix, 7, ftso) @@ -1083,7 +1082,6 @@ func TestMetaWithTimestamp(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() Params.Init() - etcdAddr := Params.EtcdAddress rootPath := fmt.Sprintf("/test/meta/%d", randVal) var tsoStart typeutil.Timestamp = 100 @@ -1093,7 +1091,7 @@ func TestMetaWithTimestamp(t *testing.T) { return vtso } - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) defer etcdCli.Close() diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index d0aa0e1297..017e334c7b 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -14,6 +14,7 @@ package masterservice import ( "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -30,7 +31,7 @@ type ParamTable struct { Port int PulsarAddress string - EtcdAddress string + EtcdEndpoints []string MetaRootPath string KvRootPath string MsgChannelSubName string @@ -61,7 +62,7 @@ func (p *ParamTable) Init() { } p.initPulsarAddress() - p.initEtcdAddress() + p.initEtcdEndpoints() p.initMetaRootPath() p.initKvRootPath() @@ -91,12 +92,12 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } -func (p *ParamTable) initEtcdAddress() { - addr, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdEndpoints() { + endpoints, err := p.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdAddress = addr + p.EtcdEndpoints = strings.Split(endpoints, ",") } func (p *ParamTable) initMetaRootPath() { diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index a3af013a48..8bc04e3309 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -23,8 +23,8 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.PulsarAddress, "") t.Logf("pulsar address = %s", Params.PulsarAddress) - assert.NotEqual(t, Params.EtcdAddress, "") - t.Logf("etcd address = %s", Params.EtcdAddress) + assert.NotZero(t, len(Params.EtcdEndpoints)) + t.Logf("etcd endpoints = %s", Params.EtcdEndpoints) assert.NotEqual(t, Params.MetaRootPath, "") t.Logf("meta root path = %s", Params.MetaRootPath) diff --git a/internal/masterservice/proxy_node_manager.go b/internal/masterservice/proxy_node_manager.go index 8d6e18de91..06b9192db9 100644 --- a/internal/masterservice/proxy_node_manager.go +++ b/internal/masterservice/proxy_node_manager.go @@ -37,8 +37,8 @@ type proxyNodeManager struct { delSessions []func(*sessionutil.Session) } -func newProxyNodeManager(ctx context.Context, etcdAddr []string, fns ...func([]*sessionutil.Session)) (*proxyNodeManager, error) { - cli, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) +func newProxyNodeManager(ctx context.Context, etcdEndpoints []string, fns ...func([]*sessionutil.Session)) (*proxyNodeManager, error) { + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) if err != nil { return nil, err } diff --git a/internal/masterservice/proxy_node_manager_test.go b/internal/masterservice/proxy_node_manager_test.go index 237fbe2e68..64ba4e36e2 100644 --- a/internal/masterservice/proxy_node_manager_test.go +++ b/internal/masterservice/proxy_node_manager_test.go @@ -26,7 +26,7 @@ import ( func TestProxyNodeManager(t *testing.T) { Params.Init() - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints}) assert.Nil(t, err) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -60,7 +60,7 @@ func TestProxyNodeManager(t *testing.T) { t.Log("get sessions", sess[0], sess[1]) } - pm, err := newProxyNodeManager(ctx, []string{Params.EtcdAddress}, f1) + pm, err := newProxyNodeManager(ctx, Params.EtcdEndpoints, f1) assert.Nil(t, err) fa := func(sess *sessionutil.Session) { assert.Equal(t, int64(101), sess.ServerID) diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index d96b581d3d..ab60df6265 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -16,6 +16,7 @@ import ( "log" "math/rand" "os" + "strings" "testing" "time" @@ -1022,11 +1023,12 @@ func TestStream_MqMsgStream_Seek(t *testing.T) { /****************************************Rmq test******************************************/ func initRmq(name string) *etcdkv.EtcdKV { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) if err != nil { log.Fatalf("New clientv3 error = %v", err) } diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 3c72ca27fb..4421a8f29c 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -38,7 +38,7 @@ type ParamTable struct { IP string NetworkAddress string - EtcdAddress string + EtcdEndpoints []string MetaRootPath string MasterAddress string PulsarAddress string @@ -82,7 +82,7 @@ func (pt *ParamTable) Init() { func (pt *ParamTable) initParams() { pt.initLogCfg() - pt.initEtcdAddress() + pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initPulsarAddress() pt.initQueryNodeIDList() @@ -310,12 +310,12 @@ func (pt *ParamTable) initRoleName() { pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID) } -func (pt *ParamTable) initEtcdAddress() { - addr, err := pt.Load("_EtcdAddress") +func (pt *ParamTable) initEtcdEndpoints() { + endpoints, err := pt.Load("_EtcdEndpoints") if err != nil { panic(err) } - pt.EtcdAddress = addr + pt.EtcdEndpoints = strings.Split(endpoints, ",") } func (pt *ParamTable) initMetaRootPath() { diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 7dd2ac852f..8efc450702 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -91,7 +91,7 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e // Register register proxy node at etcd func (node *ProxyNode) Register() error { - node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints) node.session.Init(typeutil.ProxyNodeRole, Params.NetworkAddress, false) Params.ProxyID = node.session.ServerID return nil @@ -176,7 +176,7 @@ func (node *ProxyNode) Init() error { log.Debug("proxynode", zap.Strings("proxynode AsProducer:", Params.SearchChannelNames)) log.Debug("create query message stream ...") - idAllocator, err := allocator.NewIDAllocator(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + idAllocator, err := allocator.NewIDAllocator(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints) if err != nil { return err diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 259cca5d8e..8710458559 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -16,6 +16,7 @@ import ( "os" "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -26,7 +27,7 @@ type ParamTable struct { paramtable.BaseTable PulsarAddress string - EtcdAddress string + EtcdEndpoints []string MetaRootPath string QueryNodeIP string @@ -107,7 +108,7 @@ func (p *ParamTable) Init() { p.initMinioBucketName() p.initPulsarAddress() - p.initEtcdAddress() + p.initEtcdEndpoints() p.initMetaRootPath() p.initGracefulTime() @@ -234,12 +235,12 @@ func (p *ParamTable) initSearchResultReceiveBufSize() { p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize") } -func (p *ParamTable) initEtcdAddress() { - EtcdAddress, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdEndpoints() { + endpoints, err := p.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdAddress = EtcdAddress + p.EtcdEndpoints = strings.Split(endpoints, ",") } func (p *ParamTable) initMetaRootPath() { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 70d5e63393..3b8c024a10 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -108,7 +108,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer // Register register query node at etcd func (node *QueryNode) Register() error { - node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints) node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) Params.QueryNodeID = node.session.ServerID return nil diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index aca65fa1ee..2782f5a2c8 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -15,6 +15,7 @@ import ( "fmt" "path" "strconv" + "strings" "sync" "github.com/milvus-io/milvus/internal/log" @@ -47,8 +48,8 @@ type ParamTable struct { SearchResultChannelPrefix string // --- ETCD --- - EtcdAddress string - MetaRootPath string + EtcdEndpoints []string + MetaRootPath string } var Params ParamTable @@ -83,7 +84,7 @@ func (p *ParamTable) Init() { p.initSearchResultChannelPrefix() // --- ETCD --- - p.initEtcdAddress() + p.initEtcdEndpoints() p.initMetaRootPath() }) } @@ -174,12 +175,12 @@ func (p *ParamTable) initSearchResultChannelPrefix() { p.SearchResultChannelPrefix = channelName } -func (p *ParamTable) initEtcdAddress() { - addr, err := p.Load("_EtcdAddress") +func (p *ParamTable) initEtcdEndpoints() { + endpoints, err := p.Load("_EtcdEndpoints") if err != nil { panic(err) } - p.EtcdAddress = addr + p.EtcdEndpoints = strings.Split(endpoints, ",") } func (p *ParamTable) initMetaRootPath() { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 26d9559d3c..ad482ffa46 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -60,7 +60,7 @@ type QueryService struct { // Register register query service at etcd func (qs *QueryService) Register() error { - qs.session = sessionutil.NewSession(qs.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) + qs.session = sessionutil.NewSession(qs.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) qs.session.Init(typeutil.QueryServiceRole, Params.Address, true) Params.NodeID = uint64(qs.session.ServerID) return nil diff --git a/internal/tso/global_allocator_test.go b/internal/tso/global_allocator_test.go index 47c77d7fc4..012e2a707d 100644 --- a/internal/tso/global_allocator_test.go +++ b/internal/tso/global_allocator_test.go @@ -13,11 +13,10 @@ package tso import ( "os" + "strings" "testing" "time" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/stretchr/testify/assert" ) @@ -25,12 +24,12 @@ import ( var gTestTsoAllocator *GlobalTSOAllocator func TestGlobalTSOAllocator_All(t *testing.T) { - etcdAddress := os.Getenv("ETCD_ADDRESS") - if etcdAddress == "" { - ip := funcutil.GetLocalIP() - etcdAddress = ip + ":2379" + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" } - gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "tsoTest")) + etcdEndpoints := strings.Split(endpoints, ",") + gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase(etcdEndpoints, "/test/root/kv", "tsoTest")) t.Run("Initialize", func(t *testing.T) { err := gTestTsoAllocator.Initialize() assert.Nil(t, err) diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index abf6c85b88..0d16607e7b 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -60,23 +60,19 @@ func (gp *BaseTable) Init() { panic(err) } gp.tryloadFromEnv() - } func (gp *BaseTable) LoadFromKVPair(kvPairs []*commonpb.KeyValuePair) error { - for _, pair := range kvPairs { err := gp.Save(pair.Key, pair.Value) if err != nil { return err } } - return nil } func (gp *BaseTable) tryloadFromEnv() { - minioAddress := os.Getenv("MINIO_ADDRESS") if minioAddress == "" { minioHost, err := gp.Load("minio.address") @@ -94,19 +90,14 @@ func (gp *BaseTable) tryloadFromEnv() { panic(err) } - etcdAddress := os.Getenv("ETCD_ADDRESS") - if etcdAddress == "" { - etcdHost, err := gp.Load("etcd.address") + etcdEndpoints := os.Getenv("ETCD_ENDPOINTS") + if etcdEndpoints == "" { + etcdEndpoints, err = gp.Load("etcd.endpoints") if err != nil { panic(err) } - port, err := gp.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddress = etcdHost + ":" + port } - err = gp.Save("_EtcdAddress", etcdAddress) + err = gp.Save("_EtcdEndpoints", etcdEndpoints) if err != nil { panic(err) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 423a4317aa..fe5667bcd2 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -15,6 +15,7 @@ import ( "log" "os" "strconv" + "strings" "sync" "testing" "time" @@ -33,12 +34,17 @@ func TestFixChannelName(t *testing.T) { assert.Equal(t, len(fixName), FixedChannelNameLen) } -func TestRocksMQ(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" +func newEtcdClient() (*clientv3.Client, error) { + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := strings.Split(endpoints, ",") + return clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) +} + +func TestRocksMQ(t *testing.T) { + cli, err := newEtcdClient() assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() @@ -91,11 +97,7 @@ func TestRocksMQ(t *testing.T) { } func TestRocksMQ_Loop(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() @@ -159,11 +161,7 @@ func TestRocksMQ_Loop(t *testing.T) { } func TestRocksMQ_Goroutines(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() @@ -230,11 +228,7 @@ func TestRocksMQ_Goroutines(t *testing.T) { Consume: 90000 message / s */ func TestRocksMQ_Throughout(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() @@ -251,7 +245,7 @@ func TestRocksMQ_Throughout(t *testing.T) { assert.Nil(t, err) defer rmq.DestroyTopic(channelName) - entityNum := 1000000 + entityNum := 100000 pt0 := time.Now().UnixNano() / int64(time.Millisecond) for i := 0; i < entityNum; i++ { @@ -284,11 +278,7 @@ func TestRocksMQ_Throughout(t *testing.T) { } func TestRocksMQ_MultiChan(t *testing.T) { - etcdAddr := os.Getenv("ETCD_ADDRESS") - if etcdAddr == "" { - etcdAddr = "localhost:2379" - } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + cli, err := newEtcdClient() assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root") defer etcdKV.Close() diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index ba41503546..c9bcea77fb 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -55,7 +55,7 @@ type SessionEvent struct { // NewSession is a helper to build Session object. // ServerID and LeaseID will be assigned after registeration. // etcdCli is initialized when NewSession -func NewSession(ctx context.Context, metaRoot string, etcdAddress []string) *Session { +func NewSession(ctx context.Context, metaRoot string, etcdEndpoints []string) *Session { ctx, cancel := context.WithCancel(ctx) session := &Session{ ctx: ctx, @@ -64,7 +64,7 @@ func NewSession(ctx context.Context, metaRoot string, etcdAddress []string) *Ses } connectEtcdFn := func() error { - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: etcdAddress, DialTimeout: 5 * time.Second}) + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second}) if err != nil { return err } @@ -310,10 +310,10 @@ func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-c return eventCh } -func initEtcd(etcdAddress string) (*clientv3.Client, error) { +func initEtcd(etcdEndpoints []string) (*clientv3.Client, error) { var etcdCli *clientv3.Client connectEtcdFn := func() error { - etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + etcd, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second}) if err != nil { return err } diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index ec4be8c06e..e983e79f2f 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "strconv" + "strings" "sync" "testing" "time" @@ -21,13 +22,14 @@ func TestGetServerIDConcurrently(t *testing.T) { ctx := context.Background() Params.Init() - etcdAddr, err := Params.Load("_EtcdAddress") + endpoints, err := Params.Load("_EtcdEndpoints") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") _, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix()) @@ -39,7 +41,7 @@ func TestGetServerIDConcurrently(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - s := NewSession(ctx, metaRoot, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, etcdEndpoints) res := make([]int64, 0) getIDFunc := func() { @@ -60,19 +62,19 @@ func TestGetServerIDConcurrently(t *testing.T) { for i := 1; i <= 10; i++ { assert.Contains(t, res, int64(i)) } - } func TestInit(t *testing.T) { ctx := context.Background() Params.Init() - etcdAddr, err := Params.Load("_EtcdAddress") + endpoints, err := Params.Load("_EtcdEndpoints") if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) @@ -82,7 +84,7 @@ func TestInit(t *testing.T) { defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") - s := NewSession(ctx, metaRoot, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, etcdEndpoints) s.Init("inittest", "testAddr", false) assert.NotEqual(t, int64(0), s.leaseID) assert.NotEqual(t, int64(0), s.ServerID) @@ -95,12 +97,13 @@ func TestUpdateSessions(t *testing.T) { ctx := context.Background() Params.Init() - etcdAddr, err := Params.Load("_EtcdAddress") + endpoints, err := Params.Load("_EtcdEndpoints") if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + etcdEndpoints := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdEndpoints}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) @@ -113,7 +116,7 @@ func TestUpdateSessions(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - s := NewSession(ctx, metaRoot, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, etcdEndpoints) sessions, rev, err := s.GetSessions("test") assert.Nil(t, err) @@ -123,7 +126,7 @@ func TestUpdateSessions(t *testing.T) { sList := []*Session{} getIDFunc := func() { - singleS := NewSession(ctx, metaRoot, []string{etcdAddr}) + singleS := NewSession(ctx, metaRoot, etcdEndpoints) singleS.Init("test", "testAddr", false) muList.Lock() sList = append(sList, singleS) @@ -167,5 +170,4 @@ func TestUpdateSessions(t *testing.T) { assert.Equal(t, len(sessionEvents), 20) assert.Equal(t, addEventLen, 10) assert.Equal(t, delEventLen, 10) - } diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index cb3dbed048..fe38fcf902 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go @@ -44,9 +44,9 @@ func Mod24H(ts uint64) uint64 { return (physical << logicalBits) | logical } -func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *etcdkv.EtcdKV { +func NewTSOKVBase(etcdEndpoints []string, tsoRoot, subPath string) *etcdkv.EtcdKV { client, _ := clientv3.New(clientv3.Config{ - Endpoints: etcdAddr, + Endpoints: etcdEndpoints, DialTimeout: 5 * time.Second, }) return etcdkv.NewEtcdKV(client, path.Join(tsoRoot, subPath))