diff --git a/internal/distributed/streaming/internal/consumer/consumer_impl.go b/internal/distributed/streaming/internal/consumer/consumer_impl.go
index a0cb5c470d..73870433d7 100644
--- a/internal/distributed/streaming/internal/consumer/consumer_impl.go
+++ b/internal/distributed/streaming/internal/consumer/consumer_impl.go
@@ -32,6 +32,7 @@ func NewResumableConsumer(factory factory, opts *ConsumerOptions) ResumableConsu
 			inner:                  opts.MessageHandler,
 			lastConfirmedMessageID: nil,
 			lastTimeTick:           0,
+			lastMessageVersion:     message.VersionOld,
 		},
 		factory:    factory,
 		consumeErr: syncutil.NewFuture[error](),
@@ -98,7 +99,17 @@ func (rc *resumableConsumerImpl) resumeLoop() {
 				newDeliverFilters = append(newDeliverFilters, filter)
 			}
 		}
-		newDeliverFilters = append(newDeliverFilters, options.DeliverFilterTimeTickGT(rc.mh.lastTimeTick))
+		if rc.mh.lastMessageVersion == message.VersionOld {
+			newDeliverFilters = append(newDeliverFilters, options.DeliverFilterTimeTickGTE(rc.mh.lastTimeTick))
+			// If the message is old version, the message is write by msgstream, so different message may have same timetick.
+			// So we need to resume from the last timetick, and a message lost will happen if we skip it.
+			// Meanwhile, the message may be duplicated, so we need to deduplicate the message.
+			// It will be done on MsgPackAdaptorHandler with message id.
+		} else {
+			// New version message always have a unique timetick for every message (txn message will be treated as one message)
+			// So if we have seen the last timetick, we can skip it, only need to resume from the greater timetick.
+			newDeliverFilters = append(newDeliverFilters, options.DeliverFilterTimeTickGT(rc.mh.lastTimeTick))
+		}
 		deliverFilters = newDeliverFilters
 	}
 	opts := &handler.ConsumerOptions{
diff --git a/internal/distributed/streaming/internal/consumer/consumer_test.go b/internal/distributed/streaming/internal/consumer/consumer_test.go
index a32c401746..eb0364d419 100644
--- a/internal/distributed/streaming/internal/consumer/consumer_test.go
+++ b/internal/distributed/streaming/internal/consumer/consumer_test.go
@@ -2,6 +2,8 @@ package consumer
 
 import (
 	"context"
+	"math/rand"
+	"strconv"
 	"testing"
 	"time"
 
@@ -36,7 +38,7 @@ func TestResumableConsumer(t *testing.T) {
 			"key": "value",
 			"_t":  "1",
 			"_tt": message.EncodeUint64(456),
-			"_v":  "1",
+			"_v":  strconv.FormatInt(int64(rand.Int31n(2)), 10),
 			"_lc": walimplstest.NewTestMessageID(123).Marshal(),
 		}),
 	})
diff --git a/internal/distributed/streaming/internal/consumer/message_handler.go b/internal/distributed/streaming/internal/consumer/message_handler.go
index 9c42d89a26..7a211f2d5b 100644
--- a/internal/distributed/streaming/internal/consumer/message_handler.go
+++ b/internal/distributed/streaming/internal/consumer/message_handler.go
@@ -8,21 +8,25 @@ import (
 type timeTickOrderMessageHandler struct {
 	inner                  message.Handler
 	lastConfirmedMessageID message.MessageID
+	lastMessageVersion     message.Version
 	lastTimeTick           uint64
 }
 
 func (mh *timeTickOrderMessageHandler) Handle(handleParam message.HandleParam) message.HandleResult {
 	var lastConfirmedMessageID message.MessageID
 	var lastTimeTick uint64
+	var lastMessageVersion message.Version
 	if handleParam.Message != nil {
 		lastConfirmedMessageID = handleParam.Message.LastConfirmedMessageID()
 		lastTimeTick = handleParam.Message.TimeTick()
+		lastMessageVersion = handleParam.Message.Version()
 	}
 	result := mh.inner.Handle(handleParam)
 	if result.MessageHandled {
 		mh.lastConfirmedMessageID = lastConfirmedMessageID
 		mh.lastTimeTick = lastTimeTick
+		mh.lastMessageVersion = lastMessageVersion
 	}
 	return result
 }
 
diff --git a/internal/proxy/task_upsert_streaming.go b/internal/proxy/task_upsert_streaming.go
index b4423ea81a..bf8d1b1c93 100644
--- a/internal/proxy/task_upsert_streaming.go
+++ b/internal/proxy/task_upsert_streaming.go
@@ -126,7 +126,7 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
 		msg, err := message.NewDeleteMessageBuilderV1().
 			WithHeader(&message.DeleteMessageHeader{
 				CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
-				Rows:         uint64(it.upsertMsg.DeleteMsg.NumRows),
+				Rows:         uint64(deleteMsg.NumRows),
 			}).
 			WithBody(deleteMsg.DeleteRequest).
 			WithVChannel(vchannel).
diff --git a/internal/streamingnode/server/wal/metricsutil/wal_scan.go b/internal/streamingnode/server/wal/metricsutil/wal_scan.go
index 147c904273..d8a84d50f8 100644
--- a/internal/streamingnode/server/wal/metricsutil/wal_scan.go
+++ b/internal/streamingnode/server/wal/metricsutil/wal_scan.go
@@ -161,20 +161,20 @@ func (m *ScannerMetrics) UpdatePendingQueueSize(size int) {
 }
 
 func (m *ScannerMetrics) UpdateTxnBufSize(size int) {
-	diff := size - m.previousTimeTickBufSize
-	m.timeTickBufSize.Add(float64(diff))
-	m.previousTimeTickBufSize = size
-}
-
-func (m *ScannerMetrics) UpdateTimeTickBufSize(size int) {
 	diff := size - m.previousTxnBufSize
 	m.txnBufSize.Add(float64(diff))
 	m.previousTxnBufSize = size
 }
 
+func (m *ScannerMetrics) UpdateTimeTickBufSize(size int) {
+	diff := size - m.previousTimeTickBufSize
+	m.timeTickBufSize.Add(float64(diff))
+	m.previousTimeTickBufSize = size
+}
+
 func (m *ScannerMetrics) Close() {
 	m.UpdatePendingQueueSize(0)
 	m.UpdateTimeTickBufSize(0)
-	m.UpdateTimeTickBufSize(0)
+	m.UpdateTxnBufSize(0)
 	m.scannerTotal.WithLabelValues(m.scannerModel).Dec()
 }
diff --git a/internal/streamingnode/server/wal/recovery/recovery_stream.go b/internal/streamingnode/server/wal/recovery/recovery_stream.go
index 2a10f916d9..99efbdb7d5 100644
--- a/internal/streamingnode/server/wal/recovery/recovery_stream.go
+++ b/internal/streamingnode/server/wal/recovery/recovery_stream.go
@@ -58,7 +58,7 @@ L:
 				// The recovery stream is reach the end, we can stop the recovery.
 				break L
 			}
-			r.observeMessage(msg)
+			r.ObserveMessage(ctx, msg)
 		}
 	}
 	if rs.Error() != nil {
diff --git a/pkg/streaming/util/message/adaptor/handler.go b/pkg/streaming/util/message/adaptor/handler.go
index ce7303b65e..283e09724f 100644
--- a/pkg/streaming/util/message/adaptor/handler.go
+++ b/pkg/streaming/util/message/adaptor/handler.go
@@ -66,6 +66,13 @@ func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.Handle
 		sendCh = m.channel
 	}
 
+	// If there's no pending msgPack and no upstream message,
+	// return it immediately to ask for more message from upstream to avoid blocking.
+	if sendCh == nil && param.Upstream == nil {
+		return message.HandleResult{
+			MessageHandled: messageHandled,
+		}
+	}
 	select {
 	case <-param.Ctx.Done():
 		return message.HandleResult{
@@ -137,7 +144,23 @@ func (m *BaseMsgPackAdaptorHandler) GenerateMsgPack(msg message.ImmutableMessage
 
 // addMsgPackIntoPending add message into pending msgPack.
 func (m *BaseMsgPackAdaptorHandler) addMsgPackIntoPending(msgs ...message.ImmutableMessage) {
-	newPack, err := NewMsgPackFromMessage(msgs...)
+	// Because the old version message may have same time tick,
+	// So we may read the same message multiple times on same time tick because of the auto-resuming by ResumableConsumer.
+	// we need to filter out the duplicate messages here.
+	dedupMessages := make([]message.ImmutableMessage, 0, len(msgs))
+	for _, msg := range msgs {
+		exist := false
+		for _, existMsg := range dedupMessages {
+			if msg.MessageID().EQ(existMsg.MessageID()) {
+				exist = true
+				break
+			}
+		}
+		if !exist {
+			dedupMessages = append(dedupMessages, msg)
+		}
+	}
+	newPack, err := NewMsgPackFromMessage(dedupMessages...)
 	if err != nil {
 		m.Logger.Warn("failed to convert message to msgpack", zap.Error(err))
 	}
diff --git a/pkg/streaming/util/message/adaptor/handler_test.go b/pkg/streaming/util/message/adaptor/handler_test.go
index 71fe275b05..0cb49fd483 100644
--- a/pkg/streaming/util/message/adaptor/handler_test.go
+++ b/pkg/streaming/util/message/adaptor/handler_test.go
@@ -11,6 +11,54 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
 )
 
+func TestDeplicatedMessageOnMsgPackHandler(t *testing.T) {
+	messageID := rmq.NewRmqID(1)
+	tt := uint64(100)
+	msg := message.CreateTestInsertMessage(
+		t,
+		1,
+		1000,
+		tt,
+		messageID,
+	)
+	msg2 := message.CreateTestInsertMessage(
+		t,
+		1,
+		1000,
+		101,
+		messageID,
+	)
+	immutableMsg := msg.WithOldVersion().IntoImmutableMessage(messageID)
+
+	upstream := make(chan message.ImmutableMessage, 3)
+	upstream <- immutableMsg
+	upstream <- immutableMsg
+	upstream <- msg2.IntoImmutableMessage(messageID)
+
+	h := NewMsgPackAdaptorHandler()
+	done := make(chan struct{})
+	go func() {
+		for msg := range h.Chan() {
+			assert.Len(t, msg.Msgs, 1)
+		}
+		close(done)
+	}()
+	var resp message.HandleResult
+	for i := 0; i < 10; i++ {
+		var upstream2 <-chan message.ImmutableMessage
+		if len(upstream) > 0 {
+			upstream2 = upstream
+		}
+		resp = h.Handle(message.HandleParam{
+			Ctx:      context.Background(),
+			Upstream: upstream2,
+			Message:  resp.Incoming,
+		})
+	}
+	h.Close()
+	<-done
+}
+
 func TestMsgPackAdaptorHandler(t *testing.T) {
 	messageID := rmq.NewRmqID(1)
 	tt := uint64(100)
@@ -51,6 +99,13 @@ func TestMsgPackAdaptorHandler(t *testing.T) {
 	assert.NoError(t, resp.Error)
 	assert.Nil(t, resp.Incoming)
 	assert.True(t, resp.MessageHandled)
+
+	resp = h.Handle(message.HandleParam{
+		Ctx: ctx,
+	})
+	assert.NoError(t, resp.Error)
+	assert.Nil(t, resp.Incoming)
+	assert.False(t, resp.MessageHandled)
 	h.Close()
 	<-done