From de454956fef804366751e7afc277b41d202b5230 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 8 Dec 2021 10:11:04 +0800 Subject: [PATCH] Refactor session WatchSessions to allow rewatch when Rev Compacted (#12880) Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 2 +- internal/distributed/connection_manager.go | 2 +- internal/indexcoord/index_coord.go | 2 +- internal/querycoord/query_coord.go | 2 +- internal/util/sessionutil/session_util.go | 162 ++++++++++++------ .../util/sessionutil/session_util_test.go | 140 ++++++++++++++- 6 files changed, 256 insertions(+), 54 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7fdacafdbb..3aa1c66583 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -370,7 +370,7 @@ func (s *Server) initServiceDiscovery() error { s.cluster.Startup(datanodes) - s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1) + s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil) return nil } diff --git a/internal/distributed/connection_manager.go b/internal/distributed/connection_manager.go index f65ba301a9..42aba60e0c 100644 --- a/internal/distributed/connection_manager.go +++ b/internal/distributed/connection_manager.go @@ -115,7 +115,7 @@ func (cm *ConnectionManager) AddDependency(roleName string) error { } } - eventChannel := cm.session.WatchServices(roleName, rev) + eventChannel := cm.session.WatchServices(roleName, rev, nil) go cm.processEvent(eventChannel) return nil diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 0fa4815f1c..4c070c7f1e 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -173,7 +173,7 @@ func (i *IndexCoord) Init() error { } log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.nodeClients))) - i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1) + i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil) nodeTasks := i.metaTable.GetNodeTaskStats() for nodeID, taskNum := range nodeTasks { i.nodeManager.pq.UpdatePriority(nodeID, taskNum) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index c86ce7039f..f4f016a368 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -331,7 +331,7 @@ func (qc *QueryCoord) watchNodeLoop() { log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask)) } - qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1) + qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil) for { select { case <-ctx.Done(): diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index d3e0b15331..1c0dd1cb16 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/retry" "go.etcd.io/etcd/api/v3/mvccpb" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -31,6 +32,11 @@ const ( // SessionEventType session event type type SessionEventType int +// Rewatch defines the behavior outer session watch handles ErrCompacted +// it should process the current full list of session +// and returns err if meta error or anything else goes wrong +type Rewatch func(sessions map[string]*Session) error + const ( // SessionNoneEvent place holder for zero value SessionNoneEvent SessionEventType = iota @@ -304,6 +310,36 @@ type SessionEvent struct { Session *Session } +type sessionWatcher struct { + s *Session + rch clientv3.WatchChan + eventCh chan *SessionEvent + prefix string + rewatch Rewatch +} + +func (w *sessionWatcher) start() { + go func() { + for { + select { + case <-w.s.ctx.Done(): + return + case wresp, ok := <-w.rch: + if !ok { + return + } + + err := w.handleWatchResponse(wresp) + // internal error not handled,goroutine quit + if err != nil { + log.Warn("watch goroutine found error", zap.Error(err)) + return + } + } + } + }() +} + // WatchServices watch the service's up and down in etcd, and send event to // eventChannel. // prefix is a parameter to know which service to watch and can be obtained in @@ -312,58 +348,86 @@ type SessionEvent struct { // in GetSessions. // If a server up, a event will be add to channel with eventType SessionAddType. // If a server down, a event will be add to channel with eventType SessionDelType. -func (s *Session) WatchServices(prefix string, revision int64) (eventChannel <-chan *SessionEvent) { - eventCh := make(chan *SessionEvent, 100) - rch := s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) - go func() { - for { - select { - case <-s.ctx.Done(): - return - case wresp, ok := <-rch: - if !ok { - return - } - if wresp.Err() != nil { - //close event channel - log.Warn("Watch service found error", zap.Error(wresp.Err())) - close(eventCh) - return - } - for _, ev := range wresp.Events { - session := &Session{} - var eventType SessionEventType - switch ev.Type { - case mvccpb.PUT: - log.Debug("watch services", - zap.Any("add kv", ev.Kv)) - err := json.Unmarshal([]byte(ev.Kv.Value), session) - if err != nil { - log.Error("watch services", zap.Error(err)) - continue - } - eventType = SessionAddEvent - case mvccpb.DELETE: - log.Debug("watch services", - zap.Any("delete kv", ev.PrevKv)) - err := json.Unmarshal([]byte(ev.PrevKv.Value), session) - if err != nil { - log.Error("watch services", zap.Error(err)) - continue - } - eventType = SessionDelEvent - } - log.Debug("WatchService", zap.Any("event type", eventType)) - eventCh <- &SessionEvent{ - EventType: eventType, - Session: session, - } - } +func (s *Session) WatchServices(prefix string, revision int64, rewatch Rewatch) (eventChannel <-chan *SessionEvent) { + w := &sessionWatcher{ + s: s, + eventCh: make(chan *SessionEvent, 100), + rch: s.etcdCli.Watch(s.ctx, path.Join(s.metaRoot, DefaultServiceRoot, prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)), + prefix: prefix, + rewatch: rewatch, + } + w.start() + return w.eventCh +} +func (w *sessionWatcher) handleWatchResponse(wresp clientv3.WatchResponse) error { + if wresp.Err() != nil { + return w.handleWatchErr(wresp.Err()) + } + for _, ev := range wresp.Events { + session := &Session{} + var eventType SessionEventType + switch ev.Type { + case mvccpb.PUT: + log.Debug("watch services", + zap.Any("add kv", ev.Kv)) + err := json.Unmarshal([]byte(ev.Kv.Value), session) + if err != nil { + log.Error("watch services", zap.Error(err)) + continue } + eventType = SessionAddEvent + case mvccpb.DELETE: + log.Debug("watch services", + zap.Any("delete kv", ev.PrevKv)) + err := json.Unmarshal([]byte(ev.PrevKv.Value), session) + if err != nil { + log.Error("watch services", zap.Error(err)) + continue + } + eventType = SessionDelEvent } - }() - return eventCh + log.Debug("WatchService", zap.Any("event type", eventType)) + w.eventCh <- &SessionEvent{ + EventType: eventType, + Session: session, + } + } + return nil +} + +func (w *sessionWatcher) handleWatchErr(err error) error { + // if not ErrCompacted, just close the channel + if err != v3rpc.ErrCompacted { + //close event channel + log.Warn("Watch service found error", zap.Error(err)) + close(w.eventCh) + return err + } + + // rewatch is nil, no logic to handle + if w.rewatch == nil { + log.Warn("Watch service with ErrCompacted but no rewatch logic provided") + close(w.eventCh) + return err + } + + sessions, revision, err := w.s.GetSessions(w.prefix) + if err != nil { + log.Warn("GetSession before rewatch failed", zap.String("prefix", w.prefix), zap.Error(err)) + close(w.eventCh) + return err + } + + err = w.rewatch(sessions) + if err != nil { + log.Warn("WatchServices rewatch failed", zap.String("prefix", w.prefix), zap.Error(err)) + close(w.eventCh) + return err + } + + w.rch = w.s.etcdCli.Watch(w.s.ctx, path.Join(w.s.metaRoot, DefaultServiceRoot, w.prefix), clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) + return nil } // LivenessCheck performs liveness check with provided context and channel diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 9300857b4d..5b6fc2457f 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -2,6 +2,7 @@ package sessionutil import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -13,6 +14,11 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/api/v3/mvccpb" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" ) var Params paramtable.BaseTable @@ -115,7 +121,7 @@ func TestUpdateSessions(t *testing.T) { sessions, rev, err := s.GetSessions("test") assert.Nil(t, err) assert.Equal(t, len(sessions), 0) - eventCh := s.WatchServices("test", rev) + eventCh := s.WatchServices("test", rev, nil) sList := []*Session{} @@ -203,6 +209,138 @@ func TestSessionLivenessCheck(t *testing.T) { assert.False(t, flag) } +func TestWatcherHandleWatchResp(t *testing.T) { + ctx := context.Background() + Params.Init() + + endpoints, err := Params.Load("_EtcdEndpoints") + require.NoError(t, err) + + etcdEndpoints := strings.Split(endpoints, ",") + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, "/by-dev/session-ut") + require.NoError(t, err) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("/by-dev/session-ut") + s := NewSession(ctx, metaRoot, etcdEndpoints) + defer s.Revoke(time.Second) + + getWatcher := func(s *Session, rewatch Rewatch) *sessionWatcher { + return &sessionWatcher{ + s: s, + prefix: "test", + rewatch: rewatch, + eventCh: make(chan *SessionEvent, 10), + } + } + + t.Run("handle normal events", func(t *testing.T) { + w := getWatcher(s, nil) + wresp := clientv3.WatchResponse{ + Events: []*clientv3.Event{ + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Value: []byte(`{"ServerID": 1, "ServerName": "test1"}`), + }, + }, + { + Type: mvccpb.DELETE, + PrevKv: &mvccpb.KeyValue{ + Value: []byte(`{"ServerID": 2, "ServerName": "test2"}`), + }, + }, + }, + } + err := w.handleWatchResponse(wresp) + assert.NoError(t, err) + assert.Equal(t, 2, len(w.eventCh)) + }) + + t.Run("handle abnormal events", func(t *testing.T) { + w := getWatcher(s, nil) + wresp := clientv3.WatchResponse{ + Events: []*clientv3.Event{ + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Value: []byte(``), + }, + }, + { + Type: mvccpb.DELETE, + PrevKv: &mvccpb.KeyValue{ + Value: []byte(``), + }, + }, + }, + } + var err error + assert.NotPanics(t, func() { + err = w.handleWatchResponse(wresp) + }) + assert.NoError(t, err) + assert.Equal(t, 0, len(w.eventCh)) + }) + + t.Run("err compacted resp, nil Rewatch", func(t *testing.T) { + w := getWatcher(s, nil) + wresp := clientv3.WatchResponse{ + CompactRevision: 1, + } + err := w.handleWatchResponse(wresp) + assert.Error(t, err) + assert.Equal(t, v3rpc.ErrCompacted, err) + }) + + t.Run("err compacted resp, valid Rewatch", func(t *testing.T) { + w := getWatcher(s, func(sessions map[string]*Session) error { + return nil + }) + wresp := clientv3.WatchResponse{ + CompactRevision: 1, + } + err := w.handleWatchResponse(wresp) + assert.NoError(t, err) + }) + + t.Run("err canceled", func(t *testing.T) { + w := getWatcher(s, nil) + wresp := clientv3.WatchResponse{ + Canceled: true, + } + err := w.handleWatchResponse(wresp) + assert.Error(t, err) + }) + + t.Run("err handled but list failed", func(t *testing.T) { + s := NewSession(ctx, "/by-dev/session-ut", etcdEndpoints) + s.etcdCli.Close() + w := getWatcher(s, func(sessions map[string]*Session) error { + return nil + }) + wresp := clientv3.WatchResponse{ + CompactRevision: 1, + } + + err = w.handleWatchResponse(wresp) + assert.Error(t, err) + }) + + t.Run("err handled but rewatch failed", func(t *testing.T) { + w := getWatcher(s, func(sessions map[string]*Session) error { + return errors.New("mocked") + }) + wresp := clientv3.WatchResponse{ + CompactRevision: 1, + } + err := w.handleWatchResponse(wresp) + + assert.Error(t, err) + }) +} + func TestSessionRevoke(t *testing.T) { s := &Session{} assert.NotPanics(t, func() {