Zhen Ye f0f5147aef
fix: streaming consumer may get stucked when handler is un-consumed (#36818)
issue: #36378

Signed-off-by: chyezh <chyezh@outlook.com>
2024-10-14 15:23:23 +08:00

31 lines
771 B
Go

package consumer
import (
"context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// timeTickOrderMessageHandler is a message handler that will record the last sent message id.
type timeTickOrderMessageHandler struct {
inner message.Handler
lastConfirmedMessageID message.MessageID
lastTimeTick uint64
}
func (mh *timeTickOrderMessageHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
ok, err := mh.inner.Handle(ctx, msg)
if ok {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
}
return ok, err
}
func (mh *timeTickOrderMessageHandler) Close() {
mh.inner.Close()
}