From 01fc411566f7c06949ff4855e9d3e615dfb8460c Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 30 Jun 2022 18:54:19 +0800 Subject: [PATCH] Handle etcd compacted error (#17886) Signed-off-by: Cai.Zhang --- internal/distributed/connection_manager.go | 10 +- .../distributed/connection_manager_test.go | 86 ++++++++++---- internal/indexcoord/index_coord.go | 6 +- internal/querycoord/query_coord.go | 29 ++++- internal/querycoord/query_coord_test.go | 109 ++++++++++++++++++ internal/rootcoord/proxy_manager.go | 18 ++- internal/rootcoord/proxy_manager_test.go | 47 ++++++++ internal/rootcoord/timeticksync.go | 2 + 8 files changed, 275 insertions(+), 32 deletions(-) diff --git a/internal/distributed/connection_manager.go b/internal/distributed/connection_manager.go index efb5f6f0d4..b3ec7f2fb1 100644 --- a/internal/distributed/connection_manager.go +++ b/internal/distributed/connection_manager.go @@ -19,7 +19,9 @@ package distributed import ( "context" "errors" + "os" "sync" + "syscall" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -232,7 +234,13 @@ func (cm *ConnectionManager) processEvent(channel <-chan *sessionutil.SessionEve } case ev, ok := <-channel: if !ok { - //TODO silverxia add retry logic + log.Error("watch service channel closed", zap.Int64("serverID", cm.session.ServerID)) + go cm.Stop() + if cm.session.TriggerKill { + if p, err := os.FindProcess(os.Getpid()); err == nil { + p.Signal(syscall.SIGINT) + } + } return } switch ev.EventType { diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index 236f107f97..7ae0d134d7 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -19,7 +19,10 @@ package distributed import ( "context" "net" + "os" + "os/signal" "strings" + "syscall" "testing" "time" @@ -175,33 +178,68 @@ func TestConnectionManager(t *testing.T) { } func TestConnectionManager_processEvent(t *testing.T) { - cm := &ConnectionManager{ - closeCh: make(chan struct{}), - } + t.Run("close closeCh", func(t *testing.T) { + cm := &ConnectionManager{ + closeCh: make(chan struct{}), + } - ech := make(chan *sessionutil.SessionEvent) - flag := false - signal := make(chan struct{}, 1) - go func() { - cm.processEvent(ech) - flag = true - signal <- struct{}{} - }() + ech := make(chan *sessionutil.SessionEvent) + flag := false + signal := make(chan struct{}, 1) + go func() { + assert.Panics(t, func() { + cm.processEvent(ech) + }) - close(ech) - <-signal - assert.True(t, flag) + flag = true + signal <- struct{}{} + }() - ech = make(chan *sessionutil.SessionEvent) - flag = false - go func() { - cm.processEvent(ech) - flag = true - signal <- struct{}{} - }() - close(cm.closeCh) - <-signal - assert.True(t, flag) + close(ech) + <-signal + assert.True(t, flag) + + ech = make(chan *sessionutil.SessionEvent) + flag = false + go func() { + cm.processEvent(ech) + flag = true + signal <- struct{}{} + }() + close(cm.closeCh) + <-signal + assert.True(t, flag) + }) + + t.Run("close watch chan", func(t *testing.T) { + sc := make(chan os.Signal, 1) + signal.Notify(sc, syscall.SIGINT) + defer signal.Reset(syscall.SIGINT) + sigQuit := make(chan struct{}, 1) + + cm := &ConnectionManager{ + closeCh: make(chan struct{}), + session: &sessionutil.Session{ + ServerID: 1, + TriggerKill: true, + }, + } + + ech := make(chan *sessionutil.SessionEvent) + + go func() { + <-sc + sigQuit <- struct{}{} + }() + + go func() { + cm.processEvent(ech) + }() + + close(ech) + + <-sigQuit + }) } type testRootCoord struct { diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 99dd8c1d90..5f2b6fe69f 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -876,10 +876,10 @@ func (i *IndexCoord) watchMetaLoop() { } if err := resp.Err(); err != nil { if err == v3rpc.ErrCompacted { - newMetaTable, err := NewMetaTable(i.metaTable.client) - if err != nil { + newMetaTable, err2 := NewMetaTable(i.metaTable.client) + if err2 != nil { log.Error("Constructing new meta table fails when etcd has a compaction error", - zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(err)) + zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(err2)) panic("failed to handle etcd request, exit..") } i.metaTable = newMetaTable diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 53b715724d..9b25ba981f 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -29,6 +29,8 @@ import ( "syscall" "time" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "github.com/golang/protobuf/proto" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -511,7 +513,32 @@ func (qc *QueryCoord) handoffNotificationLoop() { select { case <-ctx.Done(): return - case resp := <-watchChan: + case resp, ok := <-watchChan: + if !ok { + log.Warn("QueryCoord watch handoff segment loop failed because watch channel is closed") + panic("QueryCoord watch handoff segment loop failed because watch channel is closed") + } + if err := resp.Err(); err != nil { + // https://github.com/etcd-io/etcd/issues/8980 + if err == v3rpc.ErrCompacted { + qc.handoffHandler, err = newHandoffHandler(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.broker) + if err != nil { + log.Error("query coordinator re new handoff handler failed", zap.Error(err)) + panic("failed to handle etcd request, exit..") + } + if err2 := qc.handoffHandler.reloadFromKV(); err2 != nil { + log.Error("reload index checker meta fails when etcd has a compaction error", + zap.String("etcd error", err.Error()), zap.Error(err2)) + panic("failed to handle etcd request, exit..") + } + qc.loopWg.Add(1) + go qc.handoffNotificationLoop() + return + } + log.Error("received error event from etcd watcher", zap.String("prefix", util.HandoffSegmentPrefix), + zap.Error(err)) + panic("failed to handle etcd request, exit..") + } for _, event := range resp.Events { segmentInfo := &querypb.SegmentInfo{} err := proto.Unmarshal(event.Kv.Value, segmentInfo) diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go index a2b4406c4e..8be0e9ebe7 100644 --- a/internal/querycoord/query_coord_test.go +++ b/internal/querycoord/query_coord_test.go @@ -23,7 +23,9 @@ import ( "math/rand" "os" "os/signal" + "path" "strconv" + "sync" "syscall" "testing" "time" @@ -630,3 +632,110 @@ func TestLoadBalanceSegmentLoop(t *testing.T) { err = removeAllSession() assert.Nil(t, err) } + +func TestQueryCoord_watchHandoffSegmentLoop(t *testing.T) { + Params.Init() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + + qc := &QueryCoord{ + loopCtx: ctx, + loopWg: sync.WaitGroup{}, + kvClient: etcdKV, + handoffHandler: &HandoffHandler{ + ctx: ctx, + cancel: cancel, + client: etcdKV, + }, + } + + t.Run("chan closed", func(t *testing.T) { + qc.loopWg.Add(1) + go func() { + assert.Panics(t, func() { + qc.handoffNotificationLoop() + }) + }() + + etcdCli.Close() + qc.loopWg.Wait() + }) + + t.Run("etcd compaction", func(t *testing.T) { + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + qc.kvClient = etcdKV + qc.handoffHandler.client = etcdKV + qc.handoffHandler.revision = 0 + qc.meta = &MetaReplica{} + qc.handoffHandler.meta = qc.meta + qc.handoffHandler.tasks = make(map[int64]*HandOffTask) + + for i := 1; i < 10; i++ { + segInfo := &querypb.SegmentInfo{ + SegmentID: UniqueID(i), + } + v, err := proto.Marshal(segInfo) + assert.Nil(t, err) + key := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i)) + err = etcdKV.Save(key, string(v)) + assert.Nil(t, err) + } + // The reason there the error is no handle is that if you run compact twice, an error will be reported; + // error msg is "etcdserver: mvcc: required revision has been compacted" + etcdCli.Compact(ctx, 10) + qc.loopWg.Add(1) + go qc.handoffNotificationLoop() + + time.Sleep(2 * time.Second) + for i := 1; i < 10; i++ { + k := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i)) + _, err = etcdCli.Delete(ctx, k) + assert.Nil(t, err) + } + cancel() + qc.loopWg.Wait() + }) + + t.Run("etcd compaction and reload failed", func(t *testing.T) { + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + etcdCli, err = etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + etcdKV = etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) + qc.loopCtx = ctx + qc.loopCancel = cancel + qc.kvClient = etcdKV + qc.handoffHandler.client = etcdKV + qc.handoffHandler.revision = 0 + qc.handoffHandler.tasks = make(map[int64]*HandOffTask) + + for i := 1; i < 10; i++ { + key := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i)) + v := "segment-" + strconv.Itoa(i) + err = etcdKV.Save(key, v) + assert.Nil(t, err) + } + // The reason there the error is no handle is that if you run compact twice, an error will be reported; + // error msg is "etcdserver: mvcc: required revision has been compacted" + etcdCli.Compact(ctx, 10) + qc.loopWg.Add(1) + go func() { + assert.Panics(t, func() { + qc.handoffNotificationLoop() + }) + }() + qc.loopWg.Wait() + + for i := 1; i < 10; i++ { + k := path.Join(util.HandoffSegmentPrefix, strconv.Itoa(i)) + _, err = etcdCli.Delete(ctx, k) + assert.Nil(t, err) + } + }) +} diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index 71f99bb5ac..43d9726d2f 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -23,6 +23,8 @@ import ( "path" "sync" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -82,6 +84,7 @@ func (p *proxyManager) WatchProxy() error { return err } log.Debug("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev)) + // all init function should be clear meta firstly. for _, f := range p.initSessionsFunc { f(sessions) } @@ -105,13 +108,22 @@ func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc case <-ctx.Done(): log.Warn("stop watching etcd loop") return + // TODO @xiaocai2333: watch proxy by session WatchService. case event, ok := <-eventCh: if !ok { log.Warn("stop watching etcd loop due to closed etcd event channel") - return + panic("stop watching etcd loop due to closed etcd event channel") } if err := event.Err(); err != nil { - // TODO do we need to retry watch etcd when ErrCompacted, but the init session func may not be idempotent so skip + if err == v3rpc.ErrCompacted { + err2 := p.WatchProxy() + if err2 != nil { + log.Error("re watch proxy fails when etcd has a compaction error", + zap.String("etcd error", err.Error()), zap.Error(err2)) + panic("failed to handle etcd request, exit..") + } + return + } log.Error("Watch proxy service failed", zap.Error(err)) panic(err) } @@ -182,7 +194,7 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se session, err := p.parseSession(v.Value) if err != nil { log.Debug("failed to unmarshal session", zap.Error(err)) - continue + return nil, 0, err } sessions = append(sessions, session) } diff --git a/internal/rootcoord/proxy_manager_test.go b/internal/rootcoord/proxy_manager_test.go index bd574b5851..03533ad5aa 100644 --- a/internal/rootcoord/proxy_manager_test.go +++ b/internal/rootcoord/proxy_manager_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "path" + "strconv" "testing" "time" @@ -98,3 +99,49 @@ func TestProxyManager(t *testing.T) { pm.Stop() time.Sleep(100 * time.Millisecond) } + +func TestProxyManager_ErrCompacted(t *testing.T) { + Params.Init() + + etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) + assert.Nil(t, err) + defer etcdCli.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + sessKey := path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot) + f1 := func(sess []*sessionutil.Session) { + t.Log("get sessions num", len(sess)) + } + pm := newProxyManager(ctx, etcdCli, f1) + + eventCh := pm.etcdCli.Watch( + pm.ctx, + path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), + clientv3.WithPrefix(), + clientv3.WithCreatedNotify(), + clientv3.WithPrevKV(), + clientv3.WithRev(1), + ) + + for i := 1; i < 10; i++ { + k := path.Join(sessKey, typeutil.ProxyRole+strconv.FormatInt(int64(i), 10)) + v := "invalid session: " + strconv.FormatInt(int64(i), 10) + _, err = etcdCli.Put(ctx, k, v) + assert.Nil(t, err) + } + + // The reason there the error is no handle is that if you run compact twice, an error will be reported; + // error msg is "etcdserver: mvcc: required revision has been compacted" + etcdCli.Compact(ctx, 10) + + assert.Panics(t, func() { + pm.startWatchEtcd(pm.ctx, eventCh) + }) + + for i := 1; i < 10; i++ { + k := path.Join(sessKey, typeutil.ProxyRole+strconv.FormatInt(int64(i), 10)) + _, err = etcdCli.Delete(ctx, k) + assert.Nil(t, err) + } +} diff --git a/internal/rootcoord/timeticksync.go b/internal/rootcoord/timeticksync.go index 44c53d5a85..f3e438afb8 100644 --- a/internal/rootcoord/timeticksync.go +++ b/internal/rootcoord/timeticksync.go @@ -276,6 +276,8 @@ func (t *timetickSync) delSession(sess *sessionutil.Session) { func (t *timetickSync) initSessions(sess []*sessionutil.Session) { t.lock.Lock() defer t.lock.Unlock() + t.sess2ChanTsMap = make(map[typeutil.UniqueID]*chanTsMsg) + t.sess2ChanTsMap[t.sourceID] = nil for _, s := range sess { t.sess2ChanTsMap[s.ServerID] = nil log.Debug("Init proxy sessions for timeticksync", zap.Int64("serverID", s.ServerID))