diff --git a/internal/allocator/id.go b/internal/allocator/id.go index 6bdd969d3c..e583da9efd 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -36,6 +36,7 @@ type IDAllocator struct { Allocator etcdAddr []string + metaRoot string masterAddress string masterClient types.MasterService @@ -47,7 +48,7 @@ type IDAllocator struct { PeerID UniqueID } -func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) (*IDAllocator, error) { +func NewIDAllocator(ctx context.Context, masterAddr, metaRoot string, etcdAddr []string) (*IDAllocator, error) { ctx1, cancel := context.WithCancel(ctx) a := &IDAllocator{ @@ -57,6 +58,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) ( Role: "IDAllocator", }, countPerRPC: IDCountPerRPC, + metaRoot: metaRoot, etcdAddr: etcdAddr, masterAddress: masterAddr, } @@ -72,7 +74,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string, etcdAddr []string) ( func (ia *IDAllocator) Start() error { var err error - ia.masterClient, err = msc.NewClient(ia.masterAddress, ia.etcdAddr, 20*time.Second) + ia.masterClient, err = msc.NewClient(ia.masterAddress, ia.metaRoot, ia.etcdAddr, 20*time.Second) if err != nil { panic(err) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 968567db81..2d8bfc7450 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -127,7 +127,7 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error { // Register register data node at etcd func (node *DataNode) Register() error { - node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = node.session.ServerID return nil diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 4ddf8642b3..3b1132cff1 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -113,7 +113,7 @@ func (s *Server) SetMasterClient(masterClient types.MasterService) { // Register register data service at etcd func (s *Server) Register() error { - s.session = sessionutil.NewSession(s.ctx, []string{Params.EtcdAddress}) + s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) s.session.Init(typeutil.DataServiceRole, Params.IP, true) Params.NodeID = s.session.ServerID return nil diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index 5d4a5b18d0..984312dca5 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -14,6 +14,7 @@ import ( "context" "math" "math/rand" + "path" "testing" "time" @@ -789,7 +790,8 @@ func newTestServer(t *testing.T) *Server { etcdCli, err := initEtcd(Params.EtcdAddress) assert.Nil(t, err) - _, err = etcdCli.Delete(context.Background(), sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) + sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) + _, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix()) assert.Nil(t, err) svr, err := CreateServer(context.TODO(), factory) diff --git a/internal/distributed/datanode/datanode_test.go b/internal/distributed/datanode/datanode_test.go index e403056333..1536a8bf21 100644 --- a/internal/distributed/datanode/datanode_test.go +++ b/internal/distributed/datanode/datanode_test.go @@ -125,7 +125,7 @@ func TestRun(t *testing.T) { dnServer.newMasterServiceClient = func(s string) (types.MasterService, error) { return &mockMaster{}, nil } - dnServer.newDataServiceClient = func(s, etcdAddress string, timeout time.Duration) types.DataService { + dnServer.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { return &mockDataService{} } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 1c617268f5..cc8cb278e3 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -56,7 +56,7 @@ type Server struct { dataService types.DataService newMasterServiceClient func(string) (types.MasterService, error) - newDataServiceClient func(string, string, time.Duration) types.DataService + newDataServiceClient func(string, string, string, time.Duration) types.DataService closer io.Closer } @@ -70,10 +70,10 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) msFactory: factory, grpcErrChan: make(chan error), newMasterServiceClient: func(s string) (types.MasterService, error) { - return msc.NewClient(s, []string{dn.Params.EtcdAddress}, 20*time.Second) + return msc.NewClient(s, dn.Params.MetaRootPath, []string{dn.Params.EtcdAddress}, 20*time.Second) }, - newDataServiceClient: func(s, etcdAddress string, timeout time.Duration) types.DataService { - return dsc.NewClient(Params.DataServiceAddress, []string{etcdAddress}, timeout) + newDataServiceClient: func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { + return dsc.NewClient(Params.DataServiceAddress, etcdMetaRoot, []string{etcdAddress}, timeout) }, } @@ -205,7 +205,7 @@ func (s *Server) init() error { if s.newDataServiceClient != nil { log.Debug("Data service address", zap.String("address", Params.DataServiceAddress)) log.Debug("DataNode Init data service client ...") - dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.EtcdAddress, 10) + dataServiceClient := s.newDataServiceClient(Params.DataServiceAddress, dn.Params.MetaRootPath, dn.Params.EtcdAddress, 10) if err = dataServiceClient.Init(); err != nil { panic(err) } diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index 45cc3a8538..6816f30fce 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -56,8 +56,8 @@ func getDataServiceAddress(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(address string, etcdAddr []string, timeout time.Duration) *Client { - sess := sessionutil.NewSession(context.Background(), etcdAddr) +func NewClient(address, metaRoot string, etcdAddr []string, timeout time.Duration) *Client { + sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr) return &Client{ addr: address, ctx: context.Background(), diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index 36001dcbf9..11f3d04e6e 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -68,7 +68,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) cancel: cancel, grpcErrChan: make(chan error), newMasterServiceClient: func(s string) (types.MasterService, error) { - return msc.NewClient(s, []string{dataservice.Params.EtcdAddress}, 10*time.Second) + return msc.NewClient(s, dataservice.Params.MetaRootPath, []string{dataservice.Params.EtcdAddress}, 10*time.Second) }, } s.dataService, err = dataservice.CreateServer(s.ctx, factory) diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 3df4774b7d..20afaff4a3 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -54,8 +54,8 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(addr string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) { - sess := sessionutil.NewSession(context.Background(), etcdAddr) +func NewClient(addr string, metaRoot string, etcdAddr []string, timeout time.Duration) (*GrpcClient, error) { + sess := sessionutil.NewSession(context.Background(), metaRoot, etcdAddr) if sess == nil { return nil, fmt.Errorf("new session error, maybe can not connect to etcd") } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 4858d96e02..1d416ab321 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -13,8 +13,10 @@ package grpcmasterservice import ( "context" + "encoding/json" "fmt" "math/rand" + "path" "strconv" "strings" "sync" @@ -32,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" @@ -87,6 +90,14 @@ func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack { return &msgPack } +type proxyNodeMock struct { + types.ProxyNode + invalidateCollectionMetaCache func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) +} + +func (p *proxyNodeMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + return p.invalidateCollectionMetaCache(ctx, request) +} func TestGrpcService(t *testing.T) { const ( dbName = "testDB" @@ -141,7 +152,17 @@ func TestGrpcService(t *testing.T) { etcdCli, err := initEtcd(cms.Params.EtcdAddress) assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) + sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot) + _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) + assert.Nil(t, err) + + pnb, err := json.Marshal( + &sessionutil.Session{ + ServerID: 100, + }, + ) + assert.Nil(t, err) + _, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyNodeRole+"-100"), string(pnb)) assert.Nil(t, err) err = core.Init() @@ -213,9 +234,15 @@ func TestGrpcService(t *testing.T) { } collectionMetaCache := make([]string, 0, 16) - core.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { - collectionMetaCache = append(collectionMetaCache, collectionName) - return nil + pnm := proxyNodeMock{} + core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { + return &pnm, nil + } + pnm.invalidateCollectionMetaCache = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + collectionMetaCache = append(collectionMetaCache, request.CollectionName) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil } core.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { @@ -227,7 +254,7 @@ func TestGrpcService(t *testing.T) { svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy) - cli, err := grpcmasterserviceclient.NewClient(Params.Address, []string{cms.Params.EtcdAddress}, 3*time.Second) + cli, err := grpcmasterserviceclient.NewClient(Params.Address, cms.Params.MetaRootPath, []string{cms.Params.EtcdAddress}, 3*time.Second) assert.Nil(t, err) err = cli.Init() @@ -825,6 +852,9 @@ func (m *mockCore) Stop() error { return fmt.Errorf("stop error") } +func (m *mockCore) SetNewProxyClient(func(sess *sessionutil.Session) (types.ProxyNode, error)) { +} + type mockProxy struct { types.ProxyService } @@ -925,7 +955,7 @@ func TestRun(t *testing.T) { svr.newProxyServiceClient = func(s string) types.ProxyService { return &mockProxy{} } - svr.newDataServiceClient = func(s, address string, timeout time.Duration) types.DataService { + svr.newDataServiceClient = func(s, metaRoot, address string, timeout time.Duration) types.DataService { return &mockDataService{} } svr.newIndexServiceClient = func(s string) types.IndexService { @@ -944,7 +974,8 @@ func TestRun(t *testing.T) { etcdCli, err := initEtcd(cms.Params.EtcdAddress) assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) + sessKey := path.Join(cms.Params.MetaRootPath, sessionutil.DefaultServiceRoot) + _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) assert.Nil(t, err) err = svr.Run() assert.Nil(t, err) diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 0ddde581f7..54aba1c525 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -27,6 +27,7 @@ import ( dsc "github.com/milvus-io/milvus/internal/distributed/dataservice/client" isc "github.com/milvus-io/milvus/internal/distributed/indexservice/client" + pnc "github.com/milvus-io/milvus/internal/distributed/proxynode/client" psc "github.com/milvus-io/milvus/internal/distributed/proxyservice/client" qsc "github.com/milvus-io/milvus/internal/distributed/queryservice/client" "github.com/milvus-io/milvus/internal/log" @@ -40,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/sessionutil" ) // grpc wrapper @@ -59,7 +61,7 @@ type Server struct { queryService types.QueryService newProxyServiceClient func(string) types.ProxyService - newDataServiceClient func(string, string, time.Duration) types.DataService + newDataServiceClient func(string, string, string, time.Duration) types.DataService newIndexServiceClient func(string) types.IndexService newQueryServiceClient func(string) (types.QueryService, error) @@ -98,8 +100,8 @@ func (s *Server) setClient() { } return psClient } - s.newDataServiceClient = func(s, etcdAddress string, timeout time.Duration) types.DataService { - dsClient := dsc.NewClient(s, []string{etcdAddress}, timeout) + s.newDataServiceClient = func(s, etcdMetaRoot, etcdAddress string, timeout time.Duration) types.DataService { + dsClient := dsc.NewClient(s, etcdMetaRoot, []string{etcdAddress}, timeout) if err := dsClient.Init(); err != nil { panic(err) } @@ -173,6 +175,19 @@ func (s *Server) init() error { s.masterService.UpdateStateCode(internalpb.StateCode_Initializing) + s.masterService.SetNewProxyClient( + func(s *sessionutil.Session) (types.ProxyNode, error) { + cli := pnc.NewClient(ctx, s.Address) + if err := cli.Init(); err != nil { + return nil, err + } + if err := cli.Start(); err != nil { + return nil, err + } + return cli, nil + }, + ) + if s.newProxyServiceClient != nil { log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress)) proxyService := s.newProxyServiceClient(Params.ProxyServiceAddress) @@ -183,7 +198,7 @@ func (s *Server) init() error { } if s.newDataServiceClient != nil { log.Debug("data service", zap.String("address", Params.DataServiceAddress)) - dataService := s.newDataServiceClient(Params.DataServiceAddress, cms.Params.EtcdAddress, 10) + dataService := s.newDataServiceClient(Params.DataServiceAddress, cms.Params.MetaRootPath, cms.Params.EtcdAddress, 10) if err := s.masterService.SetDataService(ctx, dataService); err != nil { panic(err) } diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 359fc0a850..9cb5e8fc16 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -189,7 +189,7 @@ func (s *Server) init() error { masterServiceAddr := Params.MasterAddress log.Debug("proxynode", zap.String("master address", masterServiceAddr)) timeout := 3 * time.Second - s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, []string{proxynode.Params.EtcdAddress}, timeout) + s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout) if err != nil { return err } @@ -207,7 +207,7 @@ func (s *Server) init() error { dataServiceAddr := Params.DataServiceAddress log.Debug("proxynode", zap.String("data service address", dataServiceAddr)) - s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, []string{proxynode.Params.EtcdAddress}, 10) + s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10) err = s.dataServiceClient.Init() if err != nil { return err diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 845c552fd2..e7340e3363 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -134,7 +134,7 @@ func (s *Server) init() error { log.Debug("Master service", zap.String("address", addr)) log.Debug("Init master service client ...") - masterService, err := msc.NewClient(addr, []string{qn.Params.EtcdAddress}, 20*time.Second) + masterService, err := msc.NewClient(addr, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 20*time.Second) if err != nil { panic(err) } @@ -181,7 +181,7 @@ func (s *Server) init() error { log.Debug("Data service", zap.String("address", Params.DataServiceAddress)) log.Debug("QueryNode Init data service client ...") - dataService := dsc.NewClient(Params.DataServiceAddress, []string{qn.Params.EtcdAddress}, 10) + dataService := dsc.NewClient(Params.DataServiceAddress, qn.Params.MetaRootPath, []string{qn.Params.EtcdAddress}, 10) if err = dataService.Init(); err != nil { panic(err) } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index d61e6f1241..8bb4413d3d 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -111,7 +111,7 @@ func (s *Server) init() error { log.Debug("Master service", zap.String("address", Params.MasterAddress)) log.Debug("Init master service client ...") - masterService, err := msc.NewClient(Params.MasterAddress, []string{qs.Params.EtcdAddress}, 20*time.Second) + masterService, err := msc.NewClient(Params.MasterAddress, qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 20*time.Second) if err != nil { panic(err) @@ -138,7 +138,7 @@ func (s *Server) init() error { log.Debug("DataService", zap.String("Address", Params.DataServiceAddress)) log.Debug("QueryService Init data service client ...") - dataService := dsc.NewClient(Params.DataServiceAddress, []string{qs.Params.EtcdAddress}, 10) + dataService := dsc.NewClient(Params.DataServiceAddress, qs.Params.MetaRootPath, []string{qs.Params.EtcdAddress}, 10) if err = dataService.Init(); err != nil { panic(err) } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index ae7c6bf892..457667dbf1 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -79,7 +79,7 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) { // Register register index node at etcd func (i *IndexNode) Register() error { - i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress}) + i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = i.session.ServerID return nil diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index f6bd9086da..3e3d364726 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -87,7 +87,7 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { // Register register index service at etcd func (i *IndexService) Register() error { - i.session = sessionutil.NewSession(i.loopCtx, []string{Params.EtcdAddress}) + i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) i.session.Init(typeutil.IndexServiceRole, Params.Address, true) return nil } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 70d9bb615c..787d82e9af 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -16,6 +16,7 @@ import ( "encoding/json" "fmt" "math/rand" + "os" "sync" "sync/atomic" "time" @@ -128,8 +129,7 @@ type Core struct { CallBuildIndexService func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) CallDropIndexService func(ctx context.Context, indexID typeutil.UniqueID) error - //proxy service interface, notify proxy service to drop collection - CallInvalidateCollectionMetaCacheService func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error + NewProxyClient func(sess *sessionutil.Session) (types.ProxyNode, error) //query service interface, notify query service to release collection CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error @@ -137,6 +137,12 @@ type Core struct { //dd request scheduler ddReqQueue chan reqTask //dd request will be push into this chan + //proxynode manager + proxyNodeManager *proxyNodeManager + + // proxy clients + proxyClientManager *proxyClientManager + // channel timetick chanTimeTick *timetickSync @@ -151,7 +157,8 @@ type Core struct { startOnce sync.Once //isInit atomic.Value - session *sessionutil.Session + session *sessionutil.Session + sessCloseCh <-chan bool msFactory ms.Factory } @@ -226,8 +233,8 @@ func (c *Core) checkInit() error { if c.CallDropIndexService == nil { return fmt.Errorf("CallDropIndexService is nil") } - if c.CallInvalidateCollectionMetaCacheService == nil { - return fmt.Errorf("CallInvalidateCollectionMetaCacheService is nil") + if c.NewProxyClient == nil { + return fmt.Errorf("NewProxyNodeClient is nil") } if c.CallReleaseCollectionService == nil { return fmt.Errorf("CallReleaseCollectionService is nil") @@ -238,6 +245,7 @@ func (c *Core) checkInit() error { if c.DataNodeFlushedSegmentChan == nil { return fmt.Errorf("DataNodeFlushedSegmentChan is nil") } + return nil } @@ -458,6 +466,27 @@ func (c *Core) tsLoop() { } } +func (c *Core) sessionLoop() { + for { + select { + case <-c.ctx.Done(): + return + case _, ok := <-c.sessCloseCh: + if !ok { + log.Error("master service disconnect with etcd, process will exit in 1 second") + go func() { + time.Sleep(time.Second) + os.Exit(-1) + }() + } + } + } +} + +func (c *Core) watchProxyNodeLoop() { + +} + func (c *Core) setDdMsgSendFlag(b bool) error { flag, err := c.MetaTable.client.Load(DDMsgSendPrefix, 0) if err != nil { @@ -672,28 +701,18 @@ func (c *Core) SetProxyService(ctx context.Context, s types.ProxyService) error Params.ProxyTimeTickChannel = rsp.Value log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel)) - c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName string, collectionName string) error { - status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ - Base: &commonpb.MsgBase{ - MsgType: 0, //TODO,MsgType - MsgID: 0, - Timestamp: ts, - SourceID: c.session.ServerID, - }, - DbName: dbName, - CollectionName: collectionName, - }) - if status == nil { - return fmt.Errorf("invalidate collection metacache resp is nil") - } - if status.ErrorCode != commonpb.ErrorCode_Success { - return fmt.Errorf(status.Reason) - } - return nil - } return nil } +//SetNewProxyClient create proxy node by this func +func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyNode, error)) { + if c.NewProxyClient == nil { + c.NewProxyClient = f + } else { + log.Debug("NewProxyClient has alread set") + } +} + func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { rsp, err := s.GetSegmentInfoChannel(ctx) if err != nil { @@ -702,10 +721,18 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { Params.DataServiceSegmentChannel = rsp.Value log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) - c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { + c.CallGetBinlogFilePathsService = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) { + defer func() { + if err := recover(); err != nil { + retFiles = nil + retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err) + } + }() ts, err := c.TSOAllocator(1) if err != nil { - return nil, err + retFiles = nil + retErr = err + return } binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{ Base: &commonpb.MsgBase{ @@ -717,23 +744,40 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { SegmentID: segID, }) if err != nil { - return nil, err + retFiles = nil + retErr = err + return } if binlog.Status.ErrorCode != commonpb.ErrorCode_Success { - return nil, fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason) + retFiles = nil + retErr = fmt.Errorf("GetInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason) + return } for i := range binlog.FieldIDs { if binlog.FieldIDs[i] == fieldID { - return binlog.Paths[i].Values, nil + retFiles = binlog.Paths[i].Values + retErr = nil + return } } - return nil, fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID) + retFiles = nil + retErr = fmt.Errorf("binlog file not exist, segment id = %d, field id = %d", segID, fieldID) + return } - c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error) { + c.CallGetNumRowsService = func(segID typeutil.UniqueID, isFromFlushedChan bool) (retRows int64, retErr error) { + defer func() { + if err := recover(); err != nil { + retRows = 0 + retErr = fmt.Errorf("get num rows panic, msg = %v", err) + return + } + }() ts, err := c.TSOAllocator(1) if err != nil { - return 0, err + retRows = 0 + retErr = err + return } segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: &commonpb.MsgBase{ @@ -745,26 +789,41 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { SegmentIDs: []typeutil.UniqueID{segID}, }) if err != nil { - return 0, err + retRows = 0 + retErr = err + return } if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success { return 0, fmt.Errorf("GetSegmentInfo from data service failed, error = %s", segInfo.Status.Reason) } if len(segInfo.Infos) != 1 { log.Debug("get segment info empty") - return 0, nil + retRows = 0 + retErr = nil + return } if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed { log.Debug("segment id not flushed", zap.Int64("segment id", segID)) - return 0, nil + retRows = 0 + retErr = nil + return } - return segInfo.Infos[0].NumRows, nil + retRows = segInfo.Infos[0].NumRows + retErr = nil + return } return nil } func (c *Core) SetIndexService(s types.IndexService) error { - c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (typeutil.UniqueID, error) { + c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) { + defer func() { + if err := recover(); err != nil { + retID = 0 + retErr = fmt.Errorf("build index panic, msg = %v", err) + return + } + }() rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{ DataPaths: binlog, TypeParams: field.TypeParams, @@ -773,32 +832,53 @@ func (c *Core) SetIndexService(s types.IndexService) error { IndexName: idxInfo.IndexName, }) if err != nil { - return 0, err + retID = 0 + retErr = err + return } if rsp.Status.ErrorCode != commonpb.ErrorCode_Success { - return 0, fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason) + retID = 0 + retErr = fmt.Errorf("BuildIndex from index service failed, error = %s", rsp.Status.Reason) + return } - return rsp.IndexBuildID, nil + retID = rsp.IndexBuildID + retErr = nil + return } - c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) error { + c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) (retErr error) { + defer func() { + if err := recover(); err != nil { + retErr = fmt.Errorf("drop index from index service panic, msg = %v", err) + return + } + }() rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{ IndexID: indexID, }) if err != nil { - return err + retErr = err + return } if rsp.ErrorCode != commonpb.ErrorCode_Success { - return fmt.Errorf(rsp.Reason) + retErr = fmt.Errorf(rsp.Reason) + return } - return nil + retErr = nil + return } return nil } func (c *Core) SetQueryService(s types.QueryService) error { - c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error { + c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) (retErr error) { + defer func() { + if err := recover(); err != nil { + retErr = fmt.Errorf("release collection from query service panic, msg = %v", err) + return + } + }() req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -811,12 +891,15 @@ func (c *Core) SetQueryService(s types.QueryService) error { } rsp, err := s.ReleaseCollection(ctx, req) if err != nil { - return err + retErr = err + return } if rsp.ErrorCode != commonpb.ErrorCode_Success { - return fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason) + retErr = fmt.Errorf("ReleaseCollection from query service failed, error = %s", rsp.Reason) + return } - return nil + retErr = nil + return } return nil } @@ -851,8 +934,8 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, // Register register master service at etcd func (c *Core) Register() error { - c.session = sessionutil.NewSession(c.ctx, []string{Params.EtcdAddress}) - c.session.Init(typeutil.MasterServiceRole, Params.Address, true) + c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) + c.sessCloseCh = c.session.Init(typeutil.MasterServiceRole, Params.Address, true) return nil } @@ -919,10 +1002,17 @@ func (c *Core) Init() error { if initError = c.msFactory.SetParams(m); initError != nil { return } - c.chanTimeTick, initError = newTimeTickSync(c) - if initError != nil { - return - } + c.chanTimeTick = newTimeTickSync(c) + c.proxyClientManager = newProxyClientManager(c) + + c.proxyNodeManager, initError = newProxyNodeManager( + c.ctx, + []string{Params.EtcdAddress}, + c.chanTimeTick.GetProxyNodes, + c.proxyClientManager.GetProxyClients, + ) + c.proxyNodeManager.AddSession(c.chanTimeTick.AddProxyNode, c.proxyClientManager.AddProxyClient) + c.proxyNodeManager.DelSession(c.chanTimeTick.DelProxyNode, c.proxyClientManager.DelProxyClient) c.ddReqQueue = make(chan reqTask, 1024) initError = c.setMsgStreams() @@ -975,6 +1065,18 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { if err = c.SendDdDropCollectionReq(ctx, &ddReq); err != nil { return err } + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ddReq.Base.Timestamp, + SourceID: c.session.ServerID, + }, + DbName: ddReq.DbName, + CollectionName: ddReq.CollectionName, + } + c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) + case CreatePartitionDDType: var ddReq = internalpb.CreatePartitionRequest{} if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { @@ -983,6 +1085,17 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { if err = c.SendDdCreatePartitionReq(ctx, &ddReq); err != nil { return err } + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ddReq.Base.Timestamp, + SourceID: c.session.ServerID, + }, + DbName: ddReq.DbName, + CollectionName: ddReq.CollectionName, + } + c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) case DropPartitionDDType: var ddReq = internalpb.DropPartitionRequest{} if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil { @@ -991,6 +1104,17 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { if err = c.SendDdDropPartitionReq(ctx, &ddReq); err != nil { return err } + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ddReq.Base.Timestamp, + SourceID: c.session.ServerID, + }, + DbName: ddReq.DbName, + CollectionName: ddReq.CollectionName, + } + c.proxyClientManager.InvalidateCollectionMetaCache(c.ctx, &req) default: return fmt.Errorf("Invalid DdOperation %s", ddOp.Type) } @@ -1009,6 +1133,9 @@ func (c *Core) Start() error { log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) c.startOnce.Do(func() { + if err := c.proxyNodeManager.WatchProxyNode(); err != nil { + return + } if err := c.reSendDdMsg(c.ctx); err != nil { return } @@ -1017,6 +1144,7 @@ func (c *Core) Start() error { go c.startDataServiceSegmentLoop() go c.startDataNodeFlushedSegmentLoop() go c.tsLoop() + go c.sessionLoop() go c.chanTimeTick.StartWatch() c.stateCode.Store(internalpb.StateCode_Healthy) }) @@ -1649,6 +1777,17 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques } func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) { + code := c.stateCode.Load().(internalpb.StateCode) + if code != internalpb.StateCode_Healthy { + return &masterpb.AllocTimestampResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), + }, + Timestamp: 0, + Count: 0, + }, nil + } ts, err := c.TSOAllocator(in.Count) if err != nil { log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) @@ -1673,6 +1812,17 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRe } func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) { + code := c.stateCode.Load().(internalpb.StateCode) + if code != internalpb.StateCode_Healthy { + return &masterpb.AllocIDResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), + }, + ID: 0, + Count: 0, + }, nil + } start, _, err := c.IDAllocator(in.Count) if err != nil { log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) @@ -1698,6 +1848,13 @@ func (c *Core) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*maste // UpdateChannelTimeTick used to handle ChannelTimeTickMsg func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb.StateCode) + if code != internalpb.StateCode_Healthy { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), + }, nil + } status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index cea087b755..66b90f21b9 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -43,9 +43,7 @@ import ( type proxyMock struct { types.ProxyService - randVal int - collArray []string - mutex sync.Mutex + randVal int } func (p *proxyMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { @@ -56,7 +54,14 @@ func (p *proxyMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes Value: fmt.Sprintf("proxy-time-tick-%d", p.randVal), }, nil } -func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + +type proxyNodeMock struct { + types.ProxyNode + collArray []string + mutex sync.Mutex +} + +func (p *proxyNodeMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { p.mutex.Lock() defer p.mutex.Unlock() p.collArray = append(p.collArray, request.CollectionName) @@ -64,7 +69,7 @@ func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request * ErrorCode: commonpb.ErrorCode_Success, }, nil } -func (p *proxyMock) GetCollArray() []string { +func (p *proxyNodeMock) GetCollArray() []string { p.mutex.Lock() defer p.mutex.Unlock() ret := make([]string, 0, len(p.collArray)) @@ -222,20 +227,36 @@ func TestMasterService(t *testing.T) { etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}, DialTimeout: 5 * time.Second}) assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) + sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) + _, err = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) assert.Nil(t, err) defer func() { - _, _ = etcdCli.Delete(ctx, sessionutil.DefaultServiceRoot, clientv3.WithPrefix()) + _, _ = etcdCli.Delete(ctx, sessKey, clientv3.WithPrefix()) }() + pnb, err := json.Marshal( + &sessionutil.Session{ + ServerID: 100, + }, + ) + assert.Nil(t, err) + _, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyNodeRole+"-100"), string(pnb)) + assert.Nil(t, err) + pm := &proxyMock{ - randVal: randVal, - collArray: make([]string, 0, 16), - mutex: sync.Mutex{}, + randVal: randVal, } err = core.SetProxyService(ctx, pm) assert.Nil(t, err) + pnm := &proxyNodeMock{ + collArray: make([]string, 0, 16), + mutex: sync.Mutex{}, + } + core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { + return pnm, nil + } + dm := &dataMock{randVal: randVal} err = core.SetDataService(ctx, dm) assert.Nil(t, err) @@ -571,8 +592,8 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collMeta.ID, partMsg.CollectionID) assert.Equal(t, partMeta.PartitionID, partMsg.PartitionID) - assert.Equal(t, 1, len(pm.GetCollArray())) - assert.Equal(t, collName, pm.GetCollArray()[0]) + assert.Equal(t, 1, len(pnm.GetCollArray())) + assert.Equal(t, collName, pnm.GetCollArray()[0]) // check DD operation info flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) @@ -976,8 +997,8 @@ func TestMasterService(t *testing.T) { assert.Equal(t, collMeta.ID, dmsg.CollectionID) assert.Equal(t, dropPartID, dmsg.PartitionID) - assert.Equal(t, 2, len(pm.GetCollArray())) - assert.Equal(t, collName, pm.GetCollArray()[1]) + assert.Equal(t, 2, len(pnm.GetCollArray())) + assert.Equal(t, collName, pnm.GetCollArray()[1]) // check DD operation info flag, err := core.MetaTable.client.Load(DDMsgSendPrefix, 0) @@ -1024,7 +1045,7 @@ func TestMasterService(t *testing.T) { dmsg, ok := (msg.Msgs[0]).(*msgstream.DropCollectionMsg) assert.True(t, ok) assert.Equal(t, collMeta.ID, dmsg.CollectionID) - collArray := pm.GetCollArray() + collArray := pnm.GetCollArray() assert.Equal(t, 3, len(collArray)) assert.Equal(t, collName, collArray[2]) @@ -1049,7 +1070,7 @@ func TestMasterService(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) time.Sleep(time.Second) assert.Zero(t, len(ddStream.Chan())) - collArray = pm.GetCollArray() + collArray = pnm.GetCollArray() assert.Equal(t, 3, len(collArray)) assert.Equal(t, collName, collArray[2]) @@ -1454,9 +1475,9 @@ func TestMasterService(t *testing.T) { s2, err := json.Marshal(&p2) assert.Nil(t, err) - _, err = core.etcdCli.Put(ctx2, path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)+"-1", string(s1)) + _, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyNodeRole)+"-1", string(s1)) assert.Nil(t, err) - _, err = core.etcdCli.Put(ctx2, path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole)+"-2", string(s2)) + _, err = core.etcdCli.Put(ctx2, path.Join(sessKey, typeutil.ProxyNodeRole)+"-2", string(s2)) assert.Nil(t, err) time.Sleep(time.Second) @@ -1726,9 +1747,7 @@ func TestMasterService2(t *testing.T) { Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) pm := &proxyMock{ - randVal: randVal, - collArray: make([]string, 0, 16), - mutex: sync.Mutex{}, + randVal: randVal, } err = core.SetProxyService(ctx, pm) assert.Nil(t, err) @@ -1754,6 +1773,10 @@ func TestMasterService2(t *testing.T) { err = core.SetQueryService(qm) assert.Nil(t, err) + core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { + return nil, nil + } + err = core.Init() assert.Nil(t, err) @@ -1949,8 +1972,8 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.CallInvalidateCollectionMetaCacheService = func(ctx context.Context, ts typeutil.Timestamp, dbName, collectionName string) error { - return nil + c.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { + return nil, nil } err = c.checkInit() assert.NotNil(t, err) diff --git a/internal/masterservice/proxy_client_manager.go b/internal/masterservice/proxy_client_manager.go new file mode 100644 index 0000000000..0a66799387 --- /dev/null +++ b/internal/masterservice/proxy_client_manager.go @@ -0,0 +1,113 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package masterservice + +import ( + "context" + "fmt" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/proxypb" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "go.uber.org/zap" +) + +type proxyClientManager struct { + core *Core + lock sync.Mutex + proxyClient map[int64]types.ProxyNode +} + +func newProxyClientManager(c *Core) *proxyClientManager { + return &proxyClientManager{ + core: c, + lock: sync.Mutex{}, + proxyClient: make(map[int64]types.ProxyNode), + } +} + +func (p *proxyClientManager) GetProxyClients(sess []*sessionutil.Session) { + p.lock.Lock() + defer p.lock.Unlock() + for _, s := range sess { + if _, ok := p.proxyClient[s.ServerID]; ok { + continue + } + pc, err := p.core.NewProxyClient(s) + if err != nil { + log.Debug("create proxy client failed", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err)) + continue + } + p.proxyClient[s.ServerID] = pc + log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) + } +} + +func (p *proxyClientManager) AddProxyClient(s *sessionutil.Session) { + p.lock.Lock() + defer p.lock.Unlock() + if _, ok := p.proxyClient[s.ServerID]; ok { + return + } + pc, err := p.core.NewProxyClient(s) + if err != nil { + log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err)) + return + } + p.proxyClient[s.ServerID] = pc + log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) +} + +func (p *proxyClientManager) DelProxyClient(s *sessionutil.Session) { + p.lock.Lock() + defer p.lock.Unlock() + delete(p.proxyClient, s.ServerID) + log.Debug("remove proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) +} + +func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) { + p.lock.Lock() + defer p.lock.Unlock() + + if len(p.proxyClient) == 0 { + log.Debug("proxy client is empty,InvalidateCollectionMetaCache will not send to any client") + return + } + + for k, f := range p.proxyClient { + err := func() error { + defer func() { + if err := recover(); err != nil { + log.Debug("call InvalidateCollectionMetaCache panic", zap.Int64("proxy id", k), zap.Any("msg", err)) + } + + }() + sta, err := f.InvalidateCollectionMetaCache(ctx, request) + if err != nil { + return fmt.Errorf("grpc fail,error=%w", err) + } + if sta.ErrorCode != commonpb.ErrorCode_Success { + return fmt.Errorf("message = %s", sta.Reason) + } + return nil + }() + if err != nil { + log.Error("call invalidate collection meta failed", zap.Int64("proxy id", k), zap.Error(err)) + } else { + log.Debug("send invalidate collection meta cache to proxy node", zap.Int64("node id", k)) + } + + } +} diff --git a/internal/masterservice/proxy_node_manager.go b/internal/masterservice/proxy_node_manager.go new file mode 100644 index 0000000000..cdd1fc9d5e --- /dev/null +++ b/internal/masterservice/proxy_node_manager.go @@ -0,0 +1,153 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package masterservice + +import ( + "context" + "encoding/json" + "fmt" + "path" + "sync" + + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +type proxyNodeManager struct { + ctx context.Context + cancel context.CancelFunc + lock sync.Mutex + etcdCli *clientv3.Client + getSessions []func([]*sessionutil.Session) + addSessions []func(*sessionutil.Session) + delSessions []func(*sessionutil.Session) +} + +func newProxyNodeManager(ctx context.Context, etcdAddr []string, fns ...func([]*sessionutil.Session)) (*proxyNodeManager, error) { + cli, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) + if err != nil { + return nil, err + } + ctx2, cancel2 := context.WithCancel(ctx) + p := &proxyNodeManager{ + ctx: ctx2, + cancel: cancel2, + lock: sync.Mutex{}, + etcdCli: cli, + } + p.getSessions = append(p.getSessions, fns...) + return p, nil +} + +func (p *proxyNodeManager) AddSession(fns ...func(*sessionutil.Session)) { + p.lock.Lock() + defer p.lock.Unlock() + p.addSessions = append(p.addSessions, fns...) +} + +func (p *proxyNodeManager) DelSession(fns ...func(*sessionutil.Session)) { + p.lock.Lock() + defer p.lock.Unlock() + p.delSessions = append(p.delSessions, fns...) +} + +func (p *proxyNodeManager) WatchProxyNode() error { + ctx2, cancel := context.WithTimeout(p.ctx, RequestTimeout) + defer cancel() + resp, err := p.etcdCli.Get( + ctx2, + path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole), + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + ) + if err != nil { + return fmt.Errorf("proxyNodeManager,watch proxy node failed, error = %w", err) + } + sessions := []*sessionutil.Session{} + for _, v := range resp.Kvs { + sess := new(sessionutil.Session) + err := json.Unmarshal(v.Value, sess) + if err != nil { + log.Debug("unmarshal SvrSession failed", zap.Error(err)) + continue + } + sessions = append(sessions, sess) + } + for _, f := range p.getSessions { + f(sessions) + } + for _, s := range sessions { + log.Debug("Get proxy node", zap.Int64("node id", s.ServerID), zap.String("node addr", s.Address), zap.String("node name", s.ServerName)) + } + + rch := p.etcdCli.Watch( + p.ctx, + path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole), + clientv3.WithPrefix(), + clientv3.WithCreatedNotify(), + clientv3.WithPrevKV(), + clientv3.WithRev(resp.Header.Revision+1), + ) + + go func() { + for { + select { + case <-p.ctx.Done(): + log.Debug("context done", zap.Error(p.ctx.Err())) + return + case wresp, ok := <-rch: + if !ok { + log.Debug("watch proxy node failed") + return + } + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + sess := new(sessionutil.Session) + err := json.Unmarshal(ev.Kv.Value, sess) + if err != nil { + log.Debug("watch proxy node, unmarshal failed", zap.Error(err)) + continue + } + p.lock.Lock() + for _, f := range p.addSessions { + f(sess) + } + p.lock.Unlock() + case mvccpb.DELETE: + sess := new(sessionutil.Session) + err := json.Unmarshal(ev.PrevKv.Value, sess) + if err != nil { + log.Debug("watch proxy node, unmarshal failed", zap.Error(err)) + continue + } + p.lock.Lock() + for _, f := range p.delSessions { + f(sess) + } + p.lock.Unlock() + } + } + } + } + }() + + return nil +} + +func (p *proxyNodeManager) Stop() { + p.cancel() +} diff --git a/internal/masterservice/proxy_node_manager_test.go b/internal/masterservice/proxy_node_manager_test.go new file mode 100644 index 0000000000..237fbe2e68 --- /dev/null +++ b/internal/masterservice/proxy_node_manager_test.go @@ -0,0 +1,94 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package masterservice + +import ( + "context" + "encoding/json" + "path" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/clientv3" +) + +func TestProxyNodeManager(t *testing.T) { + Params.Init() + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot) + cli.Delete(ctx, sessKey, clientv3.WithPrefix()) + defer cli.Delete(ctx, sessKey, clientv3.WithPrefix()) + s1 := sessionutil.Session{ + ServerID: 100, + } + b1, err := json.Marshal(&s1) + assert.Nil(t, err) + k1 := path.Join(sessKey, typeutil.ProxyNodeRole+"-100") + _, err = cli.Put(ctx, k1, string(b1)) + assert.Nil(t, err) + + s0 := sessionutil.Session{ + ServerID: 99, + } + b0, err := json.Marshal(&s0) + assert.Nil(t, err) + k0 := path.Join(sessKey, typeutil.ProxyNodeRole+"-99") + _, err = cli.Put(ctx, k0, string(b0)) + assert.Nil(t, err) + + f1 := func(sess []*sessionutil.Session) { + assert.Equal(t, len(sess), 2) + assert.Equal(t, int64(100), sess[0].ServerID) + assert.Equal(t, int64(99), sess[1].ServerID) + t.Log("get sessions", sess[0], sess[1]) + } + + pm, err := newProxyNodeManager(ctx, []string{Params.EtcdAddress}, f1) + assert.Nil(t, err) + fa := func(sess *sessionutil.Session) { + assert.Equal(t, int64(101), sess.ServerID) + t.Log("add session", sess) + } + fd := func(sess *sessionutil.Session) { + assert.Equal(t, int64(100), sess.ServerID) + t.Log("del session", sess) + } + pm.AddSession(fa) + pm.DelSession(fd) + + err = pm.WatchProxyNode() + assert.Nil(t, err) + t.Log("======== start watch proxy node ==========") + + s2 := sessionutil.Session{ + ServerID: 101, + } + b2, err := json.Marshal(&s2) + assert.Nil(t, err) + k2 := path.Join(sessKey, typeutil.ProxyNodeRole+"-101") + _, err = cli.Put(ctx, k2, string(b2)) + assert.Nil(t, err) + + _, err = cli.Delete(ctx, k1) + assert.Nil(t, err) + time.Sleep(time.Second) + pm.Stop() + time.Sleep(time.Second) +} diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index cf151c7452..7fac2126f3 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" @@ -310,13 +311,23 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { //notify query service to release collection go func() { - if err = t.core.CallReleaseCollectionService(t.core.ctx, t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { + if err = t.core.CallReleaseCollectionService(t.core.ctx, ts, 0, collMeta.ID); err != nil { log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error())) } }() + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ts, + SourceID: t.core.session.ServerID, + }, + DbName: t.Req.DbName, + CollectionName: t.Req.CollectionName, + } // error doesn't matter here - t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -482,8 +493,18 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { t.core.SendTimeTick(ts) } + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ts, + SourceID: t.core.session.ServerID, + }, + DbName: t.Req.DbName, + CollectionName: t.Req.CollectionName, + } // error doesn't matter here - t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) @@ -548,8 +569,18 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { t.core.SendTimeTick(ts) } + req := proxypb.InvalidateCollMetaCacheRequest{ + Base: &commonpb.MsgBase{ + MsgType: 0, //TODO, msg type + MsgID: 0, //TODO, msg id + Timestamp: ts, + SourceID: t.core.session.ServerID, + }, + DbName: t.Req.DbName, + CollectionName: t.Req.CollectionName, + } // error doesn't matter here - t.core.CallInvalidateCollectionMetaCacheService(ctx, t.Req.Base.Timestamp, t.Req.DbName, t.Req.CollectionName) + t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req) // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) diff --git a/internal/masterservice/timeticksync.go b/internal/masterservice/timeticksync.go index 23bfdc8784..4100bb6a64 100644 --- a/internal/masterservice/timeticksync.go +++ b/internal/masterservice/timeticksync.go @@ -12,20 +12,15 @@ package masterservice import ( - "context" - "encoding/json" "fmt" - "path" "sync" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -37,35 +32,14 @@ type timetickSync struct { sendChan chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg } -func newTimeTickSync(core *Core) (*timetickSync, error) { - tss := timetickSync{ +func newTimeTickSync(core *Core) *timetickSync { + return &timetickSync{ lock: sync.Mutex{}, core: core, proxyTimeTick: make(map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg), chanStream: make(map[string]msgstream.MsgStream), sendChan: make(chan map[typeutil.UniqueID]*internalpb.ChannelTimeTickMsg, 16), } - - ctx2, cancel := context.WithTimeout(core.ctx, RequestTimeout) - defer cancel() - - resp, err := core.etcdCli.Get(ctx2, ProxyMetaPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, err - } - for _, v := range resp.Kvs { - var sess sessionutil.Session - err := json.Unmarshal(v.Value, &sess) - if err != nil { - log.Debug("unmarshal SvrSession failed", zap.Error(err)) - continue - } - if _, ok := tss.proxyTimeTick[sess.ServerID]; !ok { - tss.proxyTimeTick[sess.ServerID] = nil - } - } - - return &tss, nil } // sendToChannel send all channels' timetick to sendChan @@ -101,50 +75,36 @@ func (t *timetickSync) UpdateTimeTick(in *internalpb.ChannelTimeTickMsg) error { return nil } +func (t *timetickSync) AddProxyNode(sess *sessionutil.Session) { + t.lock.Lock() + defer t.lock.Unlock() + t.proxyTimeTick[sess.ServerID] = nil +} + +func (t *timetickSync) DelProxyNode(sess *sessionutil.Session) { + t.lock.Lock() + defer t.lock.Unlock() + if _, ok := t.proxyTimeTick[sess.ServerID]; ok { + delete(t.proxyTimeTick, sess.ServerID) + t.sendToChannel() + } +} + +func (t *timetickSync) GetProxyNodes(sess []*sessionutil.Session) { + t.lock.Lock() + defer t.lock.Unlock() + for _, s := range sess { + t.proxyTimeTick[s.ServerID] = nil + } +} + // StartWatch watch proxy node change and process all channels' timetick msg func (t *timetickSync) StartWatch() { - proxyNodePrefix := path.Join(sessionutil.DefaultServiceRoot, typeutil.ProxyNodeRole) - rch := t.core.etcdCli.Watch(t.core.ctx, proxyNodePrefix, clientv3.WithPrefix(), clientv3.WithCreatedNotify()) for { select { case <-t.core.ctx.Done(): log.Debug("master service context done", zap.Error(t.core.ctx.Err())) return - case wresp, ok := <-rch: - if !ok { - log.Debug("time tick sync watch etcd failed") - } - for _, ev := range wresp.Events { - switch ev.Type { - case mvccpb.PUT: - var sess sessionutil.Session - err := json.Unmarshal(ev.Kv.Value, &sess) - if err != nil { - log.Debug("watch proxy node, unmarshal failed", zap.Error(err)) - continue - } - func() { - t.lock.Lock() - defer t.lock.Unlock() - t.proxyTimeTick[sess.ServerID] = nil - }() - case mvccpb.DELETE: - var sess sessionutil.Session - err := json.Unmarshal(ev.PrevKv.Value, &sess) - if err != nil { - log.Debug("watch proxy node, unmarshal failed", zap.Error(err)) - continue - } - func() { - t.lock.Lock() - defer t.lock.Unlock() - if _, ok := t.proxyTimeTick[sess.ServerID]; ok { - delete(t.proxyTimeTick, sess.ServerID) - t.sendToChannel() - } - }() - } - } case ptt, ok := <-t.sendChan: if !ok { log.Debug("timetickSync sendChan closed") diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 9f2c24d345..ab8e190f92 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -87,7 +87,7 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e // Register register proxy node at etcd func (node *ProxyNode) Register() error { - node.session = sessionutil.NewSession(node.ctx, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, []string{Params.EtcdAddress}) node.session.Init(typeutil.ProxyNodeRole, Params.NetworkAddress, false) Params.ProxyID = node.session.ServerID return nil @@ -181,7 +181,7 @@ func (node *ProxyNode) Init() error { log.Debug("create query message stream ...") masterAddr := Params.MasterAddress - idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr, []string{Params.EtcdAddress}) + idAllocator, err := allocator.NewIDAllocator(node.ctx, masterAddr, Params.MetaRootPath, []string{Params.EtcdAddress}) if err != nil { return err diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index c883527fd3..da8ea14051 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -120,7 +120,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer // Register register query node at etcd func (node *QueryNode) Register() error { - node.session = sessionutil.NewSession(node.queryNodeLoopCtx, []string{Params.EtcdAddress}) + node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) Params.QueryNodeID = node.session.ServerID return nil diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 1ae0dd3200..bcbd03e5cb 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -58,7 +58,7 @@ type QueryService struct { // Register register query service at etcd func (qs *QueryService) Register() error { - qs.session = sessionutil.NewSession(qs.loopCtx, []string{Params.EtcdAddress}) + qs.session = sessionutil.NewSession(qs.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) qs.session.Init(typeutil.QueryServiceRole, Params.Address, true) Params.NodeID = uint64(qs.session.ServerID) return nil diff --git a/internal/types/types.go b/internal/types/types.go index 835d17c621..9d74e1a5d9 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/sessionutil" ) type TimeTickProvider interface { @@ -123,6 +124,7 @@ type MasterComponent interface { SetDataService(context.Context, DataService) error SetIndexService(IndexService) error SetQueryService(QueryService) error + SetNewProxyClient(func(sess *sessionutil.Session) (ProxyNode, error)) } type ProxyNode interface { diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 1897428ec5..d825af27b4 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -17,7 +17,7 @@ import ( ) const ( - DefaultServiceRoot = "/session/" + DefaultServiceRoot = "session/" DefaultIDKey = "id" DefaultRetryTimes = 30 DefaultTTL = 10 @@ -41,9 +41,10 @@ type Session struct { Address string `json:"Address,omitempty"` Exclusive bool `json:"Exclusive,omitempty"` - etcdCli *clientv3.Client - leaseID clientv3.LeaseID - cancel context.CancelFunc + etcdCli *clientv3.Client + leaseID clientv3.LeaseID + cancel context.CancelFunc + metaRoot string } type SessionEvent struct { @@ -54,11 +55,12 @@ type SessionEvent struct { // NewSession is a helper to build Session object. // ServerID and LeaseID will be assigned after registeration. // etcdCli is initialized when NewSession -func NewSession(ctx context.Context, etcdAddress []string) *Session { +func NewSession(ctx context.Context, metaRoot string, etcdAddress []string) *Session { ctx, cancel := context.WithCancel(ctx) session := &Session{ - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + metaRoot: metaRoot, } connectEtcdFn := func() error { @@ -104,17 +106,17 @@ func (s *Session) getServerID() (int64, error) { func (s *Session) checkIDExist() { s.etcdCli.Txn(s.ctx).If( clientv3.Compare( - clientv3.Version(path.Join(DefaultServiceRoot, DefaultIDKey)), + clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)), "=", 0)). - Then(clientv3.OpPut(path.Join(DefaultServiceRoot, DefaultIDKey), "1")).Commit() + Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), "1")).Commit() } func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) { res := int64(0) getServerIDWithKeyFn := func() error { - getResp, err := s.etcdCli.Get(s.ctx, path.Join(DefaultServiceRoot, key)) + getResp, err := s.etcdCli.Get(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, key)) if err != nil { return nil } @@ -129,10 +131,10 @@ func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) } txnResp, err := s.etcdCli.Txn(s.ctx).If( clientv3.Compare( - clientv3.Value(path.Join(DefaultServiceRoot, DefaultIDKey)), + clientv3.Value(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey)), "=", value)). - Then(clientv3.OpPut(path.Join(DefaultServiceRoot, DefaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit() + Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, DefaultIDKey), strconv.FormatInt(valueInt+1, 10))).Commit() if err != nil { return err } @@ -182,10 +184,10 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er } txnResp, err := s.etcdCli.Txn(s.ctx).If( clientv3.Compare( - clientv3.Version(path.Join(DefaultServiceRoot, key)), + clientv3.Version(path.Join(s.metaRoot, DefaultServiceRoot, key)), "=", 0)). - Then(clientv3.OpPut(path.Join(DefaultServiceRoot, key), string(sessionJSON), clientv3.WithLease(resp.ID))).Commit() + Then(clientv3.OpPut(path.Join(s.metaRoot, DefaultServiceRoot, key), string(sessionJSON), clientv3.WithLease(resp.ID))).Commit() if err != nil { fmt.Printf("compare and swap error %s\n. maybe the key has registered", err) @@ -239,7 +241,7 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes // GetSessions will get all sessions registered in etcd. func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) { res := make(map[string]*Session) - key := path.Join(DefaultServiceRoot, prefix) + key := path.Join(s.metaRoot, DefaultServiceRoot, prefix) resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { @@ -263,7 +265,7 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) // If a server is offline, it will be add to delChannel. func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) { eventCh := make(chan *SessionEvent, 100) - rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) + rch := s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) go func() { for { select { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 346c4e1371..ec4be8c06e 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -1,6 +1,8 @@ package sessionutil import ( + "fmt" + "math/rand" "strconv" "sync" "testing" @@ -20,6 +22,7 @@ func TestGetServerIDConcurrently(t *testing.T) { Params.Init() etcdAddr, err := Params.Load("_EtcdAddress") + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) if err != nil { panic(err) } @@ -27,7 +30,7 @@ func TestGetServerIDConcurrently(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") - _, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix()) + _, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix()) assert.Nil(t, err) defer etcdKV.Close() @@ -36,7 +39,7 @@ func TestGetServerIDConcurrently(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - s := NewSession(ctx, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, []string{etcdAddr}) res := make([]int64, 0) getIDFunc := func() { @@ -72,13 +75,14 @@ func TestInit(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") - _, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix()) + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + _, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix()) assert.Nil(t, err) defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") - s := NewSession(ctx, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, []string{etcdAddr}) s.Init("inittest", "testAddr", false) assert.NotEqual(t, int64(0), s.leaseID) assert.NotEqual(t, int64(0), s.ServerID) @@ -99,7 +103,8 @@ func TestUpdateSessions(t *testing.T) { cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := etcdkv.NewEtcdKV(cli, "") - _, err = cli.Delete(ctx, DefaultServiceRoot, clientv3.WithPrefix()) + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + _, err = cli.Delete(ctx, metaRoot, clientv3.WithPrefix()) assert.Nil(t, err) defer etcdKV.Close() @@ -108,7 +113,7 @@ func TestUpdateSessions(t *testing.T) { var wg sync.WaitGroup var muList sync.Mutex = sync.Mutex{} - s := NewSession(ctx, []string{etcdAddr}) + s := NewSession(ctx, metaRoot, []string{etcdAddr}) sessions, rev, err := s.GetSessions("test") assert.Nil(t, err) @@ -118,7 +123,7 @@ func TestUpdateSessions(t *testing.T) { sList := []*Session{} getIDFunc := func() { - singleS := NewSession(ctx, []string{etcdAddr}) + singleS := NewSession(ctx, metaRoot, []string{etcdAddr}) singleS.Init("test", "testAddr", false) muList.Lock() sList = append(sList, singleS) @@ -139,7 +144,7 @@ func TestUpdateSessions(t *testing.T) { notExistSessions, _, _ := s.GetSessions("testt") assert.Equal(t, len(notExistSessions), 0) - etcdKV.RemoveWithPrefix(DefaultServiceRoot) + etcdKV.RemoveWithPrefix(metaRoot) assert.Eventually(t, func() bool { sessions, _, _ := s.GetSessions("test") return len(sessions) == 0