diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 79cfaff035..18af11f7cd 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1137,7 +1137,7 @@ quotaAndLimits: # specific conditions, such as memory of nodes to water marker), true means always reject all dml requests. forceDeny: false ttProtection: - enabled: false + enabled: true # maxTimeTickDelay indicates the backpressure for DML Operations. # DML rates would be reduced according to the ratio of time tick delay to maxTimeTickDelay, # if time tick delay is greater than maxTimeTickDelay, all DML requests would be rejected. diff --git a/internal/.mockery.yaml b/internal/.mockery.yaml index 68e66a58c2..beb7119bdd 100644 --- a/internal/.mockery.yaml +++ b/internal/.mockery.yaml @@ -11,6 +11,7 @@ packages: WALAccesser: Utility: Broadcast: + Local: github.com/milvus-io/milvus/internal/streamingcoord/server/balancer: interfaces: Balancer: diff --git a/internal/distributed/streaming/local.go b/internal/distributed/streaming/local.go new file mode 100644 index 0000000000..d0e9092fc7 --- /dev/null +++ b/internal/distributed/streaming/local.go @@ -0,0 +1,31 @@ +package streaming + +import ( + "context" + + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +type localServiceImpl struct { + *walAccesserImpl +} + +func (w localServiceImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) { + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { + return 0, ErrWALAccesserClosed + } + defer w.lifetime.Done() + + return w.handlerClient.GetLatestMVCCTimestampIfLocal(ctx, vchannel) +} + +// GetMetrics gets the metrics of the wal. +func (w localServiceImpl) GetMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) { + if !w.lifetime.Add(typeutil.LifetimeStateWorking) { + return nil, ErrWALAccesserClosed + } + defer w.lifetime.Done() + + return w.handlerClient.GetWALMetricsIfLocal(ctx) +} diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index b8c14760d8..7305f0dd2f 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -89,9 +89,8 @@ type WALAccesser interface { // WALName returns the name of the wal. WALName() string - // GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel. - // If the wal is located at remote, it will return 0, error. - GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) + // Local returns the local services. + Local() Local // Txn returns a transaction for writing records to one vchannel. // It promises the atomicity written of the messages. @@ -123,6 +122,16 @@ type WALAccesser interface { AppendMessagesWithOption(ctx context.Context, opts AppendOption, msgs ...message.MutableMessage) AppendResponses } +type Local interface { + // GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel. + // If the wal is located at remote, it will return 0, error. + GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) + + // GetMetricsIfLocal gets the metrics of the local wal. + // It will only return the metrics of the local wal but not the remote wal. + GetMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) +} + // Broadcast is the interface for writing broadcast message into the wal. 
type Broadcast interface { // Append of Broadcast sends a broadcast message to all target vchannels. diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 0b1629d71f..8a1ee58c9a 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -68,13 +68,8 @@ func (w *walAccesserImpl) WALName() string { return util.MustSelectWALName() } -func (w *walAccesserImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) { - if !w.lifetime.Add(typeutil.LifetimeStateWorking) { - return 0, ErrWALAccesserClosed - } - defer w.lifetime.Done() - - return w.handlerClient.GetLatestMVCCTimestampIfLocal(ctx, vchannel) +func (w *walAccesserImpl) Local() Local { + return localServiceImpl{w} } // RawAppend writes a record to the log. diff --git a/internal/distributed/streaming/wal_test.go b/internal/distributed/streaming/wal_test.go index 97bc0b1fc6..a8ad16c0a6 100644 --- a/internal/distributed/streaming/wal_test.go +++ b/internal/distributed/streaming/wal_test.go @@ -182,7 +182,8 @@ func TestWAL(t *testing.T) { w.Close() - w.GetLatestMVCCTimestampIfLocal(ctx, vChannel1) + w.Local().GetLatestMVCCTimestampIfLocal(ctx, vChannel1) + w.Local().GetMetricsIfLocal(ctx) resp = w.AppendMessages(ctx, newInsertMessage(vChannel1)) assert.Error(t, resp.UnwrapFirstError()) diff --git a/internal/mocks/distributed/mock_streaming/mock_Local.go b/internal/mocks/distributed/mock_streaming/mock_Local.go new file mode 100644 index 0000000000..da39a2af4e --- /dev/null +++ b/internal/mocks/distributed/mock_streaming/mock_Local.go @@ -0,0 +1,153 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. + +package mock_streaming + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" +) + +// MockLocal is an autogenerated mock type for the Local type +type MockLocal struct { + mock.Mock +} + +type MockLocal_Expecter struct { + mock *mock.Mock +} + +func (_m *MockLocal) EXPECT() *MockLocal_Expecter { + return &MockLocal_Expecter{mock: &_m.Mock} +} + +// GetLatestMVCCTimestampIfLocal provides a mock function with given fields: ctx, vchannel +func (_m *MockLocal) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) { + ret := _m.Called(ctx, vchannel) + + if len(ret) == 0 { + panic("no return value specified for GetLatestMVCCTimestampIfLocal") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (uint64, error)); ok { + return rf(ctx, vchannel) + } + if rf, ok := ret.Get(0).(func(context.Context, string) uint64); ok { + r0 = rf(ctx, vchannel) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, vchannel) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockLocal_GetLatestMVCCTimestampIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMVCCTimestampIfLocal' +type MockLocal_GetLatestMVCCTimestampIfLocal_Call struct { + *mock.Call +} + +// GetLatestMVCCTimestampIfLocal is a helper method to define mock.On call +// - ctx context.Context +// - vchannel string +func (_e *MockLocal_Expecter) GetLatestMVCCTimestampIfLocal(ctx interface{}, vchannel interface{}) *MockLocal_GetLatestMVCCTimestampIfLocal_Call { + return &MockLocal_GetLatestMVCCTimestampIfLocal_Call{Call: _e.mock.On("GetLatestMVCCTimestampIfLocal", ctx, vchannel)} +} + +func (_c 
*MockLocal_GetLatestMVCCTimestampIfLocal_Call) Run(run func(ctx context.Context, vchannel string)) *MockLocal_GetLatestMVCCTimestampIfLocal_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockLocal_GetLatestMVCCTimestampIfLocal_Call) Return(_a0 uint64, _a1 error) *MockLocal_GetLatestMVCCTimestampIfLocal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockLocal_GetLatestMVCCTimestampIfLocal_Call) RunAndReturn(run func(context.Context, string) (uint64, error)) *MockLocal_GetLatestMVCCTimestampIfLocal_Call { + _c.Call.Return(run) + return _c +} + +// GetMetricsIfLocal provides a mock function with given fields: ctx +func (_m *MockLocal) GetMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetMetricsIfLocal") + } + + var r0 *types.StreamingNodeMetrics + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*types.StreamingNodeMetrics, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *types.StreamingNodeMetrics); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.StreamingNodeMetrics) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockLocal_GetMetricsIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMetricsIfLocal' +type MockLocal_GetMetricsIfLocal_Call struct { + *mock.Call +} + +// GetMetricsIfLocal is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockLocal_Expecter) GetMetricsIfLocal(ctx interface{}) *MockLocal_GetMetricsIfLocal_Call { + return &MockLocal_GetMetricsIfLocal_Call{Call: _e.mock.On("GetMetricsIfLocal", ctx)} +} + +func (_c *MockLocal_GetMetricsIfLocal_Call) Run(run func(ctx context.Context)) *MockLocal_GetMetricsIfLocal_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockLocal_GetMetricsIfLocal_Call) Return(_a0 *types.StreamingNodeMetrics, _a1 error) *MockLocal_GetMetricsIfLocal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockLocal_GetMetricsIfLocal_Call) RunAndReturn(run func(context.Context) (*types.StreamingNodeMetrics, error)) *MockLocal_GetMetricsIfLocal_Call { + _c.Call.Return(run) + return _c +} + +// NewMockLocal creates a new instance of MockLocal. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewMockLocal(t interface { + mock.TestingT + Cleanup(func()) +}) *MockLocal { + mock := &MockLocal{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go index 803acef111..969325da69 100644 --- a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go +++ b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go @@ -196,59 +196,49 @@ func (_c *MockWALAccesser_Broadcast_Call) RunAndReturn(run func() streaming.Broa return _c } -// GetLatestMVCCTimestampIfLocal provides a mock function with given fields: ctx, vchannel -func (_m *MockWALAccesser) GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) { - ret := _m.Called(ctx, vchannel) +// Local provides a mock function with no fields +func (_m *MockWALAccesser) Local() streaming.Local { + ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for GetLatestMVCCTimestampIfLocal") + panic("no return value specified for Local") } - var r0 uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (uint64, error)); ok { - return rf(ctx, vchannel) - } - if rf, ok := ret.Get(0).(func(context.Context, string) uint64); ok { - r0 = rf(ctx, vchannel) + var r0 streaming.Local + if rf, ok := ret.Get(0).(func() streaming.Local); ok { + r0 = rf() } else { - r0 = ret.Get(0).(uint64) + if ret.Get(0) != nil { + r0 = ret.Get(0).(streaming.Local) + } } - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, vchannel) - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } -// MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMVCCTimestampIfLocal' -type MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call struct { +// MockWALAccesser_Local_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Local' +type MockWALAccesser_Local_Call struct { *mock.Call } -// GetLatestMVCCTimestampIfLocal is a helper method to define mock.On call -// - ctx context.Context -// - vchannel string -func (_e *MockWALAccesser_Expecter) GetLatestMVCCTimestampIfLocal(ctx interface{}, vchannel interface{}) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call { - return &MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call{Call: _e.mock.On("GetLatestMVCCTimestampIfLocal", ctx, vchannel)} +// Local is a helper method to define mock.On call +func (_e *MockWALAccesser_Expecter) Local() *MockWALAccesser_Local_Call { + return &MockWALAccesser_Local_Call{Call: _e.mock.On("Local")} } -func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) Run(run func(ctx context.Context, vchannel string)) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call { +func (_c *MockWALAccesser_Local_Call) Run(run func()) *MockWALAccesser_Local_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) + run() }) return _c } -func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) Return(_a0 uint64, _a1 error) *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call { - _c.Call.Return(_a0, _a1) +func (_c *MockWALAccesser_Local_Call) Return(_a0 streaming.Local) *MockWALAccesser_Local_Call { + _c.Call.Return(_a0) return _c } -func (_c *MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call) RunAndReturn(run func(context.Context, string) (uint64, error)) 
*MockWALAccesser_GetLatestMVCCTimestampIfLocal_Call { +func (_c *MockWALAccesser_Local_Call) RunAndReturn(run func() streaming.Local) *MockWALAccesser_Local_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/streamingnode/client/mock_handler/mock_HandlerClient.go b/internal/mocks/streamingnode/client/mock_handler/mock_HandlerClient.go index f315890aaa..85fc9f7182 100644 --- a/internal/mocks/streamingnode/client/mock_handler/mock_HandlerClient.go +++ b/internal/mocks/streamingnode/client/mock_handler/mock_HandlerClient.go @@ -7,6 +7,8 @@ import ( handler "github.com/milvus-io/milvus/internal/streamingnode/client/handler" mock "github.com/stretchr/testify/mock" + + types "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" ) // MockHandlerClient is an autogenerated mock type for the HandlerClient type @@ -229,6 +231,64 @@ func (_c *MockHandlerClient_GetLatestMVCCTimestampIfLocal_Call) RunAndReturn(run return _c } +// GetWALMetricsIfLocal provides a mock function with given fields: ctx +func (_m *MockHandlerClient) GetWALMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetWALMetricsIfLocal") + } + + var r0 *types.StreamingNodeMetrics + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*types.StreamingNodeMetrics, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *types.StreamingNodeMetrics); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.StreamingNodeMetrics) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockHandlerClient_GetWALMetricsIfLocal_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWALMetricsIfLocal' +type MockHandlerClient_GetWALMetricsIfLocal_Call struct { + *mock.Call +} + +// GetWALMetricsIfLocal is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockHandlerClient_Expecter) GetWALMetricsIfLocal(ctx interface{}) *MockHandlerClient_GetWALMetricsIfLocal_Call { + return &MockHandlerClient_GetWALMetricsIfLocal_Call{Call: _e.mock.On("GetWALMetricsIfLocal", ctx)} +} + +func (_c *MockHandlerClient_GetWALMetricsIfLocal_Call) Run(run func(ctx context.Context)) *MockHandlerClient_GetWALMetricsIfLocal_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockHandlerClient_GetWALMetricsIfLocal_Call) Return(_a0 *types.StreamingNodeMetrics, _a1 error) *MockHandlerClient_GetWALMetricsIfLocal_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockHandlerClient_GetWALMetricsIfLocal_Call) RunAndReturn(run func(context.Context) (*types.StreamingNodeMetrics, error)) *MockHandlerClient_GetWALMetricsIfLocal_Call { + _c.Call.Return(run) + return _c +} + // NewMockHandlerClient creates a new instance of MockHandlerClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewMockHandlerClient(t interface { diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go index 382a967d68..bf24a5ba7b 100644 --- a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go +++ b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go @@ -346,6 +346,53 @@ func (_c *MockWAL_IsAvailable_Call) RunAndReturn(run func() bool) *MockWAL_IsAva return _c } +// Metrics provides a mock function with no fields +func (_m *MockWAL) Metrics() types.WALMetrics { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Metrics") + } + + var r0 types.WALMetrics + if rf, ok := ret.Get(0).(func() types.WALMetrics); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.WALMetrics) + } + } + + return r0 +} + +// MockWAL_Metrics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Metrics' +type MockWAL_Metrics_Call struct { + *mock.Call +} + +// Metrics is a helper method to define mock.On call +func (_e *MockWAL_Expecter) Metrics() *MockWAL_Metrics_Call { + return &MockWAL_Metrics_Call{Call: _e.mock.On("Metrics")} +} + +func (_c *MockWAL_Metrics_Call) Run(run func()) *MockWAL_Metrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWAL_Metrics_Call) Return(_a0 types.WALMetrics) *MockWAL_Metrics_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWAL_Metrics_Call) RunAndReturn(run func() types.WALMetrics) *MockWAL_Metrics_Call { + _c.Call.Return(run) + return _c +} + // Read provides a mock function with given fields: ctx, deliverPolicy func (_m *MockWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) { ret := _m.Called(ctx, deliverPolicy) diff --git a/internal/mocks/streamingnode/server/mock_walmanager/mock_Manager.go b/internal/mocks/streamingnode/server/mock_walmanager/mock_Manager.go index c67b364d06..14010e3ce8 100644 --- a/internal/mocks/streamingnode/server/mock_walmanager/mock_Manager.go +++ b/internal/mocks/streamingnode/server/mock_walmanager/mock_Manager.go @@ -56,63 +56,6 @@ func (_c *MockManager_Close_Call) RunAndReturn(run func()) *MockManager_Close_Ca return _c } -// GetAllAvailableChannels provides a mock function with no fields -func (_m *MockManager) GetAllAvailableChannels() ([]types.PChannelInfo, error) { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for GetAllAvailableChannels") - } - - var r0 []types.PChannelInfo - var r1 error - if rf, ok := ret.Get(0).(func() ([]types.PChannelInfo, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() []types.PChannelInfo); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.PChannelInfo) - } - } - - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockManager_GetAllAvailableChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAllAvailableChannels' -type MockManager_GetAllAvailableChannels_Call struct { - *mock.Call -} - -// GetAllAvailableChannels is a helper method to define mock.On call -func (_e *MockManager_Expecter) GetAllAvailableChannels() *MockManager_GetAllAvailableChannels_Call { - return &MockManager_GetAllAvailableChannels_Call{Call: _e.mock.On("GetAllAvailableChannels")} -} - -func (_c *MockManager_GetAllAvailableChannels_Call) Run(run func()) *MockManager_GetAllAvailableChannels_Call { 
- _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockManager_GetAllAvailableChannels_Call) Return(_a0 []types.PChannelInfo, _a1 error) *MockManager_GetAllAvailableChannels_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockManager_GetAllAvailableChannels_Call) RunAndReturn(run func() ([]types.PChannelInfo, error)) *MockManager_GetAllAvailableChannels_Call { - _c.Call.Return(run) - return _c -} - // GetAvailableWAL provides a mock function with given fields: channel func (_m *MockManager) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) { ret := _m.Called(channel) @@ -171,6 +114,63 @@ func (_c *MockManager_GetAvailableWAL_Call) RunAndReturn(run func(types.PChannel return _c } +// Metrics provides a mock function with no fields +func (_m *MockManager) Metrics() (*types.StreamingNodeMetrics, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Metrics") + } + + var r0 *types.StreamingNodeMetrics + var r1 error + if rf, ok := ret.Get(0).(func() (*types.StreamingNodeMetrics, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *types.StreamingNodeMetrics); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.StreamingNodeMetrics) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockManager_Metrics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Metrics' +type MockManager_Metrics_Call struct { + *mock.Call +} + +// Metrics is a helper method to define mock.On call +func (_e *MockManager_Expecter) Metrics() *MockManager_Metrics_Call { + return &MockManager_Metrics_Call{Call: _e.mock.On("Metrics")} +} + +func (_c *MockManager_Metrics_Call) Run(run func()) *MockManager_Metrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_Metrics_Call) Return(_a0 *types.StreamingNodeMetrics, _a1 error) *MockManager_Metrics_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockManager_Metrics_Call) RunAndReturn(run func() (*types.StreamingNodeMetrics, error)) *MockManager_Metrics_Call { + _c.Call.Return(run) + return _c +} + // Open provides a mock function with given fields: ctx, channel func (_m *MockManager) Open(ctx context.Context, channel types.PChannelInfo) error { ret := _m.Called(ctx, channel) diff --git a/internal/mocks/streamingnode/server/wal/mock_recovery/mock_RecoveryStorage.go b/internal/mocks/streamingnode/server/wal/mock_recovery/mock_RecoveryStorage.go index 0992345035..b11c3d2622 100644 --- a/internal/mocks/streamingnode/server/wal/mock_recovery/mock_RecoveryStorage.go +++ b/internal/mocks/streamingnode/server/wal/mock_recovery/mock_RecoveryStorage.go @@ -56,6 +56,51 @@ func (_c *MockRecoveryStorage_Close_Call) RunAndReturn(run func()) *MockRecovery return _c } +// Metrics provides a mock function with no fields +func (_m *MockRecoveryStorage) Metrics() recovery.RecoveryMetrics { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Metrics") + } + + var r0 recovery.RecoveryMetrics + if rf, ok := ret.Get(0).(func() recovery.RecoveryMetrics); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(recovery.RecoveryMetrics) + } + + return r0 +} + +// MockRecoveryStorage_Metrics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Metrics' +type MockRecoveryStorage_Metrics_Call struct { + *mock.Call +} + +// Metrics 
is a helper method to define mock.On call +func (_e *MockRecoveryStorage_Expecter) Metrics() *MockRecoveryStorage_Metrics_Call { + return &MockRecoveryStorage_Metrics_Call{Call: _e.mock.On("Metrics")} +} + +func (_c *MockRecoveryStorage_Metrics_Call) Run(run func()) *MockRecoveryStorage_Metrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockRecoveryStorage_Metrics_Call) Return(_a0 recovery.RecoveryMetrics) *MockRecoveryStorage_Metrics_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRecoveryStorage_Metrics_Call) RunAndReturn(run func() recovery.RecoveryMetrics) *MockRecoveryStorage_Metrics_Call { + _c.Call.Return(run) + return _c +} + // ObserveMessage provides a mock function with given fields: ctx, msg func (_m *MockRecoveryStorage) ObserveMessage(ctx context.Context, msg message.ImmutableMessage) error { ret := _m.Called(ctx, msg) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 702796e454..c08fbdfce0 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -277,6 +277,17 @@ func TestProxy(t *testing.T) { params := paramtable.Get() testutil.ResetEnvironment() + wal := mock_streaming.NewMockWALAccesser(t) + b := mock_streaming.NewMockBroadcast(t) + wal.EXPECT().Broadcast().Return(b).Maybe() + b.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.BroadcastAppendResult{}, nil).Maybe() + local := mock_streaming.NewMockLocal(t) + local.EXPECT().GetLatestMVCCTimestampIfLocal(mock.Anything, mock.Anything).Return(0, nil).Maybe() + local.EXPECT().GetMetricsIfLocal(mock.Anything).Return(&types.StreamingNodeMetrics{}, nil).Maybe() + wal.EXPECT().Local().Return(local).Maybe() + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() + params.RootCoordGrpcServerCfg.IP = "localhost" params.QueryCoordGrpcServerCfg.IP = "localhost" params.DataCoordGrpcServerCfg.IP = "localhost" diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 6223805523..69f90dcde4 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -936,7 +936,7 @@ func (sd *shardDelegator) speedupGuranteeTS( return guaranteeTS } // use the mvcc timestamp of the wal as the guarantee timestamp to make fast strong consistency search. 
- if mvcc, err := streaming.WAL().GetLatestMVCCTimestampIfLocal(ctx, sd.vchannelName); err == nil && mvcc < guaranteeTS { + if mvcc, err := streaming.WAL().Local().GetLatestMVCCTimestampIfLocal(ctx, sd.vchannelName); err == nil && mvcc < guaranteeTS { return mvcc } return guaranteeTS diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 248d0b136e..da6c625c71 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -25,12 +25,14 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -159,9 +161,29 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error CollectionDeleteBufferNum: deleteBufferNum, CollectionDeleteBufferSize: deleteBufferSize, }, + StreamingQuota: getStreamingQuotaMetrics(), }, nil } +// getStreamingQuotaMetrics returns the streaming quota metrics of the QueryNode. +func getStreamingQuotaMetrics() *metricsinfo.StreamingQuotaMetrics { + if streamingMetrics, err := streaming.WAL().Local().GetMetricsIfLocal(context.Background()); err == nil { + walMetrics := make([]metricsinfo.WALMetrics, 0, len(streamingMetrics.WALMetrics)) + for channel, metric := range streamingMetrics.WALMetrics { + if rwMetric, ok := metric.(types.RWWALMetrics); ok { + walMetrics = append(walMetrics, metricsinfo.WALMetrics{ + Channel: channel, + RecoveryTimeTick: rwMetric.RecoveryTimeTick, + }) + } + } + return &metricsinfo.StreamingQuotaMetrics{ + WALs: walMetrics, + } + } + return nil +} + func getCollectionMetrics(node *QueryNode) (*metricsinfo.QueryNodeCollectionMetrics, error) { allSegments := node.manager.Segment.GetBy() ret := &metricsinfo.QueryNodeCollectionMetrics{ diff --git a/internal/querynodev2/metrics_info_test.go b/internal/querynodev2/metrics_info_test.go index 0e5d367dc8..bc45b2749b 100644 --- a/internal/querynodev2/metrics_info_test.go +++ b/internal/querynodev2/metrics_info_test.go @@ -18,17 +18,22 @@ package querynodev2 import ( "testing" + "time" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/pipeline" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/v2/mq/msgdispatcher" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/metricsinfo" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" @@ -129,3 +134,36 @@ func TestGetSegmentJSON(t *testing.T) { assert.Equal(t, "default", 
segments[0].ResourceGroup) assert.Equal(t, int64(100), segments[0].LoadedInsertRowCount) } + +func TestStreamingQuotaMetrics(t *testing.T) { + paramtable.Init() + + wal := mock_streaming.NewMockWALAccesser(t) + local := mock_streaming.NewMockLocal(t) + now := time.Now() + local.EXPECT().GetMetricsIfLocal(mock.Anything).Return(&types.StreamingNodeMetrics{ + WALMetrics: map[types.ChannelID]types.WALMetrics{ + {Name: "ch1"}: types.RWWALMetrics{ + ChannelInfo: types.PChannelInfo{ + Name: "ch1", + }, + MVCCTimeTick: tsoutil.ComposeTSByTime(now, 0), + RecoveryTimeTick: tsoutil.ComposeTSByTime(now.Add(-time.Second), 0), + }, + {Name: "ch2"}: types.ROWALMetrics{}, + }, + }, nil) + wal.EXPECT().Local().Return(local) + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() + + m := getStreamingQuotaMetrics() + assert.Len(t, m.WALs, 1) + assert.Equal(t, "ch1", m.WALs[0].Channel.Name) + assert.Equal(t, tsoutil.ComposeTSByTime(now.Add(-time.Second), 0), m.WALs[0].RecoveryTimeTick) + + local.EXPECT().GetMetricsIfLocal(mock.Anything).Unset() + local.EXPECT().GetMetricsIfLocal(mock.Anything).Return(nil, errors.New("test")) + m = getStreamingQuotaMetrics() + assert.Nil(t, m) +} diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index f0462757d6..6e7949d72d 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -38,7 +38,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming" "github.com/milvus-io/milvus/internal/mocks/util/mock_segcore" "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" @@ -52,6 +54,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" @@ -2429,5 +2432,13 @@ func (suite *ServiceSuite) TestRunAnalyzer() { } func TestQueryNodeService(t *testing.T) { + wal := mock_streaming.NewMockWALAccesser(t) + local := mock_streaming.NewMockLocal(t) + local.EXPECT().GetLatestMVCCTimestampIfLocal(mock.Anything, mock.Anything).Return(0, nil).Maybe() + local.EXPECT().GetMetricsIfLocal(mock.Anything).Return(&types.StreamingNodeMetrics{}, nil).Maybe() + wal.EXPECT().Local().Return(local).Maybe() + streaming.SetWALForTest(wal) + defer streaming.RecoverWALForTest() + suite.Run(t, new(ServiceSuite)) } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 9e766b1cc8..0991f8072d 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -67,6 +67,9 @@ type IMetaTable interface { GetCollectionByIDWithMaxTs(ctx context.Context, collectionID UniqueID) (*model.Collection, error) ListCollections(ctx context.Context, dbName string, ts Timestamp, onlyAvail bool) ([]*model.Collection, error) ListAllAvailCollections(ctx context.Context) map[int64][]int64 + // ListAllAvailPartitions returns the partition ids of all available collections. 
+ // The key of the map is the database id, and the value is a map of collection id to partition ids. + ListAllAvailPartitions(ctx context.Context) map[int64]map[int64][]int64 ListCollectionPhysicalChannels(ctx context.Context) map[typeutil.UniqueID][]string GetCollectionVirtualChannels(ctx context.Context, colID int64) []string GetPChannelInfo(ctx context.Context, pchannel string) *rootcoordpb.GetPChannelInfoResponse @@ -752,6 +755,31 @@ func (mt *MetaTable) ListAllAvailCollections(ctx context.Context) map[int64][]in return ret } +func (mt *MetaTable) ListAllAvailPartitions(ctx context.Context) map[int64]map[int64][]int64 { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + + ret := make(map[int64]map[int64][]int64, len(mt.dbName2Meta)) + for _, dbMeta := range mt.dbName2Meta { + // Database may not have available collections. + ret[dbMeta.ID] = make(map[int64][]int64, 64) + } + for _, collMeta := range mt.collID2Meta { + if !collMeta.Available() { + continue + } + dbID := collMeta.DBID + if dbID == util.NonDBID { + dbID = util.DefaultDBID + } + if _, ok := ret[dbID]; !ok { + ret[dbID] = make(map[int64][]int64, 64) + } + ret[dbID][collMeta.CollectionID] = lo.Map(collMeta.Partitions, func(part *model.Partition, _ int) int64 { return part.PartitionID }) + } + return ret +} + func (mt *MetaTable) ListCollections(ctx context.Context, dbName string, ts Timestamp, onlyAvail bool) ([]*model.Collection, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index 406739c103..632744eaa6 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -1939,6 +1939,54 @@ func (_c *IMetaTable_ListAllAvailCollections_Call) RunAndReturn(run func(context return _c } +// ListAllAvailPartitions provides a mock function with given fields: ctx +func (_m *IMetaTable) ListAllAvailPartitions(ctx context.Context) map[int64]map[int64][]int64 { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ListAllAvailPartitions") + } + + var r0 map[int64]map[int64][]int64 + if rf, ok := ret.Get(0).(func(context.Context) map[int64]map[int64][]int64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]map[int64][]int64) + } + } + + return r0 +} + +// IMetaTable_ListAllAvailPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListAllAvailPartitions' +type IMetaTable_ListAllAvailPartitions_Call struct { + *mock.Call +} + +// ListAllAvailPartitions is a helper method to define mock.On call +// - ctx context.Context +func (_e *IMetaTable_Expecter) ListAllAvailPartitions(ctx interface{}) *IMetaTable_ListAllAvailPartitions_Call { + return &IMetaTable_ListAllAvailPartitions_Call{Call: _e.mock.On("ListAllAvailPartitions", ctx)} +} + +func (_c *IMetaTable_ListAllAvailPartitions_Call) Run(run func(ctx context.Context)) *IMetaTable_ListAllAvailPartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *IMetaTable_ListAllAvailPartitions_Call) Return(_a0 map[int64]map[int64][]int64) *IMetaTable_ListAllAvailPartitions_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_ListAllAvailPartitions_Call) RunAndReturn(run func(context.Context) map[int64]map[int64][]int64) *IMetaTable_ListAllAvailPartitions_Call { + _c.Call.Return(run) + return _c +} + // ListCollectionPhysicalChannels provides a mock function with given 
fields: ctx func (_m *IMetaTable) ListCollectionPhysicalChannels(ctx context.Context) map[int64][]string { ret := _m.Called(ctx) diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 0fea8e35d1..41ecfd6aea 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/channel" "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/proxyutil" @@ -568,6 +569,7 @@ func (q *QuotaCenter) collectMetrics() error { } for oldQN := range oldQueryNodes { metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(oldQN, 10)) + metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.StreamingNodeRole, strconv.FormatInt(oldQN, 10)) } return nil } @@ -969,6 +971,24 @@ func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) map[int64]float64 { updateCollectionDelay(delay, metric.Effect.CollectionIDs) metrics.RootCoordTtDelay.WithLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(delay.Milliseconds())) } + if metric.StreamingQuota != nil { + // If the query node is embedded in streaming node, + // we also need to use the wal's metrics to calculate the delay. + var maxDelay time.Duration + for _, wal := range metric.StreamingQuota.WALs { + t2, _ := tsoutil.ParseTS(wal.RecoveryTimeTick) + delay := t1.Sub(t2) + if maxDelay < delay { + maxDelay = delay + } + // Update all collections work on this pchannel. + pchannelInfo := channel.StaticPChannelStatsManager.MustGet().GetPChannelStats(wal.Channel) + updateCollectionDelay(delay, pchannelInfo.CollectionIDs()) + } + if maxDelay > 0 { + metrics.RootCoordTtDelay.WithLabelValues(typeutil.StreamingNodeRole, strconv.FormatInt(nodeID, 10)).Set(float64(maxDelay.Milliseconds())) + } + } } for nodeID, metric := range q.dataNodeMetrics { if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphChannel != "" { @@ -1286,8 +1306,8 @@ func (q *QuotaCenter) resetAllCurrentRates() error { } } } - initLimiters(q.readableCollections) - initLimiters(q.writableCollections) + partitions := q.meta.ListAllAvailPartitions(q.ctx) + initLimiters(partitions) return nil } diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 379d63aa75..645778a70f 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -289,6 +289,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.readableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.readableCollections).Maybe() err := quotaCenter.resetAllCurrentRates() assert.NoError(t, err) @@ -331,6 +332,7 @@ func TestQuotaCenter(t *testing.T) { 0: collectionIDToPartitionIDs, } quotaCenter.writableCollections[0][1] = append(quotaCenter.writableCollections[0][1], 1000) + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() err := quotaCenter.resetAllCurrentRates() assert.NoError(t, err) @@ -396,6 +398,7 @@ func TestQuotaCenter(t *testing.T) { 0: collectionIDToPartitionIDs, } quotaCenter.writableCollections[0][1] = append(quotaCenter.writableCollections[0][1], 1000) + 
meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() err := quotaCenter.resetAllCurrentRates() assert.NoError(t, err) @@ -449,6 +452,7 @@ func TestQuotaCenter(t *testing.T) { meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{}, nil).Maybe() quotaCenter := NewQuotaCenter(pcm, dc, core.tsoAllocator, meta) + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.clearMetrics() err = quotaCenter.calculateRates() assert.NoError(t, err) @@ -580,6 +584,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.writableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.collectionIDToDBID = collectionIDToDBID err = quotaCenter.resetAllCurrentRates() assert.NoError(t, err) @@ -619,6 +624,7 @@ func TestQuotaCenter(t *testing.T) { 0: {1: {}}, 1: {2: {}}, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.readableCollections).Maybe() quotaCenter.dbs.Insert("default", 0) quotaCenter.dbs.Insert("db1", 1) @@ -672,6 +678,7 @@ func TestQuotaCenter(t *testing.T) { 0: collectionIDToPartitionIDs, 1: {4: {}}, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.resetAllCurrentRates() @@ -786,6 +793,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.writableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() for _, c := range memCases { paramtable.Get().Save(Params.QuotaConfig.QueryNodeMemoryLowWaterLevel.Key, fmt.Sprintf("%f", c.lowWater)) paramtable.Get().Save(Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key, fmt.Sprintf("%f", c.highWater)) @@ -842,6 +850,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.writableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.Key, "true") for _, test := range tests { paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.Key, fmt.Sprintf("%f", test.low)) @@ -912,6 +921,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.writableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.resetAllCurrentRates() quotaCenter.checkDiskQuota(nil) @@ -946,6 +956,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.readableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.resetAllCurrentRates() collectionID := int64(1) limitNode := quotaCenter.rateLimiter.GetCollectionLimiters(0, collectionID) @@ -966,6 +977,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.readableCollections = map[int64]map[int64][]int64{ 0: 
collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.resetAllCurrentRates() collectionID := int64(1) limitNode := quotaCenter.rateLimiter.GetCollectionLimiters(0, collectionID) @@ -981,6 +993,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.readableCollections = map[int64]map[int64][]int64{ 0: collectionIDToPartitionIDs, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.readableCollections).Maybe() quotaCenter.resetAllCurrentRates() minRate := Limit(100) collectionID := int64(1) @@ -1010,6 +1023,7 @@ func TestQuotaCenter(t *testing.T) { collection := UniqueID(0) meta := mockrootcoord.NewIMetaTable(t) meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe() + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(nil).Maybe() quotaCenter := NewQuotaCenter(pcm, dc, core.tsoAllocator, meta) quotaCenter.resetAllCurrentRates() quotaBackup := Params.QuotaConfig.DiskQuota.GetValue() @@ -1039,6 +1053,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.writableCollections = map[int64]map[int64][]int64{ 0: {1: {}}, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.resetAllCurrentRates() @@ -1057,6 +1072,7 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, getRate(limiters, internalpb.RateType_DQLQuery), Params.QuotaConfig.DQLMaxQueryRatePerCollection.GetAsFloat()) meta.ExpectedCalls = nil + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(quotaCenter.writableCollections).Maybe() meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(&model.Collection{ Properties: []*commonpb.KeyValuePair{ { @@ -1562,6 +1578,12 @@ func TestResetAllCurrentRates(t *testing.T) { 100: []int64{}, }, } + meta.EXPECT().ListAllAvailPartitions(mock.Anything).Return(map[int64]map[int64][]int64{ + 1: {}, + 2: { + 100: []int64{}, + }, + }).Maybe() err := quotaCenter.resetAllCurrentRates() assert.NoError(t, err) diff --git a/internal/streamingcoord/server/balancer/channel/pchannel_stats.go b/internal/streamingcoord/server/balancer/channel/pchannel_stats.go index 0c3433c05c..27f50056be 100644 --- a/internal/streamingcoord/server/balancer/channel/pchannel_stats.go +++ b/internal/streamingcoord/server/balancer/channel/pchannel_stats.go @@ -115,3 +115,14 @@ func (s *pchannelStats) View() PChannelStatsView { VChannels: vchannels, } } + +// CollectionIDs returns the collection ids of the pchannel. +func (s *pchannelStats) CollectionIDs() []int64 { + s.mu.Lock() + defer s.mu.Unlock() + collectionIDs := make([]int64, 0, len(s.vchannels)) + for _, v := range s.vchannels { + collectionIDs = append(collectionIDs, v) + } + return collectionIDs +} diff --git a/internal/streamingnode/client/handler/handler_client.go b/internal/streamingnode/client/handler/handler_client.go index 00384d5ea6..de6fbc8847 100644 --- a/internal/streamingnode/client/handler/handler_client.go +++ b/internal/streamingnode/client/handler/handler_client.go @@ -70,6 +70,10 @@ type HandlerClient interface { // If the wal is located at remote, it will return 0, error. GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error) + // GetWALMetricsIfLocal gets the metrics of the local wal. + // It will only return the metrics of the local wal but not the remote wal. 
+ GetWALMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) + // CreateProducer creates a producer. // Producer is a stream client without keep alive promise. // It will be available until context canceled, active close, streaming error or remote server wal closing. diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index 4a06bc4752..2f229c7fc6 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -65,6 +65,16 @@ func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context, return w.GetLatestMVCCTimestamp(ctx, vchannel) } +// GetWALMetricsIfLocal gets the metrics of the local wal. +func (hc *handlerClientImpl) GetWALMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error) { + if !hc.lifetime.Add(typeutil.LifetimeStateWorking) { + return nil, ErrClientClosed + } + defer hc.lifetime.Done() + + return registry.GetLocalWALMetrics() +} + // CreateProducer creates a producer. func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerOptions) (Producer, error) { if !hc.lifetime.Add(typeutil.LifetimeStateWorking) { diff --git a/internal/streamingnode/client/handler/registry/wal_manager.go b/internal/streamingnode/client/handler/registry/wal_manager.go index 907b63c192..59f4472dd0 100644 --- a/internal/streamingnode/client/handler/registry/wal_manager.go +++ b/internal/streamingnode/client/handler/registry/wal_manager.go @@ -42,11 +42,22 @@ func GetLocalAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) { // so make a copy before appending for local wal to keep the consistency. } +// GetLocalWALMetrics returns all the metrics of current wal manager. +func GetLocalWALMetrics() (*types.StreamingNodeMetrics, error) { + if !paramtable.IsLocalComponentEnabled(typeutil.StreamingNodeRole) { + return nil, ErrNoStreamingNodeDeployed + } + return registry.Get().Metrics() +} + // WALManager is a hint type for wal manager at streaming node. type WALManager interface { // GetAvailableWAL returns a available wal instance for the channel. // Return nil if the wal instance is not found. GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) + + // Metrics return all the metrics of current wal manager. + Metrics() (*types.StreamingNodeMetrics, error) } // localWAL is a hint type for local wal. 
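Not part of the patch: a minimal sketch of how the new local-metrics path introduced above (streaming.WAL().Local(), GetMetricsIfLocal, and the RWWALMetrics type) is intended to be consumed by a component colocated with a streaming node. The package name, function name, and log output are illustrative only; the imported packages and methods are the ones touched by this diff.

package example // hypothetical package, for illustration only

import (
	"context"
	"log"

	"github.com/milvus-io/milvus/internal/distributed/streaming"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
)

// logLocalWALRecoveryLag reads the per-pchannel metrics exposed by the new
// Local() service and prints the recovery lag of every read-write WAL.
// It returns silently when no streaming node runs in this process.
func logLocalWALRecoveryLag(ctx context.Context) {
	metrics, err := streaming.WAL().Local().GetMetricsIfLocal(ctx)
	if err != nil {
		// ErrNoStreamingNodeDeployed or a closed accesser: nothing to report.
		return
	}
	for channelID, m := range metrics.WALMetrics {
		if rw, ok := m.(types.RWWALMetrics); ok {
			// RecoveryLag is the wall-clock distance between the MVCC time
			// tick and the recovery (flusher checkpoint) time tick.
			log.Printf("pchannel %s recovery lag: %v", channelID, rw.RecoveryLag())
		}
	}
}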
diff --git a/internal/streamingnode/client/handler/registry/wal_manager_test.go b/internal/streamingnode/client/handler/registry/wal_manager_test.go index 87384f9f82..88f21c897c 100644 --- a/internal/streamingnode/client/handler/registry/wal_manager_test.go +++ b/internal/streamingnode/client/handler/registry/wal_manager_test.go @@ -20,6 +20,10 @@ type mockWALManager struct { t *testing.T } +func (m *mockWALManager) Metrics() (*types.StreamingNodeMetrics, error) { + return &types.StreamingNodeMetrics{}, nil +} + func (m *mockWALManager) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) { l := mock_wal.NewMockWAL(m.t) l.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{}, nil) diff --git a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go index 1362e0ec4e..9aa7275bf9 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go +++ b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go @@ -40,8 +40,8 @@ func RecoverWALFlusher(param *RecoverWALFlusherParam) *WALFlusherImpl { logger: resource.Resource().Logger().With( log.FieldComponent("flusher"), zap.String("pchannel", param.ChannelInfo.String())), - metrics: newFlusherMetrics(param.ChannelInfo), - rs: param.RecoveryStorage, + metrics: newFlusherMetrics(param.ChannelInfo), + RecoveryStorage: param.RecoveryStorage, } go flusher.Execute(param.RecoverySnapshot) return flusher @@ -53,7 +53,7 @@ type WALFlusherImpl struct { flusherComponents *flusherComponents logger *log.MLogger metrics *flusherMetrics - rs recovery.RecoveryStorage + recovery.RecoveryStorage } // Execute starts the wal flusher. @@ -119,7 +119,7 @@ func (impl *WALFlusherImpl) Close() { impl.notifier.BlockUntilFinish() impl.logger.Info("wal flusher start to close the recovery storage...") - impl.rs.Close() + impl.RecoveryStorage.Close() impl.logger.Info("recovery storage closed") impl.metrics.Close() @@ -152,7 +152,7 @@ func (impl *WALFlusherImpl) buildFlusherComponents(ctx context.Context, l wal.WA cpUpdater := util.NewChannelCheckpointUpdaterWithCallback(broker, func(mp *msgpb.MsgPosition) { messageID := adaptor.MustGetMessageIDFromMQWrapperIDBytes(l.WALName(), mp.MsgID) - impl.rs.UpdateFlusherCheckpoint(&recovery.WALCheckpoint{ + impl.RecoveryStorage.UpdateFlusherCheckpoint(&recovery.WALCheckpoint{ MessageID: messageID, TimeTick: mp.Timestamp, Magic: recovery.RecoveryMagicStreamingInitialized, @@ -202,7 +202,7 @@ func (impl *WALFlusherImpl) dispatch(msg message.ImmutableMessage) (err error) { // TODO: We will merge the flusher into recovery storage in future. // Currently, flusher works as a separate component. defer func() { - if err = impl.rs.ObserveMessage(impl.notifier.Context(), msg); err != nil { + if err = impl.RecoveryStorage.ObserveMessage(impl.notifier.Context(), msg); err != nil { impl.logger.Warn("failed to observe message", zap.Error(err)) } }() diff --git a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go index bebc341b04..8b7ff99f53 100644 --- a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go @@ -39,6 +39,13 @@ func (w *roWALAdaptorImpl) Channel() types.PChannelInfo { return w.roWALImpls.Channel() } +// Metrics returns the metrics of the wal. 
+func (w *roWALAdaptorImpl) Metrics() types.WALMetrics { + return types.ROWALMetrics{ + ChannelInfo: w.Channel(), + } +} + func (w *roWALAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error) { panic("we cannot acquire lastest mvcc timestamp from a read only wal") } diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 276e79dbd5..3c68501175 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -96,6 +96,17 @@ type walAdaptorImpl struct { isFenced *atomic.Bool } +// Metrics returns the metrics of the wal. +func (w *walAdaptorImpl) Metrics() types.WALMetrics { + currentMVCC := w.param.MVCCManager.GetMVCCOfVChannel(w.Channel().Name) + recoveryMetrics := w.flusher.Metrics() + return types.RWWALMetrics{ + ChannelInfo: w.Channel(), + MVCCTimeTick: currentMVCC.Timetick, + RecoveryTimeTick: recoveryMetrics.RecoveryTimeTick, + } +} + // GetLatestMVCCTimestamp get the latest mvcc timestamp of the wal at vchannel. func (w *walAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error) { if !w.lifetime.Add(typeutil.LifetimeStateWorking) { diff --git a/internal/streamingnode/server/wal/adaptor/wal_test.go b/internal/streamingnode/server/wal/adaptor/wal_test.go index bc7141b371..7460c3bb5f 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_test.go @@ -146,6 +146,8 @@ func (f *testOneWALFramework) Run() { assert.NoError(f.t, err) assert.NotNil(f.t, rwWAL) assert.Equal(f.t, pChannel.Name, rwWAL.Channel().Name) + // TODO: add test here after remove the flusher component. + // metrics := rwWAL.Metrics() pChannel.AccessMode = types.AccessModeRO roWAL, err := f.opener.Open(ctx, &wal.OpenOption{ @@ -153,6 +155,8 @@ func (f *testOneWALFramework) Run() { DisableFlusher: true, }) assert.NoError(f.t, err) + metrics := roWAL.Metrics() + _ = metrics.(types.ROWALMetrics) f.testReadAndWrite(ctx, rwWAL, roWAL) // close the wal roWAL.Close() diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage.go b/internal/streamingnode/server/wal/recovery/recovery_storage.go index a0ef91a6cf..2967424208 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage.go @@ -23,6 +23,11 @@ type BuildRecoveryStreamParam struct { EndTimeTick uint64 } +// RecoveryMetrics is the metrics of the recovery info. +type RecoveryMetrics struct { + RecoveryTimeTick uint64 +} + // RecoveryStreamBuilder is an interface that is used to build a recovery stream from the WAL. type RecoveryStreamBuilder interface { // WALName returns the name of the WAL. @@ -60,6 +65,9 @@ type RecoveryStream interface { // RecoveryStorage is an interface that is used to observe the messages from the WAL. type RecoveryStorage interface { + // Metrics gets the metrics of the recovery storage. + Metrics() RecoveryMetrics + // ObserveMessage observes the message from the WAL. 
ObserveMessage(ctx context.Context, msg message.ImmutableMessage) error diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go index 3ffc56181c..770024e8e3 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go @@ -95,6 +95,21 @@ type recoveryStorageImpl struct { pendingPersistSnapshot *RecoverySnapshot } +// Metrics gets the metrics of the wal. +func (r *recoveryStorageImpl) Metrics() RecoveryMetrics { + r.mu.Lock() + defer r.mu.Unlock() + + // TODO: flusher will be merged into recovery storage, so this is a temporary solution. + recoveryTimeTick := r.checkpoint.TimeTick + if r.flusherCheckpoint != nil { + recoveryTimeTick = r.flusherCheckpoint.TimeTick + } + return RecoveryMetrics{ + RecoveryTimeTick: recoveryTimeTick, + } +} + // UpdateFlusherCheckpoint updates the checkpoint of flusher. // TODO: should be removed in future, after merge the flusher logic into recovery storage. func (r *recoveryStorageImpl) UpdateFlusherCheckpoint(checkpoint *WALCheckpoint) { diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go index 31032ce8e7..5447b873bc 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_test.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_test.go @@ -131,6 +131,7 @@ func TestRecoveryStorage(t *testing.T) { msgs := b.generateStreamMessage() for _, msg := range msgs { rs.ObserveMessage(context.Background(), msg) + assert.NotNil(t, rs.Metrics()) } rs.Close() var partitionNum int diff --git a/internal/streamingnode/server/wal/wal.go b/internal/streamingnode/server/wal/wal.go index 171e4bdce7..a55ef828e8 100644 --- a/internal/streamingnode/server/wal/wal.go +++ b/internal/streamingnode/server/wal/wal.go @@ -31,6 +31,9 @@ type ROWAL interface { // WALName returns the name of the wal. WALName() string + // Metrics returns the metrics of the wal. + Metrics() types.WALMetrics + // Channel returns the channel assignment info of the wal. Channel() types.PChannelInfo diff --git a/internal/streamingnode/server/walmanager/manager.go b/internal/streamingnode/server/walmanager/manager.go index f30d00dbea..c5d134b7b1 100644 --- a/internal/streamingnode/server/walmanager/manager.go +++ b/internal/streamingnode/server/walmanager/manager.go @@ -20,8 +20,8 @@ type Manager interface { // Return nil if the wal instance is not found. GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) - // GetAllAvailableWALInfo returns all available channel info. - GetAllAvailableChannels() ([]types.PChannelInfo, error) + // Metrics return all the metrics of current wal manager. + Metrics() (*types.StreamingNodeMetrics, error) // Remove removes the wal instance for the channel. // Return `IgnoreOperation` error if the channel is not found. 
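Not part of the patch: a small test-style sketch of the new RecoveryStorage.Metrics() contract, stubbed through the mockery-generated MockRecoveryStorage added above. It assumes the behavior stated in the implementation, namely that the flusher checkpoint, when present, is the effective recovery time tick; the test name and the tick values are illustrative only.

package example_test // hypothetical package, for illustration only

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_recovery"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/recovery"
)

func TestRecoveryMetricsStub(t *testing.T) {
	rs := mock_recovery.NewMockRecoveryStorage(t)
	// Pretend the flusher checkpoint has advanced to time tick 200 while the
	// recovery checkpoint is still behind; Metrics() reports the flusher one.
	rs.EXPECT().Metrics().Return(recovery.RecoveryMetrics{RecoveryTimeTick: 200})

	m := rs.Metrics()
	assert.Equal(t, uint64(200), m.RecoveryTimeTick)
}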
diff --git a/internal/streamingnode/server/walmanager/manager_impl.go b/internal/streamingnode/server/walmanager/manager_impl.go
index 3c2f6b5f00..b87613bf9f 100644
--- a/internal/streamingnode/server/walmanager/manager_impl.go
+++ b/internal/streamingnode/server/walmanager/manager_impl.go
@@ -115,6 +115,24 @@ func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, erro
 	return nopCloseWAL{l}, nil
 }
 
+func (m *managerImpl) Metrics() (*types.StreamingNodeMetrics, error) {
+	if !m.lifetime.AddIf(isGetable) {
+		return nil, errWALManagerClosed
+	}
+	defer m.lifetime.Done()
+
+	metrics := make(map[types.ChannelID]types.WALMetrics)
+	m.wltMap.Range(func(channel string, lt *walLifetime) bool {
+		if l := lt.GetWAL(); l != nil {
+			metrics[l.Channel().ChannelID()] = l.Metrics()
+		}
+		return true
+	})
+	return &types.StreamingNodeMetrics{
+		WALMetrics: metrics,
+	}, nil
+}
+
 // GetAllAvailableChannels returns all available channel info.
 func (m *managerImpl) GetAllAvailableChannels() ([]types.PChannelInfo, error) {
 	// reject operation if manager is closing.
diff --git a/internal/streamingnode/server/walmanager/manager_impl_test.go b/internal/streamingnode/server/walmanager/manager_impl_test.go
index a8a1ea7668..a7b4a990a6 100644
--- a/internal/streamingnode/server/walmanager/manager_impl_test.go
+++ b/internal/streamingnode/server/walmanager/manager_impl_test.go
@@ -37,6 +37,7 @@ func TestManager(t *testing.T) {
 	opener.EXPECT().Open(mock.Anything, mock.Anything).RunAndReturn(
 		func(ctx context.Context, oo *wal.OpenOption) (wal.WAL, error) {
 			l := mock_wal.NewMockWAL(t)
+			l.EXPECT().Metrics().Return(types.RWWALMetrics{}).Maybe()
 			l.EXPECT().Channel().Return(oo.Channel)
 			l.EXPECT().IsAvailable().Return(true).Maybe()
 			l.EXPECT().Close().Return()
@@ -52,9 +53,9 @@
 	assertErrorChannelNotExist(t, err)
 	assert.Nil(t, l)
 
-	h, err := m.GetAllAvailableChannels()
+	h, err := m.Metrics()
 	assert.NoError(t, err)
-	assert.Len(t, h, 0)
+	assert.Len(t, h.WALMetrics, 0)
 
 	err = m.Remove(context.Background(), types.PChannelInfo{Name: channelName, Term: 1})
 	assert.NoError(t, err)
@@ -86,9 +87,9 @@
 	assert.NoError(t, err)
 	assert.NotNil(t, l)
 
-	h, err = m.GetAllAvailableChannels()
+	h, err = m.Metrics()
 	assert.NoError(t, err)
-	assert.Len(t, h, 1)
+	assert.Len(t, h.WALMetrics, 1)
 
 	err = m.Open(context.Background(), types.PChannelInfo{
 		Name: "term2",
@@ -96,15 +97,15 @@
 	})
 	assert.NoError(t, err)
 
-	h, err = m.GetAllAvailableChannels()
+	h, err = m.Metrics()
 	assert.NoError(t, err)
-	assert.Len(t, h, 2)
+	assert.Len(t, h.WALMetrics, 2)
 
 	m.Close()
 
-	h, err = m.GetAllAvailableChannels()
+	h, err = m.Metrics()
 	assertShutdownError(t, err)
-	assert.Len(t, h, 0)
+	assert.Nil(t, h)
 
 	err = m.Open(context.Background(), types.PChannelInfo{
 		Name: "term2",
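The manager-level aggregation above keys each wal's metrics by its ChannelID, so a node-level summary falls out of a simple walk over the map. A hedged sketch of one such consumer, for example to feed backpressure decisions; the helper name and the choice of aggregation are illustrative, not part of this patch:

package example // illustrative only, not part of this patch

import (
	"time"

	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
)

// maxRecoveryLag returns the largest recovery lag among the read-write WALs
// reported by Manager.Metrics(); read-only WALs carry no time ticks and are skipped.
func maxRecoveryLag(node *types.StreamingNodeMetrics) time.Duration {
	var maxLag time.Duration
	for _, m := range node.WALMetrics {
		if rw, ok := m.(types.RWWALMetrics); ok && rw.RecoveryLag() > maxLag {
			maxLag = rw.RecoveryLag()
		}
	}
	return maxLag
}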
diff --git a/pkg/streaming/util/types/pchannel_info.go b/pkg/streaming/util/types/pchannel_info.go
index 37700fce6d..4071c99e53 100644
--- a/pkg/streaming/util/types/pchannel_info.go
+++ b/pkg/streaming/util/types/pchannel_info.go
@@ -63,6 +63,10 @@ type ChannelID struct {
 	// TODO: add replica id in future.
 }
 
+func (id ChannelID) IsZero() bool {
+	return id.Name == ""
+}
+
 func (id ChannelID) String() string {
 	return id.Name
 }
diff --git a/pkg/streaming/util/types/pchannel_info_test.go b/pkg/streaming/util/types/pchannel_info_test.go
index 86e3f58ac1..990922a5f0 100644
--- a/pkg/streaming/util/types/pchannel_info_test.go
+++ b/pkg/streaming/util/types/pchannel_info_test.go
@@ -10,6 +10,8 @@ import (
 func TestPChannelInfo(t *testing.T) {
 	info := PChannelInfo{Name: "pchannel", Term: 1, AccessMode: AccessModeRO}
+	assert.False(t, info.ChannelID().IsZero())
+	assert.True(t, ChannelID{}.IsZero())
 
 	pbInfo := NewProtoFromPChannelInfo(info)
 	info2 := NewPChannelInfoFromProto(pbInfo)
diff --git a/pkg/streaming/util/types/wal_metrics.go b/pkg/streaming/util/types/wal_metrics.go
new file mode 100644
index 0000000000..0f12496b0f
--- /dev/null
+++ b/pkg/streaming/util/types/wal_metrics.go
@@ -0,0 +1,46 @@
+package types
+
+import (
+	"time"
+
+	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
+)
+
+var (
+	_ WALMetrics = RWWALMetrics{}
+	_ WALMetrics = ROWALMetrics{}
+)
+
+type WALMetrics interface {
+	isWALMetrics()
+}
+
+// RWWALMetrics represents the WAL metrics for a read-write channel.
+type RWWALMetrics struct {
+	ChannelInfo      PChannelInfo
+	MVCCTimeTick     uint64
+	RecoveryTimeTick uint64
+}
+
+// RecoveryLag returns the duration between the MVCCTimeTick and the RecoveryTimeTick.
+func (m RWWALMetrics) RecoveryLag() time.Duration {
+	duration := tsoutil.PhysicalTime(m.MVCCTimeTick).Sub(tsoutil.PhysicalTime(m.RecoveryTimeTick))
+	if duration <= 0 {
+		return 0
+	}
+	return duration
+}
+
+func (RWWALMetrics) isWALMetrics() {}
+
+// ROWALMetrics represents the WAL metrics for a read-only channel.
+type ROWALMetrics struct {
+	ChannelInfo PChannelInfo
+}
+
+func (ROWALMetrics) isWALMetrics() {}
+
+// StreamingNodeMetrics represents the metrics of the streaming node.
+type StreamingNodeMetrics struct {
+	WALMetrics map[ChannelID]WALMetrics
+}
diff --git a/pkg/streaming/util/types/wal_metrics_test.go b/pkg/streaming/util/types/wal_metrics_test.go
new file mode 100644
index 0000000000..ffe10df046
--- /dev/null
+++ b/pkg/streaming/util/types/wal_metrics_test.go
@@ -0,0 +1,24 @@
+package types
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
+)
+
+func TestWALMetrics(t *testing.T) {
+	now := time.Now()
+	rw := RWWALMetrics{
+		ChannelInfo: PChannelInfo{
+			Name: "ch1",
+		},
+		MVCCTimeTick:     tsoutil.ComposeTSByTime(now, 0),
+		RecoveryTimeTick: tsoutil.ComposeTSByTime(now.Add(-time.Second), 0),
+	}
+	assert.Equal(t, time.Second, rw.RecoveryLag())
+	rw.MVCCTimeTick = tsoutil.ComposeTSByTime(now.Add(-2*time.Second), 0)
+	assert.Zero(t, rw.RecoveryLag())
+}
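The RWWALMetrics above convert hybrid TSO timestamps into a wall-clock lag with tsoutil; the files that follow expose the same recovery time tick to the quota system, whose ttProtection is switched on by default in this change. A hedged sketch of how a consumer could turn a recovery time tick into a time-tick delay to compare against maxTimeTickDelay; the helper name is illustrative, not part of this patch:

package example // illustrative only, not part of this patch

import (
	"time"

	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)

// timeTickDelay converts a recovery time tick (a hybrid TSO timestamp) into a
// wall-clock delay against now; ttProtection-style throttling can then compare
// this delay with the configured maxTimeTickDelay.
func timeTickDelay(recoveryTimeTick uint64, now time.Time) time.Duration {
	delay := now.Sub(tsoutil.PhysicalTime(recoveryTimeTick))
	if delay < 0 {
		return 0
	}
	return delay
}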
diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go
index 85883d93ff..a36d8f171a 100644
--- a/pkg/util/metricsinfo/quota_metric.go
+++ b/pkg/util/metricsinfo/quota_metric.go
@@ -17,6 +17,7 @@ package metricsinfo
 import (
+	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -63,6 +64,19 @@ type QueryNodeQuotaMetrics struct {
 	GrowingSegmentsSize int64
 	Effect              NodeEffect
 	DeleteBufferInfo    DeleteBufferInfo
+	StreamingQuota      *StreamingQuotaMetrics
+}
+
+// StreamingQuotaMetrics contains the metrics of the streaming node.
+// Only used in a queryNode embedded in a streaming node.
+type StreamingQuotaMetrics struct {
+	WALs []WALMetrics
+}
+
+// WALMetrics contains the metrics of a wal.
+type WALMetrics struct {
+	Channel          types.ChannelID    // ChannelID is the unique identifier of a pchannel.
+	RecoveryTimeTick typeutil.Timestamp // the current recovery time tick of the wal
 }
 
 type DeleteBufferInfo struct {
diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go
index 98bbbf1a81..9456382a1e 100644
--- a/pkg/util/paramtable/quota_param.go
+++ b/pkg/util/paramtable/quota_param.go
@@ -1684,7 +1684,7 @@ specific conditions, such as memory of nodes to water marker), ` + "true" + ` me
 	p.TtProtectionEnabled = ParamItem{
 		Key:          "quotaAndLimits.limitWriting.ttProtection.enabled",
 		Version:      "2.2.0",
-		DefaultValue: "false",
+		DefaultValue: "true",
 		Export:       true,
 	}
 	p.TtProtectionEnabled.Init(base.mgr)
diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go
index 759163eddf..0e555b777a 100644
--- a/pkg/util/paramtable/quota_param_test.go
+++ b/pkg/util/paramtable/quota_param_test.go
@@ -206,7 +206,7 @@ func TestQuotaParam(t *testing.T) {
 	t.Run("test limit writing", func(t *testing.T) {
 		assert.False(t, qc.ForceDenyWriting.GetAsBool())
-		assert.Equal(t, false, qc.TtProtectionEnabled.GetAsBool())
+		assert.Equal(t, true, qc.TtProtectionEnabled.GetAsBool())
 		assert.Equal(t, 300, qc.MaxTimeTickDelay.GetAsInt())
 		assert.Equal(t, defaultLowWaterLevel, qc.DataNodeMemoryLowWaterLevel.GetAsFloat())
 		assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel.GetAsFloat())