From 307d687ef7a8263839bc831606d486cbb152889a Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 16 Jun 2023 16:48:44 +0800 Subject: [PATCH] Improve the panic code about the rootcoord/session/rocksmq (#24859) Signed-off-by: SimFG --- internal/datacoord/server.go | 1 + .../mq/mqimpl/rocksmq/server/rocksmq_impl.go | 2 +- internal/rootcoord/dml_channels.go | 45 ++++++++++--------- internal/rootcoord/dml_channels_test.go | 11 +++-- internal/util/sessionutil/session_util.go | 11 ++++- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 8438dd58c1..1b394c60b4 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -697,6 +697,7 @@ func (s *Server) processSessionEvent(ctx context.Context, role string, event *se if err := retry.Do(ctx, func() error { return s.segReferManager.ReleaseSegmentsLockByNodeID(event.Session.ServerID) }, retry.Attempts(100)); err != nil { + log.Error("release segments lock failed", zap.Error(err)) panic(err) } } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index bf0f86fca2..b47153c1fe 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -1018,7 +1018,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI consumers, ok := vals.([]*Consumer) if !ok || len(consumers) == 0 { log.Error("update ack with no consumer", zap.String("topic", topicName)) - panic("update ack with no consumer") + return nil } // find min id of all consumer diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 0b3545e4f1..5cbab3bcc3 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -24,9 +24,9 @@ import ( "strings" "sync" - "github.com/milvus-io/milvus/internal/metrics" - + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" @@ -224,14 +224,21 @@ func (d *dmlChannels) getChannelNum() int { return len(d.listChannels()) } +func (d *dmlChannels) getMsgStreamByName(chanName string) (*dmlMsgStream, error) { + v, ok := d.pool.Load(chanName) + if !ok { + log.Error("invalid channel name", zap.String("chanName", chanName)) + return nil, errors.Newf("invalid channel name: %s", chanName) + } + return v.(*dmlMsgStream), nil +} + func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error { for _, chanName := range chanNames { - v, ok := d.pool.Load(chanName) - if !ok { - log.Error("invalid channel name", zap.String("chanName", chanName)) - panic("invalid channel name: " + chanName) + dms, err := d.getMsgStreamByName(chanName) + if err != nil { + return err } - dms := v.(*dmlMsgStream) dms.mutex.RLock() if dms.refcnt > 0 { @@ -249,12 +256,10 @@ func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) err func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) { result := make(map[string][]byte) for _, chanName := range chanNames { - v, ok := d.pool.Load(chanName) - if !ok { - log.Error("invalid channel name", zap.String("chanName", chanName)) - panic("invalid channel name: " + chanName) + dms, err := d.getMsgStreamByName(chanName) + if err != nil { + return result, err } - dms := v.(*dmlMsgStream) dms.mutex.RLock() if dms.refcnt > 0 { @@ -278,12 +283,10 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) func (d *dmlChannels) addChannels(names ...string) { for _, name := range names { - v, ok := d.pool.Load(name) - if !ok { - log.Error("invalid channel name", zap.String("chanName", name)) - panic("invalid channel name: " + name) + dms, err := d.getMsgStreamByName(name) + if err != nil { + continue } - dms := v.(*dmlMsgStream) d.mut.Lock() dms.IncRefcnt() @@ -294,12 +297,10 @@ func (d *dmlChannels) addChannels(names ...string) { func (d *dmlChannels) removeChannels(names ...string) { for _, name := range names { - v, ok := d.pool.Load(name) - if !ok { - log.Error("invalid channel name", zap.String("chanName", name)) - panic("invalid channel name: " + name) + dms, err := d.getMsgStreamByName(name) + if err != nil { + continue } - dms := v.(*dmlMsgStream) d.mut.Lock() dms.DecRefCnt() diff --git a/internal/rootcoord/dml_channels_test.go b/internal/rootcoord/dml_channels_test.go index 82ea8be254..948f0d7da8 100644 --- a/internal/rootcoord/dml_channels_test.go +++ b/internal/rootcoord/dml_channels_test.go @@ -136,10 +136,13 @@ func TestDmlChannels(t *testing.T) { assert.Equal(t, 0, len(chanNames)) randStr := funcutil.RandomString(8) - assert.Panics(t, func() { dml.addChannels(randStr) }) - assert.Panics(t, func() { dml.broadcast([]string{randStr}, nil) }) - assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) }) - assert.Panics(t, func() { dml.removeChannels(randStr) }) + dml.addChannels(randStr) + assert.Error(t, dml.broadcast([]string{randStr}, nil)) + { + _, err := dml.broadcastMark([]string{randStr}, nil) + assert.Error(t, err) + } + dml.removeChannels(randStr) chans0 := dml.getChannelNames(2) dml.addChannels(chans0...) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index f22c5b70db..0a4115a464 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -319,7 +319,16 @@ func (s *Session) getSessionKey() string { } func (s *Session) initWatchSessionCh() { - getResp, err := s.etcdCli.Get(context.Background(), s.getSessionKey()) + var ( + err error + getResp *clientv3.GetResponse + ) + + err = retry.Do(context.Background(), func() error { + getResp, err = s.etcdCli.Get(context.Background(), s.getSessionKey()) + log.Warn("fail to get the session key from the etcd", zap.Error(err)) + return err + }, retry.Attempts(100)) if err != nil { panic(err) }