mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [Cherry-Pick] msgstream memory leak caused by config event (#29418)
relate: https://github.com/milvus-io/milvus/issues/28620 https://github.com/milvus-io/milvus/issues/28367 pr: https://github.com/milvus-io/milvus/pull/29266 https://github.com/milvus-io/milvus/pull/29377 https://github.com/milvus-io/milvus/pull/29403 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
38f50f6e4a
commit
2d517039b5
@ -19,7 +19,6 @@ package proxy
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
@ -187,10 +186,6 @@ func createStream(factory msgstream.Factory, pchans []pChan, repack repackFuncTy
|
||||
if repack != nil {
|
||||
stream.SetRepackFunc(repack)
|
||||
}
|
||||
runtime.SetFinalizer(stream, func(stream msgstream.MsgStream) {
|
||||
stream.Close()
|
||||
})
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/samber/lo"
|
||||
uatomic "go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
@ -42,7 +43,10 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
var _ MsgStream = (*mqMsgStream)(nil)
|
||||
var (
|
||||
_ MsgStream = (*mqMsgStream)(nil)
|
||||
streamCounter uatomic.Int64
|
||||
)
|
||||
|
||||
type mqMsgStream struct {
|
||||
ctx context.Context
|
||||
@ -63,6 +67,7 @@ type mqMsgStream struct {
|
||||
closed int32
|
||||
onceChan sync.Once
|
||||
enableProduce atomic.Value
|
||||
configEvent config.EventHandler
|
||||
}
|
||||
|
||||
// NewMqMsgStream is used to generate a new mqMsgStream object
|
||||
@ -98,7 +103,7 @@ func NewMqMsgStream(ctx context.Context,
|
||||
}
|
||||
ctxLog := log.Ctx(ctx)
|
||||
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
|
||||
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, config.NewHandler("enable send tt msg", func(event *config.Event) {
|
||||
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
|
||||
value, err := strconv.ParseBool(event.Value)
|
||||
if err != nil {
|
||||
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
|
||||
@ -106,7 +111,8 @@ func NewMqMsgStream(ctx context.Context,
|
||||
}
|
||||
stream.enableProduce.Store(value)
|
||||
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
|
||||
}))
|
||||
})
|
||||
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
|
||||
ctxLog.Info("Msg Stream state", zap.Bool("can_produce", stream.isEnabledProduce()))
|
||||
|
||||
return stream, nil
|
||||
@ -229,6 +235,7 @@ func (ms *mqMsgStream) Close() {
|
||||
|
||||
ms.client.Close()
|
||||
close(ms.receiveBuf)
|
||||
paramtable.Get().Unwatch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, ms.configEvent)
|
||||
}
|
||||
|
||||
func (ms *mqMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32 {
|
||||
|
||||
@ -152,6 +152,10 @@ func (p *ComponentParam) Watch(key string, watcher config.EventHandler) {
|
||||
p.baseTable.mgr.Dispatcher.Register(key, watcher)
|
||||
}
|
||||
|
||||
func (p *ComponentParam) Unwatch(key string, watcher config.EventHandler) {
|
||||
p.baseTable.mgr.Dispatcher.Unregister(key, watcher)
|
||||
}
|
||||
|
||||
func (p *ComponentParam) WatchKeyPrefix(keyPrefix string, watcher config.EventHandler) {
|
||||
p.baseTable.mgr.Dispatcher.RegisterForKeyPrefix(keyPrefix, watcher)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user