diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c45b5f0e3c..e814b0f0f3 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "math/rand" + "strconv" "sync/atomic" "time" @@ -28,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -70,6 +72,8 @@ type DataNode struct { masterService types.MasterService dataService types.DataService + session *sessionutil.Session + flushChan chan<- *flushMsg replica Replica @@ -133,6 +137,11 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { // At last, data node initializes its `dataSyncService` and `metaService`. func (node *DataNode) Init() error { ctx := context.Background() + + node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.DataNodeRole, + Params.IP+":"+strconv.Itoa(Params.Port), false) + node.session.Init() + req := &datapb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ SourceID: node.NodeID, diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 4e6825723c..deb4c8f6fa 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -25,6 +25,9 @@ type ParamTable struct { NodeID int64 + IP string + Port int + // --- ETCD --- EtcdAddress string MetaRootPath string diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index a3dd7bf110..6be2c5267b 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/timesync" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -69,6 +70,7 @@ type Server struct { sync.Mutex name string } + session *sessionutil.Session segmentInfoStream msgstream.MsgStream flushMsgStream msgstream.MsgStream insertChannels []string @@ -106,6 +108,9 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) { } func (s *Server) Init() error { + s.session = sessionutil.NewSession(s.ctx, []string{Params.EtcdAddress}, typeutil.DataServiceRole, + Params.IP, true) + s.session.Init() return nil } diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index f375d63047..cbc2cddf0f 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -22,8 +22,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/clientv3" ) func TestRegisterNode(t *testing.T) { @@ -778,6 +780,12 @@ func newTestServer(t *testing.T) *Server { } err = factory.SetParams(m) assert.Nil(t, err) + + etcdCli, err := initEtcd(Params.EtcdAddress) + assert.Nil(t, err) + _, err = etcdCli.Delete(context.Background(), "/session", clientv3.WithPrefix()) + assert.Nil(t, err) + svr, err := CreateServer(context.TODO(), factory) assert.Nil(t, err) ms := newMockMasterService() @@ -804,3 +812,20 @@ func closeTestServer(t *testing.T, svr *Server) { err = svr.CleanMeta() assert.Nil(t, err) } + +func initEtcd(etcdAddress string) (*clientv3.Client, error) { + var etcdCli *clientv3.Client + connectEtcdFn := func() error { + etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + etcdCli = etcd + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return nil, err + } + return etcdCli, nil +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 84921e2c94..0ec31b52bb 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -22,7 +22,6 @@ import ( "sync" "time" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -40,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" ) @@ -57,9 +55,6 @@ type Server struct { masterService types.MasterService dataService types.DataService - etcdKV *etcdkv.EtcdKV - signal <-chan bool - newMasterServiceClient func(string) (types.MasterService, error) newDataServiceClient func(string) types.DataService @@ -174,11 +169,6 @@ func (s *Server) init() error { addr := Params.IP + ":" + strconv.Itoa(Params.Port) log.Debug("DataNode address", zap.String("address", addr)) - self := sessionutil.NewSession("datanode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false) - sm := sessionutil.NewSessionManager(ctx, dn.Params.EtcdAddress, dn.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - err := s.startGrpc() if err != nil { return err diff --git a/internal/distributed/dataservice/paramtable.go b/internal/distributed/dataservice/paramtable.go index 960e35f759..daa9c8cbbb 100644 --- a/internal/distributed/dataservice/paramtable.go +++ b/internal/distributed/dataservice/paramtable.go @@ -20,6 +20,7 @@ import ( type ParamTable struct { paramtable.BaseTable + IP string Port int MasterAddress string } @@ -38,6 +39,7 @@ func (pt *ParamTable) Init() { func (pt *ParamTable) initParams() { pt.initMasterAddress() + pt.initDataServiceAddress() } func (pt *ParamTable) LoadFromEnv() { @@ -55,3 +57,11 @@ func (pt *ParamTable) initMasterAddress() { } pt.MasterAddress = ret } + +func (pt *ParamTable) initDataServiceAddress() { + ret, err := pt.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + pt.IP = ret +} diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index 4cc0ddfe2b..d16b1cb868 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" @@ -89,11 +88,8 @@ func (s *Server) init() error { s.closer = closer dataservice.Params.Init() - - self := sessionutil.NewSession("dataservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), true) - sm := sessionutil.NewSessionManager(ctx, dataservice.Params.EtcdAddress, dataservice.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) + dataservice.Params.IP = Params.IP + dataservice.Params.Port = Params.Port err := s.startGrpc() if err != nil { diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 88508eb1ab..57c9cf8b0d 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -24,7 +24,6 @@ import ( grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" "github.com/milvus-io/milvus/internal/indexnode" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -32,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" @@ -45,9 +43,6 @@ type Server struct { grpcServer *grpc.Server grpcErrChan chan error - etcdKV *etcdkv.EtcdKV - signal <-chan bool - indexServiceClient types.IndexService loopCtx context.Context loopCancel func() @@ -100,7 +95,6 @@ func (s *Server) startGrpcLoop(grpcPort int) { } func (s *Server) init() error { - ctx := context.Background() var err error Params.Init() if !funcutil.CheckPortAvailable(Params.Port) { @@ -120,11 +114,6 @@ func (s *Server) init() error { Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) - self := sessionutil.NewSession("indexnode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false) - sm := sessionutil.NewSessionManager(ctx, indexnode.Params.EtcdAddress, indexnode.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - defer func() { if err != nil { err = s.Stop() diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 674fba05cc..8bbd276568 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -65,18 +64,14 @@ func (s *Server) Run() error { } func (s *Server) init() error { - ctx := context.Background() Params.Init() indexservice.Params.Init() + indexservice.Params.Address = Params.ServiceAddress + indexservice.Params.Port = Params.ServicePort closer := trace.InitTracing("index_service") s.closer = closer - self := sessionutil.NewSession("indexservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.ServicePort), true) - sm := sessionutil.NewSessionManager(ctx, indexservice.Params.EtcdAddress, indexservice.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - s.loopWg.Add(1) go s.startGrpcLoop(Params.ServicePort) // wait for grpc indexservice loop start diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 3065f1e565..f977415b49 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -34,8 +34,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/clientv3" ) func TestGrpcService(t *testing.T) { @@ -81,6 +83,12 @@ func TestGrpcService(t *testing.T) { core, ok := (svr.masterService).(*cms.Core) assert.True(t, ok) + + etcdCli, err := initEtcd(cms.Params.EtcdAddress) + assert.Nil(t, err) + _, err = etcdCli.Delete(ctx, "/session", clientv3.WithPrefix()) + assert.Nil(t, err) + err = core.Init() assert.Nil(t, err) @@ -861,6 +869,10 @@ func TestRun(t *testing.T) { cms.Params.Init() cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) + etcdCli, err := initEtcd(cms.Params.EtcdAddress) + assert.Nil(t, err) + _, err = etcdCli.Delete(ctx, "/session", clientv3.WithPrefix()) + assert.Nil(t, err) err = svr.Run() assert.Nil(t, err) @@ -868,3 +880,20 @@ func TestRun(t *testing.T) { assert.Nil(t, err) } + +func initEtcd(etcdAddress string) (*clientv3.Client, error) { + var etcdCli *clientv3.Client + connectEtcdFn := func() error { + etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + etcdCli = etcd + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return nil, err + } + return etcdCli, nil +} diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index d204ed337e..586ba204a7 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -29,7 +29,6 @@ import ( isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client" psc "github.com/milvus-io/milvus/internal/distributed/proxyservice/client" qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" cms "github.com/milvus-io/milvus/internal/masterservice" "github.com/milvus-io/milvus/internal/msgstream" @@ -41,7 +40,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" ) // grpc wrapper @@ -70,9 +68,6 @@ type Server struct { connectIndexService bool connectQueryService bool - etcdKV *etcdkv.EtcdKV - signal <-chan bool - closer io.Closer } @@ -126,6 +121,8 @@ func (s *Server) init() error { Params.Init() cms.Params.Init() + cms.Params.Address = Params.Address + cms.Params.Port = Params.Port log.Debug("grpc init done ...") ctx := context.Background() @@ -135,11 +132,6 @@ func (s *Server) init() error { log.Debug("init params done") - self := sessionutil.NewSession("masterservice", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), true) - sm := sessionutil.NewSessionManager(ctx, cms.Params.EtcdAddress, cms.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - err := s.startGrpc() if err != nil { return err diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index a8e30aa117..0aab245e5b 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -39,7 +39,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proxynode" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/opentracing/opentracing-go" ) @@ -165,11 +164,6 @@ func (s *Server) init() error { } }() - self := sessionutil.NewSession("proxynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false) - sm := sessionutil.NewSessionManager(ctx, proxynode.Params.EtcdAddress, proxynode.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - s.wg.Add(1) go s.startGrpcLoop(Params.Port) // wait for grpc server loop start diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index ab5b56606d..c768dfc919 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -42,7 +42,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" qn "github.com/milvus-io/milvus/internal/querynode" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -92,11 +91,6 @@ func (s *Server) init() error { closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort)) s.closer = closer - self := sessionutil.NewSession("querynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.QueryNodePort), false) - sm := sessionutil.NewSessionManager(ctx, qn.Params.EtcdAddress, qn.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort)) s.wg.Add(1) go s.startGrpcLoop(Params.QueryNodePort) diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 159a845b57..492c23201a 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -27,7 +27,6 @@ import ( qs "github.com/milvus-io/milvus/internal/queryservice" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" @@ -92,15 +91,11 @@ func (s *Server) init() error { ctx := context.Background() Params.Init() qs.Params.Init() + qs.Params.Port = Params.Port closer := trace.InitTracing("query_service") s.closer = closer - self := sessionutil.NewSession("querynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false) - sm := sessionutil.NewSessionManager(ctx, qs.Params.EtcdAddress, qs.Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) - s.wg.Add(1) go s.startGrpcLoop(Params.Port) // wait for grpc server loop start diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 067d8a50f3..1ed458be6c 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -16,6 +16,7 @@ import ( "errors" "io" "math/rand" + "strconv" "time" "go.uber.org/zap" @@ -29,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -47,7 +49,8 @@ type IndexNode struct { sched *TaskScheduler - kv kv.BaseKV + kv kv.BaseKV + session *sessionutil.Session serviceClient types.IndexService // method factory @@ -76,6 +79,9 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) { func (i *IndexNode) Init() error { ctx := context.Background() + i.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.IndexNodeRole, + Params.IP+":"+strconv.Itoa(Params.Port), false) + i.session.Init() err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) if err != nil { diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index e30631eb53..516af34f84 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.etcd.io/etcd/clientv3" @@ -53,7 +54,8 @@ type IndexService struct { loopCancel func() loopWg sync.WaitGroup - sched *TaskScheduler + sched *TaskScheduler + session *sessionutil.Session idAllocator *allocator.GlobalIDAllocator @@ -85,6 +87,12 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { func (i *IndexService) Init() error { log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress)) + + ctx := context.Background() + i.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.IndexServiceRole, + Params.Address, true) + i.session.Init() + connectEtcdFn := func() error { etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) if err != nil { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 15b5cb6b99..5d86cf3683 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -41,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -150,6 +151,8 @@ type Core struct { startOnce sync.Once //isInit atomic.Value + session *sessionutil.Session + msFactory ms.Factory } @@ -818,6 +821,10 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, func (c *Core) Init() error { var initError error = nil c.initOnce.Do(func() { + c.session = sessionutil.NewSession(c.ctx, []string{Params.EtcdAddress}, typeutil.MasterServiceRole, + Params.Address, true) + c.session.Init() + connectEtcdFn := func() error { if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}); initError != nil { return initError diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index b2cc6eec93..33995be4b2 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -16,7 +16,6 @@ import ( "encoding/json" "fmt" "math/rand" - "strconv" "sync" "testing" "time" @@ -35,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" @@ -255,11 +253,13 @@ func TestMasterService(t *testing.T) { err = core.SetQueryService(qm) assert.Nil(t, err) - // initialize master's session manager before core init - self := sessionutil.NewSession("masterservice", funcutil.GetLocalIP()+":"+strconv.Itoa(53100), true) - sm := sessionutil.NewSessionManager(ctx, Params.EtcdAddress, Params.MetaRootPath, self) - sm.Init() - sessionutil.SetGlobalSessionManager(sm) + //TODO initialize master's session manager before core init + /* + self := sessionutil.NewSession("masterservice", funcutil.GetLocalIP()+":"+strconv.Itoa(53100), true) + sm := sessionutil.NewSessionManager(ctx, Params.EtcdAddress, Params.MetaRootPath, self) + sm.Init() + sessionutil.SetGlobalSessionManager(sm) + */ err = core.Init() assert.Nil(t, err) diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index b944fc8f53..5fdfbd1e48 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -29,6 +29,9 @@ type ParamTable struct { NodeID uint64 + Address string + Port int + PulsarAddress string EtcdAddress string MetaRootPath string diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 3da1f80171..dbea072d83 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -59,6 +60,8 @@ type ProxyNode struct { tsoAllocator *TimestampAllocator segAssigner *SegIDAssigner + session *sessionutil.Session + queryMsgStream msgstream.MsgStream msFactory msgstream.Factory @@ -86,6 +89,10 @@ func (node *ProxyNode) Init() error { // todo wait for proxyservice state changed to Healthy ctx := context.Background() + node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.ProxyNodeRole, + Params.NetworkAddress, false) + node.session.Init() + err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200) if err != nil { return err diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 3c2ef3eb84..eb614975a9 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -29,6 +29,7 @@ import ( "errors" "fmt" "math/rand" + "strconv" "sync" "sync/atomic" "time" @@ -41,6 +42,8 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" queryPb "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" ) type QueryNode struct { @@ -68,6 +71,8 @@ type QueryNode struct { msFactory msgstream.Factory scheduler *taskScheduler + + session *sessionutil.Session } func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode { @@ -116,6 +121,10 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer func (node *QueryNode) Init() error { ctx := context.Background() + node.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.QueryNodeRole, + Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) + node.session.Init() + C.SegcoreInit() registerReq := &queryPb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index 71ff2f393b..aca65fa1ee 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -30,6 +30,7 @@ type ParamTable struct { NodeID uint64 Address string + Port int QueryServiceID UniqueID // stats diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 37556dd9d0..7d80d93e09 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -46,6 +47,8 @@ type QueryService struct { queryChannels []*queryChannelInfo qcMutex *sync.Mutex + session *sessionutil.Session + stateCode atomic.Value isInit atomic.Value enableGrpc bool @@ -54,6 +57,11 @@ type QueryService struct { } func (qs *QueryService) Init() error { + ctx := context.Background() + + qs.session = sessionutil.NewSession(ctx, []string{Params.EtcdAddress}, typeutil.QueryServiceRole, + Params.Address, true) + qs.session.Init() return nil } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index de2b3d8f60..11c85caa88 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -5,20 +5,18 @@ import ( "encoding/json" "errors" "fmt" + "path" "strconv" - "strings" - "sync" "time" "github.com/coreos/etcd/mvcc/mvccpb" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) -const defaultServiceRoot = "/services/" +const defaultServiceRoot = "/session/" const defaultIDKey = "id" const defaultRetryTimes = 30 const defaultTTL = 10 @@ -27,119 +25,105 @@ const defaultTTL = 10 // Address. // LeaseID will be assigned after registered in etcd. type Session struct { - ServerID int64 - ServerName string - Address string - Exclusive bool - LeaseID clientv3.LeaseID -} + ctx context.Context + ServerID int64 `json:"ServerID,omitempty"` + ServerName string `json:"ServerName,omitempty"` + Address string `json:"Address,omitempty"` + Exclusive bool `json:"Exclusive,omitempty"` -var ( - globalSessionManager = &SessionManager{} -) - -// SessionManager is a struct to help store other service's session. -// including ServerID, ServerName, Address. -// It can fetch up-to-date sessions' information and watch service up and down. -type SessionManager struct { - ctx context.Context - etcdKV *etcdkv.EtcdKV - - Self *Session - Sessions sync.Map + etcdCli *clientv3.Client + leaseID clientv3.LeaseID + cancel context.CancelFunc } // NewSession is a helper to build Session object.LeaseID will be assigned after // registeration. -func NewSession(serverName, address string, exclusive bool) *Session { - return &Session{ +func NewSession(ctx context.Context, etcdAddress []string, serverName, address string, exclusive bool) *Session { + ctx, cancel := context.WithCancel(ctx) + session := &Session{ + ctx: ctx, ServerName: serverName, Address: address, Exclusive: exclusive, + cancel: cancel, } -} -// NewSessionManager is a helper to build SessionManager object. -func NewSessionManager(ctx context.Context, etcdAddress string, etcdPath string, self *Session) *SessionManager { - etcdKV, err := initEtcd(etcdAddress, etcdPath) + connectEtcdFn := func() error { + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: etcdAddress, DialTimeout: 5 * time.Second}) + if err != nil { + return err + } + session.etcdCli = etcdCli + return nil + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) if err != nil { return nil } - return &SessionManager{ - ctx: ctx, - etcdKV: etcdKV, - Self: self, - } + return session } // Init will initialize base struct in the SessionManager, including getServerID, // and process keepAliveResponse -func (sm *SessionManager) Init() { - sm.checkIDExist() - serverID, err := sm.getServerID() +func (s *Session) Init() { + s.checkIDExist() + serverID, err := s.getServerID() if err != nil { panic(err) } - sm.Self.ServerID = serverID - ch, err := sm.registerService() + s.ServerID = serverID + ch, err := s.registerService() if err != nil { panic(err) } - sm.processKeepAliveResponse(ch) -} - -// NewSession is a helper to build Session object.LeaseID will be assigned after -// registeration. -func NewSessionWithID(serverID int64, serverName, address string, exclusive bool) *Session { - return &Session{ - ServerID: serverID, - ServerName: serverName, - Address: address, - Exclusive: exclusive, - } -} - -// GlobalServerID returns [singleton] ServerID. -// Before SetGlobalServerID, GlobalServerID() returns -1 -func GlobalSessionManager() *SessionManager { - return globalSessionManager -} - -// SetGlobalServerID sets the [singleton] ServerID. ServerID returned by -// GlobalServerID(). Those who use GlobalServerID should call SetGlobalServerID() -// as early as possible in main() before use ServerID. -func SetGlobalSessionManager(sm *SessionManager) { - globalSessionManager = sm + s.processKeepAliveResponse(ch) } // GetServerID gets id from etcd with key: metaRootPath + "/services/id" // Each server get ServerID and add one to id. -func (sm *SessionManager) getServerID() (int64, error) { - return sm.getServerIDWithKey(defaultIDKey, defaultRetryTimes) +func (s *Session) getServerID() (int64, error) { + return s.getServerIDWithKey(defaultIDKey, defaultRetryTimes) } -func (sm *SessionManager) checkIDExist() { - sm.etcdKV.CompareVersionAndSwap(defaultServiceRoot+defaultIDKey, 0, "1") +func (s *Session) checkIDExist() { + s.etcdCli.Txn(s.ctx).If( + clientv3.Compare( + clientv3.Version(path.Join(defaultServiceRoot, defaultIDKey)), + "=", + 0)). + Then(clientv3.OpPut(path.Join(defaultServiceRoot, defaultIDKey), "1")).Commit() + } -func (sm *SessionManager) getServerIDWithKey(key string, retryTimes int) (int64, error) { +func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) { res := int64(0) getServerIDWithKeyFn := func() error { - value, err := sm.etcdKV.Load(defaultServiceRoot + key) + getResp, err := s.etcdCli.Get(s.ctx, path.Join(defaultServiceRoot, key)) if err != nil { return nil } + if getResp.Count <= 0 { + return fmt.Errorf("there is no value on key = %s", key) + } + value := string(getResp.Kvs[0].Value) valueInt, err := strconv.ParseInt(value, 10, 64) if err != nil { log.Debug("session", zap.Error(err)) return err } - err = sm.etcdKV.CompareValueAndSwap(defaultServiceRoot+key, value, - strconv.FormatInt(valueInt+1, 10)) + txnResp, err := s.etcdCli.Txn(s.ctx).If( + clientv3.Compare( + clientv3.Value(path.Join(defaultServiceRoot, defaultIDKey)), + "=", + value)). + Then(clientv3.OpPut(path.Join(defaultServiceRoot, defaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit() if err != nil { - log.Debug("session", zap.Error(err)) return err } + + if !txnResp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + } res = valueInt return nil } @@ -162,101 +146,110 @@ func (sm *SessionManager) getServerIDWithKey(key string, retryTimes int) (int64, // MetaRootPath is configurable in the config file. // Exclusive means whether this service can exist two at the same time, if so, // it is false. Otherwise, set it to true. -func (sm *SessionManager) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { - respID, err := sm.etcdKV.Grant(defaultTTL) - if err != nil { - log.Error("register service", zap.Error(err)) - return nil, err - } - sm.Self.LeaseID = respID +func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { + var ch <-chan *clientv3.LeaseKeepAliveResponse + registerFn := func() error { + resp, err := s.etcdCli.Grant(s.ctx, defaultTTL) + if err != nil { + log.Error("register service", zap.Error(err)) + return err + } + s.leaseID = resp.ID - sessionJSON, err := json.Marshal(sm.Self) - if err != nil { - return nil, err - } + sessionJSON, err := json.Marshal(s) + if err != nil { + return err + } - key := defaultServiceRoot + sm.Self.ServerName - if !sm.Self.Exclusive { - key = key + "-" + strconv.FormatInt(sm.Self.ServerID, 10) - } - err = sm.etcdKV.CompareVersionAndSwap(key, 0, string(sessionJSON), clientv3.WithLease(respID)) - if err != nil { - fmt.Printf("compare and swap error %s\n. maybe the key has registered", err) - return nil, err - } + key := s.ServerName + if !s.Exclusive { + key = key + "-" + strconv.FormatInt(s.ServerID, 10) + } + txnResp, err := s.etcdCli.Txn(s.ctx).If( + clientv3.Compare( + clientv3.Version(path.Join(defaultServiceRoot, key)), + "=", + 0)). + Then(clientv3.OpPut(path.Join(defaultServiceRoot, key), string(sessionJSON), clientv3.WithLease(resp.ID))).Commit() - ch, err := sm.etcdKV.KeepAlive(respID) + if err != nil { + fmt.Printf("compare and swap error %s\n. maybe the key has registered", err) + return err + } + + if !txnResp.Succeeded { + return fmt.Errorf("function CompareAndSwap error for compare is false for key: %s", key) + } + + ch, err = s.etcdCli.KeepAlive(s.ctx, resp.ID) + if err != nil { + fmt.Printf("keep alive error %s\n", err) + return err + } + return nil + } + err := retry.Retry(defaultRetryTimes, time.Millisecond*200, registerFn) if err != nil { - fmt.Printf("keep alive error %s\n", err) - return nil, err + return ch, nil } return ch, nil } // ProcessKeepAliveResponse processes the response of etcd keepAlive interface // If keepAlive fails for unexpected error, it will send a signal to the channel. -func (sm *SessionManager) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) { +func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) { + failCh := make(chan bool) go func() { for { select { - case <-sm.ctx.Done(): + case <-s.ctx.Done(): log.Error("keep alive", zap.Error(errors.New("context done"))) return case resp, ok := <-ch: if !ok { - panic("keepAlive with etcd failed") + failCh <- true } if resp == nil { - panic("keepAlive with etcd failed") + failCh <- true } } } }() + return failCh } -// UpdateSessions will update local sessions same as the sessions saved in etcd. -// It makes locally stored sessions up-to-date. -func (sm *SessionManager) UpdateSessions(prefix string) error { - resKey, resValue, err := sm.etcdKV.LoadWithPrefix(defaultServiceRoot + prefix) +// GetSessions will get all sessions registered in etcd. +func (s *Session) GetSessions(prefix string) (map[string]*Session, error) { + res := make(map[string]*Session) + key := path.Join(defaultServiceRoot, prefix) + resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { - return err + return nil, err } - for i := 0; i < len(resKey); i++ { + for _, kv := range resp.Kvs { session := &Session{} - err = json.Unmarshal([]byte(resValue[i]), session) + err = json.Unmarshal([]byte(kv.Value), session) if err != nil { - return err + return nil, err } - sm.Sessions.Store(resKey[i], session) + res[string(kv.Key)] = session } - return nil -} - -// GetSessions gets all the services saved in memory. -// Before GetSessions, you should WatchServices or UpdateSessions first. -func (sm *SessionManager) GetSessions(prefix string) map[string]*Session { - sessions := map[string]*Session{} - sm.Sessions.Range(func(key, value interface{}) bool { - if strings.Contains(fmt.Sprint(key), prefix) { - sessions[fmt.Sprint(key)] = value.(*Session) - } - return true - }) - return sessions + return res, nil } // WatchServices watch the service's up and down in etcd, and saves it into local -// sessions. If a server up, it will be add to sessions. But it won't get the -// sessions startup before watch start. -// UpdateSessions and WatchServices is recommended. -func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) { +// sessions. +// If a server up, it will be add to addChannel. +// If a server is offline, it will be add to delChannel. +func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) { addCh := make(chan *Session, 10) delCh := make(chan *Session, 10) - rch := sm.etcdKV.WatchWithPrefix(defaultServiceRoot + prefix) + rch := s.etcdCli.Watch(s.ctx, path.Join(defaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV()) go func() { for { select { - case <-ctx.Done(): + case <-s.ctx.Done(): return case wresp, ok := <-rch: if !ok { @@ -273,15 +266,17 @@ func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) (add log.Error("watch services", zap.Error(err)) continue } - sm.Sessions.Store(string(ev.Kv.Key), session) addCh <- session case mvccpb.DELETE: log.Debug("watch services", - zap.Any("delete kv", ev.Kv)) - value, isloaded := sm.Sessions.LoadAndDelete(string(ev.Kv.Key)) - if isloaded { - delCh <- value.(*Session) + zap.Any("delete kv", ev.PrevKv)) + session := &Session{} + err := json.Unmarshal([]byte(ev.PrevKv.Value), session) + if err != nil { + log.Error("watch services", zap.Error(err)) + continue } + delCh <- session } } @@ -291,19 +286,19 @@ func (sm *SessionManager) WatchServices(ctx context.Context, prefix string) (add return addCh, delCh } -func initEtcd(etcdAddress, rootPath string) (*etcdkv.EtcdKV, error) { - var etcdKV *etcdkv.EtcdKV +func initEtcd(etcdAddress string) (*clientv3.Client, error) { + var etcdCli *clientv3.Client connectEtcdFn := func() error { - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) + etcd, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}, DialTimeout: 5 * time.Second}) if err != nil { return err } - etcdKV = etcdkv.NewEtcdKV(etcdCli, rootPath) + etcdCli = etcd return nil } err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) if err != nil { return nil, err } - return etcdKV, nil + return etcdCli, nil } diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 9b117aff75..f30e51b1cd 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -1,8 +1,6 @@ package sessionutil import ( - "fmt" - "math/rand" "sync" "testing" "time" @@ -18,8 +16,6 @@ var Params paramtable.BaseTable func TestGetServerIDConcurrently(t *testing.T) { ctx := context.Background() - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() Params.Init() etcdAddr, err := Params.Load("_EtcdAddress") @@ -29,8 +25,9 @@ func TestGetServerIDConcurrently(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootPath := fmt.Sprintf("/%d/test/meta", randVal) - etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + etcdKV := etcdkv.NewEtcdKV(cli, "") + _, err = cli.Delete(ctx, "/session", clientv3.WithPrefix()) + assert.Nil(t, err) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") @@ -38,13 +35,12 @@ func TestGetServerIDConcurrently(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - self := NewSession("test", "testAddr", false) - sm := NewSessionManager(ctx, etcdAddr, rootPath, self) + s := NewSession(ctx, []string{etcdAddr}, "test", "testAddr", false) res := make([]int64, 0) getIDFunc := func() { - sm.checkIDExist() - id, err := sm.getServerID() + s.checkIDExist() + id, err := s.getServerID() assert.Nil(t, err) muList.Lock() res = append(res, id) @@ -65,8 +61,6 @@ func TestGetServerIDConcurrently(t *testing.T) { func TestInit(t *testing.T) { ctx := context.Background() - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() Params.Init() etcdAddr, err := Params.Load("_EtcdAddress") @@ -76,23 +70,20 @@ func TestInit(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootPath := fmt.Sprintf("/%d/test/meta", randVal) - etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + etcdKV := etcdkv.NewEtcdKV(cli, "") + _, err = cli.Delete(ctx, "/session", clientv3.WithPrefix()) + assert.Nil(t, err) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") - self := NewSession("test", "testAddr", false) - sm := NewSessionManager(ctx, etcdAddr, rootPath, self) - sm.Init() - assert.NotEqual(t, 0, sm.Self.LeaseID) - assert.NotEqual(t, 0, sm.Self.ServerID) + s := NewSession(ctx, []string{etcdAddr}, "test", "testAddr", false) + assert.NotEqual(t, 0, s.leaseID) + assert.NotEqual(t, 0, s.ServerID) } func TestUpdateSessions(t *testing.T) { ctx := context.Background() - rand.Seed(time.Now().UnixNano()) - randVal := rand.Int() Params.Init() etcdAddr, err := Params.Load("_EtcdAddress") @@ -102,8 +93,9 @@ func TestUpdateSessions(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootPath := fmt.Sprintf("/%d/test/meta", randVal) - etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + etcdKV := etcdkv.NewEtcdKV(cli, "") + _, err = cli.Delete(ctx, "/session", clientv3.WithPrefix()) + assert.Nil(t, err) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") @@ -111,21 +103,20 @@ func TestUpdateSessions(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - self := NewSession("test", "testAddr", false) - sm := NewSessionManager(ctx, etcdAddr, rootPath, self) + s := NewSession(ctx, []string{etcdAddr}, "test", "testAddr", false) - err = sm.UpdateSessions("test") + sessions, err := s.GetSessions("test") assert.Nil(t, err) - addCh, delCh := sm.WatchServices(ctx, "test") + assert.Equal(t, len(sessions), 0) + addCh, delCh := s.WatchServices("test") - sessionManagers := make([]*SessionManager, 0) + sList := []*Session{} getIDFunc := func() { - service := NewSession("test", "testAddr", false) - singleManager := NewSessionManager(ctx, etcdAddr, rootPath, service) - singleManager.Init() + singleS := NewSession(ctx, []string{etcdAddr}, "test", "testAddr", false) + singleS.Init() muList.Lock() - sessionManagers = append(sessionManagers, singleManager) + sList = append(sList, singleS) muList.Unlock() wg.Done() } @@ -137,13 +128,16 @@ func TestUpdateSessions(t *testing.T) { wg.Wait() assert.Eventually(t, func() bool { - return len(sm.GetSessions("test")) == 10 + sessions, _ := s.GetSessions("test") + return len(sessions) == 10 }, 10*time.Second, 100*time.Millisecond) - assert.Equal(t, len(sm.GetSessions("testt")), 0) + notExistSessions, _ := s.GetSessions("testt") + assert.Equal(t, len(notExistSessions), 0) etcdKV.RemoveWithPrefix("") assert.Eventually(t, func() bool { - return len(sm.GetSessions("test")) == 0 + sessions, _ := s.GetSessions("test") + return len(sessions) == 0 }, 10*time.Second, 100*time.Millisecond) addSessions := []*Session{}