mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
fix: stream connection leak in case of error (#38320)
issue: #38318 Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
9be106dedf
commit
37a52286b1
@ -91,7 +91,16 @@ func NewDispatcher(ctx context.Context,
|
|||||||
log := log.With(zap.String("pchannel", pchannel),
|
log := log.With(zap.String("pchannel", pchannel),
|
||||||
zap.String("subName", subName), zap.Bool("isMain", isMain))
|
zap.String("subName", subName), zap.Bool("isMain", isMain))
|
||||||
log.Info("creating dispatcher...")
|
log.Info("creating dispatcher...")
|
||||||
stream, err := factory.NewTtMsgStream(ctx)
|
|
||||||
|
var stream msgstream.MsgStream
|
||||||
|
var err error
|
||||||
|
defer func() {
|
||||||
|
if err != nil && stream != nil {
|
||||||
|
stream.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
stream, err = factory.NewTtMsgStream(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -106,7 +115,6 @@ func NewDispatcher(ctx context.Context,
|
|||||||
|
|
||||||
err = stream.Seek(ctx, []*Pos{position}, includeCurrentMsg)
|
err = stream.Seek(ctx, []*Pos{position}, includeCurrentMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
stream.Close()
|
|
||||||
log.Error("seek failed", zap.Error(err))
|
log.Error("seek failed", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -114,7 +122,7 @@ func NewDispatcher(ctx context.Context,
|
|||||||
log.Info("seek successfully", zap.Uint64("posTs", position.GetTimestamp()),
|
log.Info("seek successfully", zap.Uint64("posTs", position.GetTimestamp()),
|
||||||
zap.Time("posTime", posTime), zap.Duration("tsLag", time.Since(posTime)))
|
zap.Time("posTime", posTime), zap.Duration("tsLag", time.Since(posTime)))
|
||||||
} else {
|
} else {
|
||||||
err := stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
|
err = stream.AsConsumer(ctx, []string{pchannel}, subName, subPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("asConsumer failed", zap.Error(err))
|
log.Error("asConsumer failed", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@ -55,6 +55,7 @@ func TestDispatcher(t *testing.T) {
|
|||||||
t.Run("test AsConsumer fail", func(t *testing.T) {
|
t.Run("test AsConsumer fail", func(t *testing.T) {
|
||||||
ms := msgstream.NewMockMsgStream(t)
|
ms := msgstream.NewMockMsgStream(t)
|
||||||
ms.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error"))
|
ms.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock error"))
|
||||||
|
ms.EXPECT().Close().Return()
|
||||||
factory := &msgstream.MockMqFactory{
|
factory := &msgstream.MockMqFactory{
|
||||||
NewMsgStreamFunc: func(ctx context.Context) (msgstream.MsgStream, error) {
|
NewMsgStreamFunc: func(ctx context.Context) (msgstream.MsgStream, error) {
|
||||||
return ms, nil
|
return ms, nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user