From 200801271d24459afbfc39e69f702a1919104f78 Mon Sep 17 00:00:00 2001 From: godchen Date: Tue, 25 May 2021 20:15:46 +0800 Subject: [PATCH] Fix GetSession error (#5401) Fix GetSession error Signed-off-by: godchen --- .../distributed/datanode/client/client.go | 2 +- .../masterservice/client/client.go | 2 +- internal/util/sessionutil/session_util.go | 54 +++++++++++++------ .../util/sessionutil/session_util_test.go | 45 +++++++++------- 4 files changed, 66 insertions(+), 37 deletions(-) diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index ae12321347..5eaec91d56 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -50,7 +50,7 @@ type Client struct { func getDataNodeAddress(sess *sessionutil.Session, serverID int64) (string, error) { key := typeutil.DataNodeRole + "-" + strconv.FormatInt(serverID, 10) - msess, err := sess.GetSessions(key) + msess, _, err := sess.GetSessions(key) if err != nil { return "", err } diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index b223f467d1..3df4774b7d 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -43,7 +43,7 @@ type GrpcClient struct { func getMasterServiceAddr(sess *sessionutil.Session) (string, error) { key := typeutil.MasterServiceRole - msess, err := sess.GetSessions(key) + msess, _, err := sess.GetSessions(key) if err != nil { return "", err } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 1eef68a751..1897428ec5 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -16,10 +16,20 @@ import ( "go.uber.org/zap" ) -const DefaultServiceRoot = "/session/" -const DefaultIDKey = "id" -const DefaultRetryTimes = 30 -const DefaultTTL = 10 +const ( + DefaultServiceRoot = "/session/" + DefaultIDKey = "id" + DefaultRetryTimes = 30 + DefaultTTL = 10 +) + +type SessionEventType int + +const ( + SessionNoneEvent SessionEventType = iota + SessionAddEvent + SessionDelEvent +) // Session is a struct to store service's session, including ServerID, ServerName, // Address. @@ -36,6 +46,11 @@ type Session struct { cancel context.CancelFunc } +type SessionEvent struct { + EventType SessionEventType + Session *Session +} + // NewSession is a helper to build Session object. // ServerID and LeaseID will be assigned after registeration. // etcdCli is initialized when NewSession @@ -222,33 +237,33 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes } // GetSessions will get all sessions registered in etcd. -func (s *Session) GetSessions(prefix string) (map[string]*Session, error) { +func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error) { res := make(map[string]*Session) key := path.Join(DefaultServiceRoot, prefix) resp, err := s.etcdCli.Get(s.ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { - return nil, err + return nil, 0, err } for _, kv := range resp.Kvs { session := &Session{} err = json.Unmarshal([]byte(kv.Value), session) if err != nil { - return nil, err + return nil, 0, err } - res[string(kv.Key)] = session + _, mapKey := path.Split(string(kv.Key)) + res[mapKey] = session } - return res, nil + return res, resp.Header.Revision, nil } // WatchServices watch the service's up and down in etcd, and saves it into local // sessions. // If a server up, it will be add to addChannel. // If a server is offline, it will be add to delChannel. -func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delChannel <-chan *Session) { - addCh := make(chan *Session, 10) - delCh := make(chan *Session, 10) - rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV()) +func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) { + eventCh := make(chan *SessionEvent, 100) + rch := s.etcdCli.Watch(s.ctx, path.Join(DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) go func() { for { select { @@ -259,6 +274,8 @@ func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delC return } for _, ev := range wresp.Events { + session := &Session{} + var eventType SessionEventType switch ev.Type { case mvccpb.PUT: log.Debug("watch services", @@ -269,24 +286,27 @@ func (s *Session) WatchServices(prefix string) (addChannel <-chan *Session, delC log.Error("watch services", zap.Error(err)) continue } - addCh <- session + eventType = SessionAddEvent case mvccpb.DELETE: log.Debug("watch services", zap.Any("delete kv", ev.PrevKv)) - session := &Session{} err := json.Unmarshal([]byte(ev.PrevKv.Value), session) if err != nil { log.Error("watch services", zap.Error(err)) continue } - delCh <- session + eventType = SessionDelEvent + } + eventCh <- &SessionEvent{ + EventType: eventType, + Session: session, } } } } }() - return addCh, delCh + return eventCh } func initEtcd(etcdAddress string) (*clientv3.Client, error) { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 5a897d6fb0..346c4e1371 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -1,6 +1,7 @@ package sessionutil import ( + "strconv" "sync" "testing" "time" @@ -78,9 +79,12 @@ func TestInit(t *testing.T) { defer etcdKV.RemoveWithPrefix("") s := NewSession(ctx, []string{etcdAddr}) - s.Init("test", "testAddr", false) + s.Init("inittest", "testAddr", false) assert.NotEqual(t, int64(0), s.leaseID) assert.NotEqual(t, int64(0), s.ServerID) + sessions, _, err := s.GetSessions("inittest") + assert.Nil(t, err) + assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10)) } func TestUpdateSessions(t *testing.T) { @@ -106,10 +110,10 @@ func TestUpdateSessions(t *testing.T) { s := NewSession(ctx, []string{etcdAddr}) - sessions, err := s.GetSessions("test") + sessions, rev, err := s.GetSessions("test") assert.Nil(t, err) assert.Equal(t, len(sessions), 0) - addCh, delCh := s.WatchServices("test") + eventCh := s.WatchServices("test", rev) sList := []*Session{} @@ -129,29 +133,34 @@ func TestUpdateSessions(t *testing.T) { wg.Wait() assert.Eventually(t, func() bool { - sessions, _ := s.GetSessions("test") + sessions, _, _ := s.GetSessions("test") return len(sessions) == 10 }, 10*time.Second, 100*time.Millisecond) - notExistSessions, _ := s.GetSessions("testt") + notExistSessions, _, _ := s.GetSessions("testt") assert.Equal(t, len(notExistSessions), 0) - etcdKV.RemoveWithPrefix("") + etcdKV.RemoveWithPrefix(DefaultServiceRoot) assert.Eventually(t, func() bool { - sessions, _ := s.GetSessions("test") + sessions, _, _ := s.GetSessions("test") return len(sessions) == 0 }, 10*time.Second, 100*time.Millisecond) - addSessions := []*Session{} - for i := 0; i < 10; i++ { - session := <-addCh - addSessions = append(addSessions, session) + sessionEvents := []*SessionEvent{} + addEventLen := 0 + delEventLen := 0 + eventLength := len(eventCh) + for i := 0; i < eventLength; i++ { + sessionEvent := <-eventCh + if sessionEvent.EventType == SessionAddEvent { + addEventLen++ + } + if sessionEvent.EventType == SessionDelEvent { + delEventLen++ + } + sessionEvents = append(sessionEvents, sessionEvent) } - assert.Equal(t, len(addSessions), 10) + assert.Equal(t, len(sessionEvents), 20) + assert.Equal(t, addEventLen, 10) + assert.Equal(t, delEventLen, 10) - delSessions := []*Session{} - for i := 0; i < 10; i++ { - session := <-delCh - delSessions = append(delSessions, session) - } - assert.Equal(t, len(addSessions), 10) }