From d6bc95de55670fc2d4e67fc3156bd39f60eb3865 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 23 May 2024 15:27:41 +0800 Subject: [PATCH] enhance: [2.4] Add param item to ignore bad message id in checkpoint (#33123) (#33249) Cherry-pick from master pr: #33123 #33158 See also #33122 This PR adds the param item `mq.ignoreBadPosition` to control the behavior when the mq fails to parse a message id from a checkpoint --------- Signed-off-by: Congqi Xia --------- Signed-off-by: Congqi Xia --- pkg/mq/msgstream/mq_msgstream.go | 38 +++++++++++---- pkg/mq/msgstream/mq_msgstream_test.go | 69 +++++++++++++++++++++++++++ pkg/util/paramtable/service_param.go | 13 ++++- 3 files changed, 109 insertions(+), 11 deletions(-) diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index a0dd919a55..86ad3f7dfe 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -481,7 +481,16 @@ func (ms *mqMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPositi } messageID, err := ms.client.BytesToMsgID(mp.MsgID) if err != nil { - return err + if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() { + // try to use latest message ID first + messageID, err = consumer.GetLatestMsgID() + if err != nil { + log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) + continue + } + } else { + return err + } } log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID)) @@ -835,34 +844,44 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi var consumer mqwrapper.Consumer var mp *MsgPosition var err error - fn := func() error { + fn := func() (bool, error) { var ok bool consumer, ok = ms.consumers[mp.ChannelName] if !ok { - return fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName) + return false, fmt.Errorf("please subcribe the channel, channel name =%s", mp.ChannelName) } if consumer == nil { - return fmt.Errorf("consumer is nil") + return
false, fmt.Errorf("consumer is nil") } seekMsgID, err := ms.client.BytesToMsgID(mp.MsgID) if err != nil { - return err + if paramtable.Get().MQCfg.IgnoreBadPosition.GetAsBool() { + // try to use latest message ID first + seekMsgID, err = consumer.GetLatestMsgID() + if err != nil { + log.Ctx(ctx).Warn("Ignoring bad message id", zap.Error(err)) + return false, nil + } + } else { + return false, err + } } + log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID)) err = consumer.Seek(seekMsgID, true) if err != nil { log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err)) // stop retry if consumer topic not exist if errors.Is(err, merr.ErrMqTopicNotFound) { - return retry.Unrecoverable(err) + return false, err } - return err + return true, err } log.Info("MsgStream seek finished", zap.String("channel", mp.ChannelName)) - return nil + return false, nil } ms.consumerLock.Lock() @@ -873,7 +892,8 @@ func (ms *MqTtMsgStream) Seek(ctx context.Context, msgPositions []*msgpb.MsgPosi if len(mp.MsgID) == 0 { return fmt.Errorf("when msgID's length equal to 0, please use AsConsumer interface") } - err = retry.Do(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second)) + err = retry.Handle(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second)) + // err = retry.Do(ctx, fn, retry.Attempts(20), retry.Sleep(time.Millisecond*200), retry.MaxSleepTime(5*time.Second)) if err != nil { return fmt.Errorf("failed to seek, error %s", err.Error()) } diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index 46b9797e8e..8705eddf13 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/config" 
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + kafkawrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka" pulsarwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -1013,6 +1014,74 @@ func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) { assert.Equal(t, result.Msgs[0].ID(), int64(1)) } +func TestSTream_MqMsgStream_SeekBadMessageID(t *testing.T) { + pulsarAddress := getPulsarAddress() + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + + msgPack := &MsgPack{} + ctx := context.Background() + inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) + defer inputStream.Close() + + outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, funcutil.RandomString(8)) + defer outputStream.Close() + + for i := 0; i < 10; i++ { + insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) + msgPack.Msgs = append(msgPack.Msgs, insertMsg) + } + + err := inputStream.Produce(msgPack) + assert.NoError(t, err) + var seekPosition *msgpb.MsgPosition + for i := 0; i < 10; i++ { + result := consumer(ctx, outputStream) + assert.Equal(t, result.Msgs[0].ID(), int64(i)) + seekPosition = result.EndPositions[0] + } + + // produce timetick for mqtt msgstream seek + msgPack = &MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(1000)) + err = inputStream.Produce(msgPack) + assert.NoError(t, err) + + factory := ProtoUDFactory{} + pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) + outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionLatest) + defer outputStream2.Close() + + outputStream3, err := NewMqTtMsgStream(ctx, 
100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + outputStream3.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqwrapper.SubscriptionPositionEarliest) + require.NoError(t, err) + + defer paramtable.Get().Reset(paramtable.Get().MQCfg.IgnoreBadPosition.Key) + + p := []*msgpb.MsgPosition{ + { + ChannelName: seekPosition.ChannelName, + Timestamp: seekPosition.Timestamp, + MsgGroup: seekPosition.MsgGroup, + MsgID: kafkawrapper.SerializeKafkaID(123), + }, + } + + paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "false") + err = outputStream2.Seek(ctx, p) + assert.Error(t, err) + err = outputStream3.Seek(ctx, p) + assert.Error(t, err) + + paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "true") + err = outputStream2.Seek(ctx, p) + assert.NoError(t, err) + err = outputStream3.Seek(ctx, p) + assert.NoError(t, err) +} + func TestStream_MqMsgStream_SeekLatest(t *testing.T) { pulsarAddress := getPulsarAddress() c := funcutil.RandomString(8) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 1fc9d5488c..67c2bbc826 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -472,8 +472,9 @@ type MQConfig struct { PursuitLag ParamItem `refreshable:"true"` PursuitBufferSize ParamItem `refreshable:"true"` - MQBufSize ParamItem `refreshable:"false"` - ReceiveBufSize ParamItem `refreshable:"false"` + MQBufSize ParamItem `refreshable:"false"` + ReceiveBufSize ParamItem `refreshable:"false"` + IgnoreBadPosition ParamItem `refreshable:"true"` } // Init initializes the MQConfig object with a BaseTable. 
@@ -531,6 +532,14 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`, Doc: "MQ consumer chan buffer length", } p.ReceiveBufSize.Init(base.mgr) + + p.IgnoreBadPosition = ParamItem{ + Key: "mq.ignoreBadPosition", + Version: "2.3.16", + DefaultValue: "false", + Doc: "A switch for ignoring message queue failing to parse message ID from checkpoint position. Usually caused by switching among different mq implementations. May caused data loss when used by mistake", + } + p.IgnoreBadPosition.Init(base.mgr) } // /////////////////////////////////////////////////////////////////////////////