From dd736ee8bad89a618e6d73951ebd99a3bd14ccaf Mon Sep 17 00:00:00 2001 From: godchen Date: Mon, 24 May 2021 11:53:07 +0800 Subject: [PATCH] Change session doc (#5355) Change session doc Signed-off-by: godchen --- .../appendix_a_basic_components.md | 84 ++++++------------- internal/util/sessionutil/session_util.go | 33 ++++---- 2 files changed, 44 insertions(+), 73 deletions(-) diff --git a/docs/developer_guides/appendix_a_basic_components.md b/docs/developer_guides/appendix_a_basic_components.md index 87278ce212..7de2f6c7e1 100644 --- a/docs/developer_guides/appendix_a_basic_components.md +++ b/docs/developer_guides/appendix_a_basic_components.md @@ -64,7 +64,7 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic * The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key. -* key: metaRootPath + "/services" + "/ServerName(-ServerID)(optional)" +* key: "/session" + "/ServerName(-ServerID)(optional)" * value: json format @@ -73,7 +73,7 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic "ServerID": "ServerID", "ServerName": "ServerName", "Address": "ip:port", - "LeaseID": "LeaseID", + "Exclusive": "Exclusive", } ``` @@ -85,78 +85,48 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic * All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key -* Registeration time can be compared with ServerID for ServerID will increase according to time. +* Registration time can be compared with ServerID for ServerID will increase according to time. ###### Interface ```go -const defaultIDKey = "services/id" -const defaultRetryTimes = 30 +const DefaultServiceRoot = "/session/" +const DefaultIDKey = "id" +const DefaultRetryTimes = 30 +const DefaultTTL = 10 // Session is a struct to store service's session, including ServerID, ServerName, // Address. // LeaseID will be assigned after registered in etcd. type Session struct { - ServerID int64 - ServerName string - Address string - LeaseID clientv3.LeaseID -} + ctx context.Context + ServerID int64 `json:"ServerID,omitempty"` + ServerName string `json:"ServerName,omitempty"` + Address string `json:"Address,omitempty"` + Exclusive bool `json:"Exclusive,omitempty"` -var ( - globalServerID = int64(-1) -) + etcdCli *clientv3.Client + leaseID clientv3.LeaseID + cancel context.CancelFunc +} // NewSession is a helper to build Session object.LeaseID will be assigned after // registeration. -func NewSession(serverID int64, serverName, address string) *Session {} +func NewSession(ctx context.Context, etcdAddress []string) *Session {} -// GlobalServerID returns [singleton] ServerID. -// Before SetGlobalServerID, GlobalServerID() returns -1 -func GlobalServerID() int64 {} +// GetSessions will get all sessions registered in etcd. +func (s *Session) GetSessions(prefix string) (map[string]*Session, error) {} -// SetGlobalServerID sets the [singleton] ServerID. ServerID returned by -// GlobalServerID(). Those who use GlobalServerID should call SetGlobalServerID() -// as early as possible in main() before use ServerID. -func SetGlobalServerID(id int64) {} +// Init will initialize base struct in the SessionManager, including getServerID, +// and process keepAliveResponse +func (s *Session) Init(serverName, address string, exclusive bool) <-chan bool {} -// GetServerID gets id from etcd with key: metaRootPath + "/services/id" -// Each server get ServerID and add one to id. -func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {} - -// RegisterService registers the service to etcd so that other services -// can find that the service is online and issue subsequent operations -// RegisterService will save a key-value in etcd -// key: metaRootPath + "/services/" + "ServerName(-ServerID)(optional)" -// value: json format -// { -// "ServerID": "ServerID", -// "ServerName": "ServerName", -// "Address": "ip:port", -// "LeaseID": "LeaseID", -// } -// MetaRootPath is configurable in the config file. -// Exclusive means whether this service can exist two at the same time, if so, -// it is false. Otherwise, set it to true and the key will not have ServerID. -// But ServerID still will be stored in value. -func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {} - -// ProcessKeepAliveResponse processes the response of etcd keepAlive interface -// If keepAlive fails for unexpected error, it will retry for default_retry_times times -func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeepAliveResponse) (signal <-chan bool) {} - - -// GetAllSessions gets all the services registered in etcd. -// This gets all the key with prefix metaRootPath + "/services/" + prefix -// For general, "datanode" to get all datanodes -func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {} - -// WatchServices watch all events in etcd. -// If a server register, a session will be sent to addChannel -// If a server offline, a session will be sent to deleteChannel -func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {} -``` +// WatchServices watch the service's up and down in etcd, and saves it into local +// sessions. +// If a server up, it will be add to addChannel. +// If a server is offline, it will be add to delChannel. +func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) {} #### A.3 Global Parameter Table diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 03a3267622..66e66a1b9a 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -36,8 +36,9 @@ type Session struct { cancel context.CancelFunc } -// NewSession is a helper to build Session object.LeaseID will be assigned after -// registeration. +// NewSession is a helper to build Session object. +// ServerID and LeaseID will be assigned after registeration. +// etcdCli is initialized when NewSession func NewSession(ctx context.Context, etcdAddress []string) *Session { ctx, cancel := context.WithCancel(ctx) session := &Session{ @@ -62,7 +63,7 @@ func NewSession(ctx context.Context, etcdAddress []string) *Session { // Init will initialize base struct in the SessionManager, including getServerID, // and process keepAliveResponse -func (s *Session) Init(serverName, address string, exclusive bool) { +func (s *Session) Init(serverName, address string, exclusive bool) <-chan bool { s.ServerName = serverName s.Address = address s.Exclusive = exclusive @@ -76,10 +77,10 @@ func (s *Session) Init(serverName, address string, exclusive bool) { if err != nil { panic(err) } - s.processKeepAliveResponse(ch) + return s.processKeepAliveResponse(ch) } -// GetServerID gets id from etcd with key: metaRootPath + "/services/id" +// getServerID gets id from etcd with key: "/session"+"id" // Each server get ServerID and add one to id. func (s *Session) getServerID() (int64, error) { return s.getServerIDWithKey(DefaultIDKey, DefaultRetryTimes) @@ -128,22 +129,21 @@ func (s *Session) getServerIDWithKey(key string, retryTimes int) (int64, error) return nil } - err := retry.Retry(retryTimes, time.Millisecond*200, getServerIDWithKeyFn) + err := retry.Retry(retryTimes, time.Millisecond*500, getServerIDWithKeyFn) return res, err } -// RegisterService registers the service to etcd so that other services +// registerService registers the service to etcd so that other services // can find that the service is online and issue subsequent operations // RegisterService will save a key-value in etcd // key: metaRootPath + "/services" + "/ServerName-ServerID" // value: json format // { -// "ServerID": "ServerID", -// "ServerName": "ServerName", -// "Address": "ip:port", -// "LeaseID": "LeaseID", +// ServerID int64 `json:"ServerID,omitempty"` +// ServerName string `json:"ServerName,omitempty"` +// Address string `json:"Address,omitempty"` +// Exclusive bool `json:"Exclusive,omitempty"` // } -// MetaRootPath is configurable in the config file. // Exclusive means whether this service can exist two at the same time, if so, // it is false. Otherwise, set it to true. func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, error) { @@ -188,14 +188,14 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er } return nil } - err := retry.Retry(DefaultRetryTimes, time.Millisecond*200, registerFn) + err := retry.Retry(DefaultRetryTimes, time.Millisecond*500, registerFn) if err != nil { return ch, nil } return ch, nil } -// ProcessKeepAliveResponse processes the response of etcd keepAlive interface +// processKeepAliveResponse processes the response of etcd keepAlive interface // If keepAlive fails for unexpected error, it will send a signal to the channel. func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) { failCh := make(chan bool) @@ -207,11 +207,12 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes return case resp, ok := <-ch: if !ok { - failCh <- true + close(failCh) } if resp == nil { - failCh <- true + close(failCh) } + failCh <- true } } }()