mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Improve the panic code about the rootcoord/session/rocksmq (#24859)
Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
parent
4325783a8c
commit
307d687ef7
@ -697,6 +697,7 @@ func (s *Server) processSessionEvent(ctx context.Context, role string, event *se
|
||||
if err := retry.Do(ctx, func() error {
|
||||
return s.segReferManager.ReleaseSegmentsLockByNodeID(event.Session.ServerID)
|
||||
}, retry.Attempts(100)); err != nil {
|
||||
log.Error("release segments lock failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1018,7 +1018,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI
|
||||
consumers, ok := vals.([]*Consumer)
|
||||
if !ok || len(consumers) == 0 {
|
||||
log.Error("update ack with no consumer", zap.String("topic", topicName))
|
||||
panic("update ack with no consumer")
|
||||
return nil
|
||||
}
|
||||
|
||||
// find min id of all consumer
|
||||
|
||||
@ -24,9 +24,9 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
@ -224,14 +224,21 @@ func (d *dmlChannels) getChannelNum() int {
|
||||
return len(d.listChannels())
|
||||
}
|
||||
|
||||
func (d *dmlChannels) getMsgStreamByName(chanName string) (*dmlMsgStream, error) {
|
||||
v, ok := d.pool.Load(chanName)
|
||||
if !ok {
|
||||
log.Error("invalid channel name", zap.String("chanName", chanName))
|
||||
return nil, errors.Newf("invalid channel name: %s", chanName)
|
||||
}
|
||||
return v.(*dmlMsgStream), nil
|
||||
}
|
||||
|
||||
func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) error {
|
||||
for _, chanName := range chanNames {
|
||||
v, ok := d.pool.Load(chanName)
|
||||
if !ok {
|
||||
log.Error("invalid channel name", zap.String("chanName", chanName))
|
||||
panic("invalid channel name: " + chanName)
|
||||
dms, err := d.getMsgStreamByName(chanName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dms := v.(*dmlMsgStream)
|
||||
|
||||
dms.mutex.RLock()
|
||||
if dms.refcnt > 0 {
|
||||
@ -249,12 +256,10 @@ func (d *dmlChannels) broadcast(chanNames []string, pack *msgstream.MsgPack) err
|
||||
func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) (map[string][]byte, error) {
|
||||
result := make(map[string][]byte)
|
||||
for _, chanName := range chanNames {
|
||||
v, ok := d.pool.Load(chanName)
|
||||
if !ok {
|
||||
log.Error("invalid channel name", zap.String("chanName", chanName))
|
||||
panic("invalid channel name: " + chanName)
|
||||
dms, err := d.getMsgStreamByName(chanName)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
dms := v.(*dmlMsgStream)
|
||||
|
||||
dms.mutex.RLock()
|
||||
if dms.refcnt > 0 {
|
||||
@ -278,12 +283,10 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack)
|
||||
|
||||
func (d *dmlChannels) addChannels(names ...string) {
|
||||
for _, name := range names {
|
||||
v, ok := d.pool.Load(name)
|
||||
if !ok {
|
||||
log.Error("invalid channel name", zap.String("chanName", name))
|
||||
panic("invalid channel name: " + name)
|
||||
dms, err := d.getMsgStreamByName(name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
dms := v.(*dmlMsgStream)
|
||||
|
||||
d.mut.Lock()
|
||||
dms.IncRefcnt()
|
||||
@ -294,12 +297,10 @@ func (d *dmlChannels) addChannels(names ...string) {
|
||||
|
||||
func (d *dmlChannels) removeChannels(names ...string) {
|
||||
for _, name := range names {
|
||||
v, ok := d.pool.Load(name)
|
||||
if !ok {
|
||||
log.Error("invalid channel name", zap.String("chanName", name))
|
||||
panic("invalid channel name: " + name)
|
||||
dms, err := d.getMsgStreamByName(name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
dms := v.(*dmlMsgStream)
|
||||
|
||||
d.mut.Lock()
|
||||
dms.DecRefCnt()
|
||||
|
||||
@ -136,10 +136,13 @@ func TestDmlChannels(t *testing.T) {
|
||||
assert.Equal(t, 0, len(chanNames))
|
||||
|
||||
randStr := funcutil.RandomString(8)
|
||||
assert.Panics(t, func() { dml.addChannels(randStr) })
|
||||
assert.Panics(t, func() { dml.broadcast([]string{randStr}, nil) })
|
||||
assert.Panics(t, func() { dml.broadcastMark([]string{randStr}, nil) })
|
||||
assert.Panics(t, func() { dml.removeChannels(randStr) })
|
||||
dml.addChannels(randStr)
|
||||
assert.Error(t, dml.broadcast([]string{randStr}, nil))
|
||||
{
|
||||
_, err := dml.broadcastMark([]string{randStr}, nil)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
dml.removeChannels(randStr)
|
||||
|
||||
chans0 := dml.getChannelNames(2)
|
||||
dml.addChannels(chans0...)
|
||||
|
||||
@ -319,7 +319,16 @@ func (s *Session) getSessionKey() string {
|
||||
}
|
||||
|
||||
func (s *Session) initWatchSessionCh() {
|
||||
getResp, err := s.etcdCli.Get(context.Background(), s.getSessionKey())
|
||||
var (
|
||||
err error
|
||||
getResp *clientv3.GetResponse
|
||||
)
|
||||
|
||||
err = retry.Do(context.Background(), func() error {
|
||||
getResp, err = s.etcdCli.Get(context.Background(), s.getSessionKey())
|
||||
log.Warn("fail to get the session key from the etcd", zap.Error(err))
|
||||
return err
|
||||
}, retry.Attempts(100))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user