From 9729b6079d5f21f38641c83d16f7d7177a37847f Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 19 May 2023 16:23:24 +0800 Subject: [PATCH] Fix rocksmq retention no trigger at datacoord timetick channel (#24170) Signed-off-by: aoiasd --- .../mq/mqimpl/rocksmq/client/client_impl.go | 14 +- .../mqimpl/rocksmq/client/client_impl_test.go | 26 + .../mq/mqimpl/rocksmq/server/mock_rocksmq.go | 583 ++++++++++++++++++ .../mq/mqimpl/rocksmq/server/rocksmq_impl.go | 3 +- .../rocksmq/server/rocksmq_impl_test.go | 9 +- .../rocksmq/server/rocksmq_retention.go | 8 +- 6 files changed, 632 insertions(+), 11 deletions(-) create mode 100644 internal/mq/mqimpl/rocksmq/server/mock_rocksmq.go diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl.go b/internal/mq/mqimpl/rocksmq/client/client_impl.go index 78ab31b2e5..c4efd8a918 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl.go @@ -74,6 +74,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { if reflect.ValueOf(c.server).IsNil() { return nil, newError(0, "Rmq server is nil") } + exist, con, err := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName) if err != nil { return nil, err @@ -103,12 +104,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { return nil, err } - if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest { - err = c.server.SeekToLatest(options.Topic, options.SubscriptionName) - if err != nil { - return nil, err - } - } // Register self in rocksmq server cons := &server.Consumer{ Topic: consumer.topic, @@ -117,6 +112,13 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } c.server.RegisterConsumer(cons) + if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest { + err = c.server.SeekToLatest(options.Topic, options.SubscriptionName) + if err != nil { + return nil, err + } + } + // Take messages from RocksDB and put it into consumer.Chan(), // trigger by consumer.MsgMutex which trigger by producer c.consumerOptions = append(c.consumerOptions, options) diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go index 5381e1ceb0..6011557f19 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go @@ -12,12 +12,15 @@ package client import ( + "fmt" "os" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -135,6 +138,29 @@ func TestClient_Subscribe(t *testing.T) { assert.NoError(t, err) } +func TestClient_SubscribeError(t *testing.T) { + mockMQ := server.NewMockRocksMQ(t) + client, err := NewClient(Options{ + Server: mockMQ, + }) + testTopic := newTopicName() + testGroupName := newConsumerName() + + assert.NoError(t, err) + mockMQ.EXPECT().ExistConsumerGroup(testTopic, testGroupName).Return(false, nil, nil) + mockMQ.EXPECT().CreateConsumerGroup(testTopic, testGroupName).Return(nil) + mockMQ.EXPECT().RegisterConsumer(mock.Anything).Return(nil) + mockMQ.EXPECT().SeekToLatest(testTopic, testGroupName).Return(fmt.Errorf("test error")) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: testTopic, + SubscriptionName: testGroupName, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionLatest, + }) + assert.Error(t, err) + assert.Nil(t, consumer) +} + func TestClient_SeekLatest(t *testing.T) { os.MkdirAll(rmqPath, os.ModePerm) rmqPathTest := rmqPath + "/seekLatest" diff --git a/internal/mq/mqimpl/rocksmq/server/mock_rocksmq.go b/internal/mq/mqimpl/rocksmq/server/mock_rocksmq.go new file mode 100644 index 0000000000..1731d01f4f --- /dev/null +++ b/internal/mq/mqimpl/rocksmq/server/mock_rocksmq.go @@ -0,0 +1,583 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package server + +import mock "github.com/stretchr/testify/mock" + +// MockRocksMQ is an autogenerated mock type for the RocksMQ type +type MockRocksMQ struct { + mock.Mock +} + +type MockRocksMQ_Expecter struct { + mock *mock.Mock +} + +func (_m *MockRocksMQ) EXPECT() *MockRocksMQ_Expecter { + return &MockRocksMQ_Expecter{mock: &_m.Mock} +} + +// CheckTopicValid provides a mock function with given fields: topicName +func (_m *MockRocksMQ) CheckTopicValid(topicName string) error { + ret := _m.Called(topicName) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(topicName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid' +type MockRocksMQ_CheckTopicValid_Call struct { + *mock.Call +} + +// CheckTopicValid is a helper method to define mock.On call +// - topicName string +func (_e *MockRocksMQ_Expecter) CheckTopicValid(topicName interface{}) *MockRocksMQ_CheckTopicValid_Call { + return &MockRocksMQ_CheckTopicValid_Call{Call: _e.mock.On("CheckTopicValid", topicName)} +} + +func (_c *MockRocksMQ_CheckTopicValid_Call) Run(run func(topicName string)) *MockRocksMQ_CheckTopicValid_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_CheckTopicValid_Call) Return(_a0 error) *MockRocksMQ_CheckTopicValid_Call { + _c.Call.Return(_a0) + return _c +} + +// Close provides a mock function with given fields: +func (_m *MockRocksMQ) Close() { + _m.Called() +} + +// MockRocksMQ_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockRocksMQ_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockRocksMQ_Expecter) Close() *MockRocksMQ_Close_Call { + return &MockRocksMQ_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockRocksMQ_Close_Call) Run(run func()) *MockRocksMQ_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRocksMQ_Close_Call) Return() *MockRocksMQ_Close_Call { + _c.Call.Return() + return _c +} + +// Consume provides a mock function with given fields: topicName, groupName, n +func (_m *MockRocksMQ) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) { + ret := _m.Called(topicName, groupName, n) + + var r0 []ConsumerMessage + if rf, ok := ret.Get(0).(func(string, string, int) []ConsumerMessage); ok { + r0 = rf(topicName, groupName, n) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]ConsumerMessage) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string, int) error); ok { + r1 = rf(topicName, groupName, n) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRocksMQ_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume' +type MockRocksMQ_Consume_Call struct { + *mock.Call +} + +// Consume is a helper method to define mock.On call +// - topicName string +// - groupName string +// - n int +func (_e *MockRocksMQ_Expecter) Consume(topicName interface{}, groupName interface{}, n interface{}) *MockRocksMQ_Consume_Call { + return &MockRocksMQ_Consume_Call{Call: _e.mock.On("Consume", topicName, groupName, n)} +} + +func (_c *MockRocksMQ_Consume_Call) Run(run func(topicName string, groupName string, n int)) *MockRocksMQ_Consume_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].(int)) + }) + return _c +} + +func (_c *MockRocksMQ_Consume_Call) Return(_a0 []ConsumerMessage, _a1 error) *MockRocksMQ_Consume_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// CreateConsumerGroup provides a mock function with given fields: topicName, groupName +func (_m *MockRocksMQ) CreateConsumerGroup(topicName string, groupName string) error { + ret := _m.Called(topicName, groupName) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(topicName, groupName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_CreateConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateConsumerGroup' +type MockRocksMQ_CreateConsumerGroup_Call struct { + *mock.Call +} + +// CreateConsumerGroup is a helper method to define mock.On call +// - topicName string +// - groupName string +func (_e *MockRocksMQ_Expecter) CreateConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_CreateConsumerGroup_Call { + return &MockRocksMQ_CreateConsumerGroup_Call{Call: _e.mock.On("CreateConsumerGroup", topicName, groupName)} +} + +func (_c *MockRocksMQ_CreateConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_CreateConsumerGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_CreateConsumerGroup_Call) Return(_a0 error) *MockRocksMQ_CreateConsumerGroup_Call { + _c.Call.Return(_a0) + return _c +} + +// CreateTopic provides a mock function with given fields: topicName +func (_m *MockRocksMQ) CreateTopic(topicName string) error { + ret := _m.Called(topicName) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(topicName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_CreateTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateTopic' +type MockRocksMQ_CreateTopic_Call struct { + *mock.Call +} + +// CreateTopic is a helper method to define mock.On call +// - topicName string +func (_e *MockRocksMQ_Expecter) CreateTopic(topicName interface{}) *MockRocksMQ_CreateTopic_Call { + return &MockRocksMQ_CreateTopic_Call{Call: _e.mock.On("CreateTopic", topicName)} +} + +func (_c *MockRocksMQ_CreateTopic_Call) Run(run func(topicName string)) *MockRocksMQ_CreateTopic_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_CreateTopic_Call) Return(_a0 error) *MockRocksMQ_CreateTopic_Call { + _c.Call.Return(_a0) + return _c +} + +// DestroyConsumerGroup provides a mock function with given fields: topicName, groupName +func (_m *MockRocksMQ) DestroyConsumerGroup(topicName string, groupName string) error { + ret := _m.Called(topicName, groupName) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(topicName, groupName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_DestroyConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyConsumerGroup' +type MockRocksMQ_DestroyConsumerGroup_Call struct { + *mock.Call +} + +// DestroyConsumerGroup is a helper method to define mock.On call +// - topicName string +// - groupName string +func (_e *MockRocksMQ_Expecter) DestroyConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_DestroyConsumerGroup_Call { + return &MockRocksMQ_DestroyConsumerGroup_Call{Call: _e.mock.On("DestroyConsumerGroup", topicName, groupName)} +} + +func (_c *MockRocksMQ_DestroyConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_DestroyConsumerGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_DestroyConsumerGroup_Call) Return(_a0 error) *MockRocksMQ_DestroyConsumerGroup_Call { + _c.Call.Return(_a0) + return _c +} + +// DestroyTopic provides a mock function with given fields: topicName +func (_m *MockRocksMQ) DestroyTopic(topicName string) error { + ret := _m.Called(topicName) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(topicName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_DestroyTopic_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DestroyTopic' +type MockRocksMQ_DestroyTopic_Call struct { + *mock.Call +} + +// DestroyTopic is a helper method to define mock.On call +// - topicName string +func (_e *MockRocksMQ_Expecter) DestroyTopic(topicName interface{}) *MockRocksMQ_DestroyTopic_Call { + return &MockRocksMQ_DestroyTopic_Call{Call: _e.mock.On("DestroyTopic", topicName)} +} + +func (_c *MockRocksMQ_DestroyTopic_Call) Run(run func(topicName string)) *MockRocksMQ_DestroyTopic_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_DestroyTopic_Call) Return(_a0 error) *MockRocksMQ_DestroyTopic_Call { + _c.Call.Return(_a0) + return _c +} + +// ExistConsumerGroup provides a mock function with given fields: topicName, groupName +func (_m *MockRocksMQ) ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error) { + ret := _m.Called(topicName, groupName) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, string) bool); ok { + r0 = rf(topicName, groupName) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 *Consumer + if rf, ok := ret.Get(1).(func(string, string) *Consumer); ok { + r1 = rf(topicName, groupName) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*Consumer) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string) error); ok { + r2 = rf(topicName, groupName) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockRocksMQ_ExistConsumerGroup_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExistConsumerGroup' +type MockRocksMQ_ExistConsumerGroup_Call struct { + *mock.Call +} + +// ExistConsumerGroup is a helper method to define mock.On call +// - topicName string +// - groupName string +func (_e *MockRocksMQ_Expecter) ExistConsumerGroup(topicName interface{}, groupName interface{}) *MockRocksMQ_ExistConsumerGroup_Call { + return &MockRocksMQ_ExistConsumerGroup_Call{Call: _e.mock.On("ExistConsumerGroup", topicName, groupName)} +} + +func (_c *MockRocksMQ_ExistConsumerGroup_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_ExistConsumerGroup_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_ExistConsumerGroup_Call) Return(_a0 bool, _a1 *Consumer, _a2 error) *MockRocksMQ_ExistConsumerGroup_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +// GetLatestMsg provides a mock function with given fields: topicName +func (_m *MockRocksMQ) GetLatestMsg(topicName string) (int64, error) { + ret := _m.Called(topicName) + + var r0 int64 + if rf, ok := ret.Get(0).(func(string) int64); ok { + r0 = rf(topicName) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(topicName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRocksMQ_GetLatestMsg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsg' +type MockRocksMQ_GetLatestMsg_Call struct { + *mock.Call +} + +// GetLatestMsg is a helper method to define mock.On call +// - topicName string +func (_e *MockRocksMQ_Expecter) GetLatestMsg(topicName interface{}) *MockRocksMQ_GetLatestMsg_Call { + return &MockRocksMQ_GetLatestMsg_Call{Call: _e.mock.On("GetLatestMsg", topicName)} +} + +func (_c *MockRocksMQ_GetLatestMsg_Call) Run(run func(topicName string)) *MockRocksMQ_GetLatestMsg_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_GetLatestMsg_Call) Return(_a0 int64, _a1 error) *MockRocksMQ_GetLatestMsg_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// Notify provides a mock function with given fields: topicName, groupName +func (_m *MockRocksMQ) Notify(topicName string, groupName string) { + _m.Called(topicName, groupName) +} + +// MockRocksMQ_Notify_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Notify' +type MockRocksMQ_Notify_Call struct { + *mock.Call +} + +// Notify is a helper method to define mock.On call +// - topicName string +// - groupName string +func (_e *MockRocksMQ_Expecter) Notify(topicName interface{}, groupName interface{}) *MockRocksMQ_Notify_Call { + return &MockRocksMQ_Notify_Call{Call: _e.mock.On("Notify", topicName, groupName)} +} + +func (_c *MockRocksMQ_Notify_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_Notify_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_Notify_Call) Return() *MockRocksMQ_Notify_Call { + _c.Call.Return() + return _c +} + +// Produce provides a mock function with given fields: topicName, messages +func (_m *MockRocksMQ) Produce(topicName string, messages []ProducerMessage) ([]int64, error) { + ret := _m.Called(topicName, messages) + + var r0 []int64 + if rf, ok := ret.Get(0).(func(string, []ProducerMessage) []int64); ok { + r0 = rf(topicName, messages) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, []ProducerMessage) error); ok { + r1 = rf(topicName, messages) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRocksMQ_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce' +type MockRocksMQ_Produce_Call struct { + *mock.Call +} + +// Produce is a helper method to define mock.On call +// - topicName string +// - messages []ProducerMessage +func (_e *MockRocksMQ_Expecter) Produce(topicName interface{}, messages interface{}) *MockRocksMQ_Produce_Call { + return &MockRocksMQ_Produce_Call{Call: _e.mock.On("Produce", topicName, messages)} +} + +func (_c *MockRocksMQ_Produce_Call) Run(run func(topicName string, messages []ProducerMessage)) *MockRocksMQ_Produce_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].([]ProducerMessage)) + }) + return _c +} + +func (_c *MockRocksMQ_Produce_Call) Return(_a0 []int64, _a1 error) *MockRocksMQ_Produce_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// RegisterConsumer provides a mock function with given fields: consumer +func (_m *MockRocksMQ) RegisterConsumer(consumer *Consumer) error { + ret := _m.Called(consumer) + + var r0 error + if rf, ok := ret.Get(0).(func(*Consumer) error); ok { + r0 = rf(consumer) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_RegisterConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterConsumer' +type MockRocksMQ_RegisterConsumer_Call struct { + *mock.Call +} + +// RegisterConsumer is a helper method to define mock.On call +// - consumer *Consumer +func (_e *MockRocksMQ_Expecter) RegisterConsumer(consumer interface{}) *MockRocksMQ_RegisterConsumer_Call { + return &MockRocksMQ_RegisterConsumer_Call{Call: _e.mock.On("RegisterConsumer", consumer)} +} + +func (_c *MockRocksMQ_RegisterConsumer_Call) Run(run func(consumer *Consumer)) *MockRocksMQ_RegisterConsumer_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*Consumer)) + }) + return _c +} + +func (_c *MockRocksMQ_RegisterConsumer_Call) Return(_a0 error) *MockRocksMQ_RegisterConsumer_Call { + _c.Call.Return(_a0) + return _c +} + +// Seek provides a mock function with given fields: topicName, groupName, msgID +func (_m *MockRocksMQ) Seek(topicName string, groupName string, msgID int64) error { + ret := _m.Called(topicName, groupName, msgID) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, int64) error); ok { + r0 = rf(topicName, groupName, msgID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek' +type MockRocksMQ_Seek_Call struct { + *mock.Call +} + +// Seek is a helper method to define mock.On call +// - topicName string +// - groupName string +// - msgID int64 +func (_e *MockRocksMQ_Expecter) Seek(topicName interface{}, groupName interface{}, msgID interface{}) *MockRocksMQ_Seek_Call { + return &MockRocksMQ_Seek_Call{Call: _e.mock.On("Seek", topicName, groupName, msgID)} +} + +func (_c *MockRocksMQ_Seek_Call) Run(run func(topicName string, groupName string, msgID int64)) *MockRocksMQ_Seek_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string), args[2].(int64)) + }) + return _c +} + +func (_c *MockRocksMQ_Seek_Call) Return(_a0 error) *MockRocksMQ_Seek_Call { + _c.Call.Return(_a0) + return _c +} + +// SeekToLatest provides a mock function with given fields: topicName, groupName +func (_m *MockRocksMQ) SeekToLatest(topicName string, groupName string) error { + ret := _m.Called(topicName, groupName) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(topicName, groupName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockRocksMQ_SeekToLatest_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SeekToLatest' +type MockRocksMQ_SeekToLatest_Call struct { + *mock.Call +} + +// SeekToLatest is a helper method to define mock.On call +// - topicName string +// - groupName string +func (_e *MockRocksMQ_Expecter) SeekToLatest(topicName interface{}, groupName interface{}) *MockRocksMQ_SeekToLatest_Call { + return &MockRocksMQ_SeekToLatest_Call{Call: _e.mock.On("SeekToLatest", topicName, groupName)} +} + +func (_c *MockRocksMQ_SeekToLatest_Call) Run(run func(topicName string, groupName string)) *MockRocksMQ_SeekToLatest_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(string)) + }) + return _c +} + +func (_c *MockRocksMQ_SeekToLatest_Call) Return(_a0 error) *MockRocksMQ_SeekToLatest_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockRocksMQ interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRocksMQ creates a new instance of MockRocksMQ. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRocksMQ(t mockConstructorTestingTNewMockRocksMQ) *MockRocksMQ { + mock := &MockRocksMQ{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 50956d3090..2ed0dba7cf 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -1040,7 +1040,8 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI if vals, ok := rmq.consumers.Load(topicName); ok { consumers, ok := vals.([]*Consumer) if !ok || len(consumers) == 0 { - return nil + log.Error("update ack with no consumer", zap.String("topic", topicName)) + panic("update ack with no consumer") } // find min id of all consumer diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 8516115ee9..e12b638b20 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -1214,15 +1214,22 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) { consumer := &Consumer{ Topic: topicName, GroupName: groupName + strconv.Itoa(i), + MsgMutex: make(chan struct{}), } //make sure consumer not in rmq.consumersID - _ = rmq.DestroyConsumerGroup(topicName, groupName) + rmq.DestroyConsumerGroup(topicName, groupName+strconv.Itoa(i)) //add consumer to rmq.consumers rmq.RegisterConsumer(consumer) } // update acked for all page in rmq but some consumer not in rmq.consumers assert.Error(t, rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1])) + + for i := 0; i < 2; i++ { + rmq.DestroyConsumerGroup(topicName, groupName+strconv.Itoa(i)) + } + // update acked for topic without any consumer + assert.Panics(t, func() { rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1]) }) } func TestRocksmq_Info(t *testing.T) { diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go index 543be42c4d..bbe6ac3fbd 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go @@ -137,6 +137,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { start := time.Now() var deletedAckedSize int64 var pageCleaned UniqueID + var lastAck int64 var pageEndID UniqueID var err error @@ -181,6 +182,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { if err != nil { return err } + lastAck = ackedTs if msgTimeExpiredCheck(ackedTs) { pageEndID = pageID pValue := pageIter.Value() @@ -201,9 +203,9 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { return err } - log.Debug("Expired check by retention time", zap.Any("topic", topic), - zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), - zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", time.Since(start).Milliseconds())) + log.Info("Expired check by retention time", zap.String("topic", topic), + zap.Int64("pageEndID", pageEndID), zap.Int64("deletedAckedSize", deletedAckedSize), zap.Int64("lastAck", lastAck), + zap.Int64("pageCleaned", pageCleaned), zap.Int64("time taken", time.Since(start).Milliseconds())) for ; pageIter.Valid(); pageIter.Next() { pValue := pageIter.Value()