fix: [2.4] replicate message exception when the ttMsgEnable config is changed dynamically (#38440)

- issue: #38177
- pr: #38178

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-12-14 23:24:51 +08:00 committed by GitHub
parent 1da4ac4159
commit 4c896c628c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 44 additions and 38 deletions

View File

@ -283,7 +283,7 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc()
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()))
m.EnableProduce(true)
m.ForceEnableProduce(true)
updater = newMqStatsUpdater(config, m)
}

View File

@ -103,7 +103,7 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
return nil
}
func (mtm *mockTtMsgStream) EnableProduce(can bool) {
func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
}
func TestNewDmInputNode(t *testing.T) {

View File

@ -10,10 +10,10 @@ import (
type mockMsgStream struct {
msgstream.MsgStream
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
enableProduce func(bool)
asProducer func([]string)
setRepack func(repackFunc msgstream.RepackFunc)
close func()
forceEnableProduce func(bool)
}
func (m *mockMsgStream) AsProducer(producers []string) {
@ -34,9 +34,9 @@ func (m *mockMsgStream) Close() {
}
}
func (m *mockMsgStream) EnableProduce(enabled bool) {
if m.enableProduce != nil {
m.enableProduce(enabled)
func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
if m.forceEnableProduce != nil {
m.forceEnableProduce(enabled)
}
}

View File

@ -311,7 +311,7 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
return nil
}
func (ms *simpleMockMsgStream) EnableProduce(enabled bool) {
func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
}
func newSimpleMockMsgStream() *simpleMockMsgStream {

View File

@ -272,7 +272,7 @@ func (node *Proxy) Init() error {
zap.Error(err))
return err
}
node.replicateMsgStream.EnableProduce(true)
node.replicateMsgStream.ForceEnableProduce(true)
node.replicateMsgStream.AsProducer([]string{replicateMsgChannel})
node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)

View File

@ -43,7 +43,7 @@ func (m *ReplicateStreamManager) newMsgStreamResource(channel string) resource.N
}
msgStream.SetRepackFunc(replicatePackFunc)
msgStream.AsProducer([]string{channel})
msgStream.EnableProduce(true)
msgStream.ForceEnableProduce(true)
res := resource.NewSimpleResource(msgStream, ReplicateMsgStreamTyp, channel, ReplicateMsgStreamExpireTime, func() {
msgStream.Close()

View File

@ -34,7 +34,7 @@ func TestReplicateManager(t *testing.T) {
mockMsgStream.asProducer = func(producers []string) {
i++
}
mockMsgStream.enableProduce = func(b bool) {
mockMsgStream.forceEnableProduce = func(b bool) {
i++
}
mockMsgStream.close = func() {

View File

@ -274,11 +274,11 @@ func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Clos
}
// EnableProduce provides a mock function with given fields: can
func (_m *MockMsgStream) EnableProduce(can bool) {
func (_m *MockMsgStream) ForceEnableProduce(can bool) {
_m.Called(can)
}
// MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EnableProduce'
// MockMsgStream_EnableProduce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceEnableProduce'
type MockMsgStream_EnableProduce_Call struct {
*mock.Call
}
@ -286,7 +286,7 @@ type MockMsgStream_EnableProduce_Call struct {
// EnableProduce is a helper method to define mock.On call
// - can bool
func (_e *MockMsgStream_Expecter) EnableProduce(can interface{}) *MockMsgStream_EnableProduce_Call {
return &MockMsgStream_EnableProduce_Call{Call: _e.mock.On("EnableProduce", can)}
return &MockMsgStream_EnableProduce_Call{Call: _e.mock.On("ForceEnableProduce", can)}
}
func (_c *MockMsgStream_EnableProduce_Call) Run(run func(can bool)) *MockMsgStream_EnableProduce_Call {

View File

@ -58,18 +58,19 @@ type mqMsgStream struct {
consumers map[string]mqwrapper.Consumer
consumerChannels []string
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
enableProduce atomic.Value
configEvent config.EventHandler
repackFunc RepackFunc
unmarshal UnmarshalDispatcher
receiveBuf chan *MsgPack
closeRWMutex *sync.RWMutex
streamCancel func()
bufSize int64
producerLock *sync.RWMutex
consumerLock *sync.Mutex
closed int32
onceChan sync.Once
ttMsgEnable atomic.Value
forceEnableProduce atomic.Value
configEvent config.EventHandler
}
// NewMqMsgStream is used to generate a new mqMsgStream object
@ -104,14 +105,15 @@ func NewMqMsgStream(ctx context.Context,
closed: 0,
}
ctxLog := log.Ctx(ctx)
stream.enableProduce.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.forceEnableProduce.Store(false)
stream.ttMsgEnable.Store(paramtable.Get().CommonCfg.TTMsgEnabled.GetAsBool())
stream.configEvent = config.NewHandler("enable send tt msg "+fmt.Sprint(streamCounter.Inc()), func(event *config.Event) {
value, err := strconv.ParseBool(event.Value)
if err != nil {
ctxLog.Warn("Failed to parse bool value", zap.String("v", event.Value), zap.Error(err))
return
}
stream.enableProduce.Store(value)
stream.ttMsgEnable.Store(value)
ctxLog.Info("Msg Stream state updated", zap.Bool("can_produce", stream.isEnabledProduce()))
})
paramtable.Get().Watch(paramtable.Get().CommonCfg.TTMsgEnabled.Key, stream.configEvent)
@ -265,12 +267,12 @@ func (ms *mqMsgStream) GetProduceChannels() []string {
return ms.producerChannels
}
func (ms *mqMsgStream) EnableProduce(can bool) {
ms.enableProduce.Store(can)
func (ms *mqMsgStream) ForceEnableProduce(can bool) {
ms.forceEnableProduce.Store(can)
}
func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
return ms.forceEnableProduce.Load().(bool) || ms.ttMsgEnable.Load().(bool)
}
func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {

View File

@ -114,6 +114,8 @@ func TestStream_ConfigEvent(t *testing.T) {
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -129,12 +131,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
err := inputStream.Produce(&msgPack)
require.Error(t, err)
}
inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
err := inputStream.Produce(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err))
@ -187,6 +189,8 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false")
defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key)
pulsarAddress := getPulsarAddress()
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -202,12 +206,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName)
{
inputStream.EnableProduce(false)
inputStream.ForceEnableProduce(false)
_, err := inputStream.Broadcast(&msgPack)
require.Error(t, err)
}
inputStream.EnableProduce(true)
inputStream.ForceEnableProduce(true)
_, err := inputStream.Broadcast(&msgPack)
require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err))

View File

@ -70,7 +70,7 @@ type MsgStream interface {
GetLatestMsgID(channel string) (MessageID, error)
CheckTopicValid(channel string) error
EnableProduce(can bool)
ForceEnableProduce(can bool)
}
type Factory interface {