enhance: refactor metrics of streaming (#40031)

issue: #38399

- add metrics for broadcaster component.
- add metrics for wal flusher component.
- add metrics for wal interceptors.
- add slow log for wal.
- add more labels for some wal metrics (local or remote, catchup or tailing...); see the sketch below.
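
For reference, here is a minimal, self-contained sketch of the labeling pattern these changes follow (hypothetical metric and label names, not the actual definitions in `pkg/v2/metrics`): the client gauge gets an `access_model` label ("local" or "remote") resolved once when the handler is created, so dashboards can split traffic served by a co-located WAL from traffic that goes over the network.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative only: the real metrics live in pkg/v2/metrics under different
// names and label constants.
var clientProducerTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "streaming_client_producer_total",
	Help: "number of living producers, split by WAL access model",
}, []string{"node_id", "channel_name", "access_model"})

// newProducerGauge resolves the access-model label once at creation time,
// mirroring how a producer wrapper would pick "local" vs "remote".
func newProducerGauge(nodeID, pchannel string, isLocal bool) prometheus.Gauge {
	accessModel := "remote"
	if isLocal {
		accessModel = "local"
	}
	g := clientProducerTotal.With(prometheus.Labels{
		"node_id":      nodeID,
		"channel_name": pchannel,
		"access_model": accessModel,
	})
	g.Inc() // one more living producer
	return g
}

func main() {
	prometheus.MustRegister(clientProducerTotal)
	g := newProducerGauge("1", "by-dev-rootcoord-dml_0", true)
	defer g.Dec() // producer closed
	fmt.Println("registered illustrative producer gauge")
}
```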

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-02-25 12:25:56 +08:00 committed by GitHub
parent ce480c3f57
commit 84df80b5e4
46 changed files with 1413 additions and 351 deletions

View File

@ -49,6 +49,7 @@ packages:
interfaces: interfaces:
Interceptor: Interceptor:
InterceptorWithReady: InterceptorWithReady:
InterceptorWithMetrics:
InterceptorBuilder: InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector: github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces: interfaces:

View File

@ -53,7 +53,7 @@ type resumableConsumerImpl struct {
mh *timeTickOrderMessageHandler mh *timeTickOrderMessageHandler
factory factory factory factory
consumeErr *syncutil.Future[error] consumeErr *syncutil.Future[error]
metrics *consumerMetrics metrics *resumingConsumerMetrics
} }
type factory = func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error) type factory = func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error)
@ -146,7 +146,7 @@ func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions
} }
logger.Info("resume on new consumer at new start message id") logger.Info("resume on new consumer at new start message id")
return consumer, nil return newConsumerWithMetrics(rc.opts.PChannel, consumer), nil
} }
} }

View File

@ -3,77 +3,104 @@ package consumer
import ( import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/registry"
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
// newConsumerMetrics creates a new producer metrics. // newConsumerMetrics creates a new producer metrics.
func newConsumerMetrics(pchannel string) *consumerMetrics { func newConsumerMetrics(pchannel string) *resumingConsumerMetrics {
constLabel := prometheus.Labels{ constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(), metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel, metrics.WALChannelLabelName: pchannel,
} }
m := &consumerMetrics{ m := &resumingConsumerMetrics{
available: false, available: false,
clientTotal: metrics.StreamingServiceClientConsumerTotal.MustCurryWith(constLabel), resumingClientTotal: metrics.StreamingServiceClientResumingConsumerTotal.MustCurryWith(constLabel),
inflightTotal: metrics.StreamingServiceClientConsumeInflightTotal.With(constLabel),
bytes: metrics.StreamingServiceClientConsumeBytes.With(constLabel), bytes: metrics.StreamingServiceClientConsumeBytes.With(constLabel),
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
return m return m
} }
// consumerMetrics is the metrics for producer. // resumingConsumerMetrics is the metrics for producer.
type consumerMetrics struct { type resumingConsumerMetrics struct {
available bool available bool
clientTotal *prometheus.GaugeVec resumingClientTotal *prometheus.GaugeVec
inflightTotal prometheus.Gauge
bytes prometheus.Observer bytes prometheus.Observer
} }
// IntoUnavailable sets the producer metrics to unavailable. // IntoUnavailable sets the producer metrics to unavailable.
func (m *consumerMetrics) IntoUnavailable() { func (m *resumingConsumerMetrics) IntoUnavailable() {
if !m.available { if !m.available {
return return
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
m.available = false m.available = false
} }
// IntoAvailable sets the producer metrics to available. // IntoAvailable sets the producer metrics to available.
func (m *consumerMetrics) IntoAvailable() { func (m *resumingConsumerMetrics) IntoAvailable() {
if m.available { if m.available {
return return
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
m.available = true m.available = true
} }
// StartConsume starts a consume operation. // StartConsume starts a consume operation.
func (m *consumerMetrics) StartConsume(bytes int) consumerMetricsGuard { func (m *resumingConsumerMetrics) StartConsume(bytes int) consumerMetricsGuard {
m.inflightTotal.Inc()
return consumerMetricsGuard{ return consumerMetricsGuard{
metrics: m, metrics: m,
bytes: bytes, bytes: bytes,
} }
} }
func (m *consumerMetrics) Close() { func (m *resumingConsumerMetrics) Close() {
if m.available { if m.available {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
} else { } else {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec() m.resumingClientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
} }
} }
type consumerMetricsGuard struct { type consumerMetricsGuard struct {
metrics *consumerMetrics metrics *resumingConsumerMetrics
bytes int bytes int
} }
func (g consumerMetricsGuard) Finish() { func (g consumerMetricsGuard) Finish() {
g.metrics.inflightTotal.Dec()
g.metrics.bytes.Observe(float64(g.bytes)) g.metrics.bytes.Observe(float64(g.bytes))
} }
func newConsumerWithMetrics(pchannel string, c consumer.Consumer) consumer.Consumer {
accessModel := metrics.WALAccessModelRemote
if registry.IsLocal(c) {
accessModel = metrics.WALAccessModelLocal
}
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
metrics.WALAccessModelLabelName: accessModel,
}
cm := &consumerWithMetrics{
Consumer: c,
clientTotal: metrics.StreamingServiceClientConsumerTotal.With(constLabel),
}
cm.clientTotal.Inc()
return cm
}
type consumerWithMetrics struct {
consumer.Consumer
clientTotal prometheus.Gauge
}
func (c *consumerWithMetrics) Close() error {
err := c.Consumer.Close()
c.clientTotal.Dec()
return err
}

View File

@ -1,64 +1,101 @@
package producer package producer
import ( import (
"context"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/registry"
"github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
// newProducerMetrics creates a new producer metrics. // newResumingProducerMetrics creates a new producer metrics.
func newProducerMetrics(pchannel string) *producerMetrics { func newResumingProducerMetrics(pchannel string) *resumingProducerMetrics {
constLabel := prometheus.Labels{ constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(), metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel, metrics.WALChannelLabelName: pchannel,
} }
m := &producerMetrics{ m := &resumingProducerMetrics{
available: false, available: false,
clientTotal: metrics.StreamingServiceClientProducerTotal.MustCurryWith(constLabel), clientTotal: metrics.StreamingServiceClientResumingProducerTotal.MustCurryWith(constLabel),
inflightTotal: metrics.StreamingServiceClientProduceInflightTotal.With(constLabel),
bytes: metrics.StreamingServiceClientProduceBytes.MustCurryWith(constLabel),
durationSeconds: metrics.StreamingServiceClientProduceDurationSeconds.MustCurryWith(constLabel),
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc() m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
return m return m
} }
// producerMetrics is the metrics for producer. // resumingProducerMetrics is the metrics for producer.
type producerMetrics struct { type resumingProducerMetrics struct {
available bool available bool
clientTotal *prometheus.GaugeVec clientTotal *prometheus.GaugeVec
inflightTotal prometheus.Gauge
bytes prometheus.ObserverVec
durationSeconds prometheus.ObserverVec
} }
// IntoUnavailable sets the producer metrics to unavailable. // IntoUnavailable sets the producer metrics to unavailable.
func (m *producerMetrics) IntoUnavailable() { func (m *resumingProducerMetrics) IntoUnavailable() {
if !m.available { if !m.available {
return return
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec() m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
m.available = false m.available = false
} }
// IntoAvailable sets the producer metrics to available. // IntoAvailable sets the producer metrics to available.
func (m *producerMetrics) IntoAvailable() { func (m *resumingProducerMetrics) IntoAvailable() {
if m.available { if m.available {
return return
} }
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec() m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
m.available = true m.available = true
} }
// Close closes the producer metrics.
func (m *resumingProducerMetrics) Close() {
if m.available {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
} else {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}
}
func newProducerMetrics(pchannel string, isLocal bool) *producerMetrics {
accessModel := metrics.WALAccessModelRemote
if isLocal {
accessModel = metrics.WALAccessModelLocal
}
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
metrics.WALAccessModelLabelName: accessModel,
}
m := &producerMetrics{
producerTotal: metrics.StreamingServiceClientProducerTotal.With(constLabel),
produceTotal: metrics.StreamingServiceClientProduceTotal.MustCurryWith(constLabel),
produceBytes: metrics.StreamingServiceClientProduceBytes.MustCurryWith(constLabel),
produceSuccessBytes: metrics.StreamingServiceClientSuccessProduceBytes.With(constLabel),
produceSuccessDurationSeconds: metrics.StreamingServiceClientSuccessProduceDurationSeconds.With(constLabel),
}
m.producerTotal.Inc()
return m
}
// producerMetrics is the metrics for producer.
type producerMetrics struct {
producerTotal prometheus.Gauge
produceTotal *prometheus.CounterVec
produceBytes *prometheus.CounterVec
produceSuccessBytes prometheus.Observer
produceSuccessDurationSeconds prometheus.Observer
}
// StartProduce starts the produce metrics. // StartProduce starts the produce metrics.
func (m *producerMetrics) StartProduce(bytes int) produceMetricsGuard { func (m *producerMetrics) StartProduce(bytes int) produceMetricsGuard {
m.inflightTotal.Inc()
return produceMetricsGuard{ return produceMetricsGuard{
start: time.Now(), start: time.Now(),
bytes: bytes, bytes: bytes,
@ -67,11 +104,7 @@ func (m *producerMetrics) StartProduce(bytes int) produceMetricsGuard {
} }
func (m *producerMetrics) Close() { func (m *producerMetrics) Close() {
if m.available { m.producerTotal.Dec()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
} else {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}
} }
// produceMetricsGuard is the guard for produce metrics. // produceMetricsGuard is the guard for produce metrics.
@ -84,18 +117,48 @@ type produceMetricsGuard struct {
// Finish finishes the produce metrics. // Finish finishes the produce metrics.
func (g produceMetricsGuard) Finish(err error) { func (g produceMetricsGuard) Finish(err error) {
status := parseError(err) status := parseError(err)
g.metrics.bytes.WithLabelValues(status).Observe(float64(g.bytes)) g.metrics.produceTotal.WithLabelValues(status).Inc()
g.metrics.durationSeconds.WithLabelValues(status).Observe(time.Since(g.start).Seconds()) g.metrics.produceBytes.WithLabelValues(status).Add(float64(g.bytes))
g.metrics.inflightTotal.Dec() g.metrics.produceSuccessBytes.Observe(float64(g.bytes))
g.metrics.produceSuccessDurationSeconds.Observe(time.Since(g.start).Seconds())
} }
// parseError parses the error to status. // parseError parses the error to status.
func parseError(err error) string { func parseError(err error) string {
if err == nil { if err == nil {
return metrics.StreamingServiceClientStatusOK return metrics.WALStatusOK
} }
if status.IsCanceled(err) { if status.IsCanceled(err) {
return metrics.StreamingServiceClientStatusCancel return metrics.WALStatusCancel
} }
return metrics.StreamignServiceClientStatusError return metrics.WALStatusError
}
func newProducerWithMetrics(channel string, p handler.Producer) handler.Producer {
if p == nil {
return nil
}
return producerWithMetrics{
Producer: p,
metrics: newProducerMetrics(channel, registry.IsLocal(p)),
}
}
type producerWithMetrics struct {
handler.Producer
metrics *producerMetrics
}
func (pm producerWithMetrics) Append(ctx context.Context, msg message.MutableMessage) (result *types.AppendResult, err error) {
g := pm.metrics.StartProduce(msg.EstimateSize())
defer func() {
g.Finish(err)
}()
return pm.Producer.Append(ctx, msg)
}
func (pm producerWithMetrics) Close() {
pm.Producer.Close()
pm.metrics.Close()
} }

View File

@ -39,11 +39,10 @@ func NewResumableProducer(f factory, opts *ProducerOptions) *ResumableProducer {
lifetime: typeutil.NewLifetime(), lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("pchannel", opts.PChannel)), logger: log.With(zap.String("pchannel", opts.PChannel)),
opts: opts, opts: opts,
producer: newProducerWithResumingError(opts.PChannel), // lazy initialized.
producer: newProducerWithResumingError(), // lazy initialized.
cond: syncutil.NewContextCond(&sync.Mutex{}), cond: syncutil.NewContextCond(&sync.Mutex{}),
factory: f, factory: f,
metrics: newProducerMetrics(opts.PChannel), metrics: newResumingProducerMetrics(opts.PChannel),
} }
go p.resumeLoop() go p.resumeLoop()
return p return p
@ -74,7 +73,7 @@ type ResumableProducer struct {
// factory is used to create a new producer. // factory is used to create a new producer.
factory factory factory factory
metrics *producerMetrics metrics *resumingProducerMetrics
} }
// Produce produce a new message to log service. // Produce produce a new message to log service.
@ -82,11 +81,7 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
if !p.lifetime.Add(typeutil.LifetimeStateWorking) { if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer") return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
} }
metricGuard := p.metrics.StartProduce(msg.EstimateSize()) defer p.lifetime.Done()
defer func() {
metricGuard.Finish(err)
p.lifetime.Done()
}()
for { for {
// get producer. // get producer.

View File

@ -12,14 +12,16 @@ import (
) )
// newProducerWithResumingError creates a new producer with resuming error. // newProducerWithResumingError creates a new producer with resuming error.
func newProducerWithResumingError() producerWithResumingError { func newProducerWithResumingError(pchannel string) producerWithResumingError {
return producerWithResumingError{ return producerWithResumingError{
pchannel: pchannel,
cond: syncutil.NewContextCond(&sync.Mutex{}), cond: syncutil.NewContextCond(&sync.Mutex{}),
} }
} }
// producerWithResumingError is a producer that can be resumed. // producerWithResumingError is a producer that can be resumed.
type producerWithResumingError struct { type producerWithResumingError struct {
pchannel string
cond *syncutil.ContextCond cond *syncutil.ContextCond
producer handler.Producer producer handler.Producer
err error err error
@ -47,7 +49,7 @@ func (p *producerWithResumingError) GetProducerAfterAvailable(ctx context.Contex
func (p *producerWithResumingError) SwapProducer(producer handler.Producer, err error) { func (p *producerWithResumingError) SwapProducer(producer handler.Producer, err error) {
p.cond.LockAndBroadcast() p.cond.LockAndBroadcast()
oldProducer := p.producer oldProducer := p.producer
p.producer = producer p.producer = newProducerWithMetrics(p.pchannel, producer)
p.err = err p.err = err
p.cond.L.Unlock() p.cond.L.Unlock()

View File

@ -489,7 +489,7 @@ func (s *DataSyncServiceSuite) TestStartStop() {
return nil return nil
}) })
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack) s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack)
s.msChan <- msgstream.BuildConsumeMsgPack(&msgPack) s.msChan <- msgstream.BuildConsumeMsgPack(&timeTickMsgPack)
<-ch <-ch
} }

View File

@ -0,0 +1,175 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package mock_interceptors
import (
context "context"
message "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
mock "github.com/stretchr/testify/mock"
)
// MockInterceptorWithMetrics is an autogenerated mock type for the InterceptorWithMetrics type
type MockInterceptorWithMetrics struct {
mock.Mock
}
type MockInterceptorWithMetrics_Expecter struct {
mock *mock.Mock
}
func (_m *MockInterceptorWithMetrics) EXPECT() *MockInterceptorWithMetrics_Expecter {
return &MockInterceptorWithMetrics_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockInterceptorWithMetrics) Close() {
_m.Called()
}
// MockInterceptorWithMetrics_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockInterceptorWithMetrics_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockInterceptorWithMetrics_Expecter) Close() *MockInterceptorWithMetrics_Close_Call {
return &MockInterceptorWithMetrics_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockInterceptorWithMetrics_Close_Call) Run(run func()) *MockInterceptorWithMetrics_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockInterceptorWithMetrics_Close_Call) Return() *MockInterceptorWithMetrics_Close_Call {
_c.Call.Return()
return _c
}
func (_c *MockInterceptorWithMetrics_Close_Call) RunAndReturn(run func()) *MockInterceptorWithMetrics_Close_Call {
_c.Call.Return(run)
return _c
}
// DoAppend provides a mock function with given fields: ctx, msg, append
func (_m *MockInterceptorWithMetrics) DoAppend(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
ret := _m.Called(ctx, msg, append)
if len(ret) == 0 {
panic("no return value specified for DoAppend")
}
var r0 message.MessageID
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)); ok {
return rf(ctx, msg, append)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) message.MessageID); ok {
r0 = rf(ctx, msg, append)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(message.MessageID)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) error); ok {
r1 = rf(ctx, msg, append)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockInterceptorWithMetrics_DoAppend_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DoAppend'
type MockInterceptorWithMetrics_DoAppend_Call struct {
*mock.Call
}
// DoAppend is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
// - append func(context.Context , message.MutableMessage)(message.MessageID , error)
func (_e *MockInterceptorWithMetrics_Expecter) DoAppend(ctx interface{}, msg interface{}, append interface{}) *MockInterceptorWithMetrics_DoAppend_Call {
return &MockInterceptorWithMetrics_DoAppend_Call{Call: _e.mock.On("DoAppend", ctx, msg, append)}
}
func (_c *MockInterceptorWithMetrics_DoAppend_Call) Run(run func(ctx context.Context, msg message.MutableMessage, append func(context.Context, message.MutableMessage) (message.MessageID, error))) *MockInterceptorWithMetrics_DoAppend_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage), args[2].(func(context.Context, message.MutableMessage) (message.MessageID, error)))
})
return _c
}
func (_c *MockInterceptorWithMetrics_DoAppend_Call) Return(_a0 message.MessageID, _a1 error) *MockInterceptorWithMetrics_DoAppend_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockInterceptorWithMetrics_DoAppend_Call) RunAndReturn(run func(context.Context, message.MutableMessage, func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error)) *MockInterceptorWithMetrics_DoAppend_Call {
_c.Call.Return(run)
return _c
}
// Name provides a mock function with given fields:
func (_m *MockInterceptorWithMetrics) Name() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Name")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// MockInterceptorWithMetrics_Name_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Name'
type MockInterceptorWithMetrics_Name_Call struct {
*mock.Call
}
// Name is a helper method to define mock.On call
func (_e *MockInterceptorWithMetrics_Expecter) Name() *MockInterceptorWithMetrics_Name_Call {
return &MockInterceptorWithMetrics_Name_Call{Call: _e.mock.On("Name")}
}
func (_c *MockInterceptorWithMetrics_Name_Call) Run(run func()) *MockInterceptorWithMetrics_Name_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockInterceptorWithMetrics_Name_Call) Return(_a0 string) *MockInterceptorWithMetrics_Name_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockInterceptorWithMetrics_Name_Call) RunAndReturn(run func() string) *MockInterceptorWithMetrics_Name_Call {
_c.Call.Return(run)
return _c
}
// NewMockInterceptorWithMetrics creates a new instance of MockInterceptorWithMetrics. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockInterceptorWithMetrics(t interface {
mock.TestingT
Cleanup(func())
}) *MockInterceptorWithMetrics {
mock := &MockInterceptorWithMetrics{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -124,7 +124,6 @@ func (cm *ChannelManager) AssignPChannelsDone(ctx context.Context, pChannels []s
defer cm.cond.L.Unlock() defer cm.cond.L.Unlock()
// modified channels. // modified channels.
histories := make([]types.PChannelInfoAssigned, 0, len(pChannels))
pChannelMetas := make([]*streamingpb.PChannelMeta, 0, len(pChannels)) pChannelMetas := make([]*streamingpb.PChannelMeta, 0, len(pChannels))
for _, channelName := range pChannels { for _, channelName := range pChannels {
pchannel, ok := cm.channels[channelName] pchannel, ok := cm.channels[channelName]
@ -132,7 +131,7 @@ func (cm *ChannelManager) AssignPChannelsDone(ctx context.Context, pChannels []s
return ErrChannelNotExist return ErrChannelNotExist
} }
mutablePChannel := pchannel.CopyForWrite() mutablePChannel := pchannel.CopyForWrite()
histories = append(histories, mutablePChannel.AssignToServerDone()...) mutablePChannel.AssignToServerDone()
pChannelMetas = append(pChannelMetas, mutablePChannel.IntoRawMeta()) pChannelMetas = append(pChannelMetas, mutablePChannel.IntoRawMeta())
} }
@ -141,9 +140,6 @@ func (cm *ChannelManager) AssignPChannelsDone(ctx context.Context, pChannels []s
} }
// Update metrics. // Update metrics.
for _, history := range histories {
cm.metrics.RemovePChannelStatus(history)
}
for _, pchannel := range pChannelMetas { for _, pchannel := range pChannelMetas {
cm.metrics.AssignPChannelStatus(pchannel) cm.metrics.AssignPChannelStatus(pchannel)
} }

View File

@ -7,7 +7,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
@ -24,22 +23,17 @@ type channelMetrics struct {
assignmentVersion prometheus.Gauge assignmentVersion prometheus.Gauge
} }
// RemovePChannelStatus removes the pchannel status metric
func (m *channelMetrics) RemovePChannelStatus(assigned types.PChannelInfoAssigned) {
m.pchannelInfo.Delete(prometheus.Labels{
metrics.WALChannelLabelName: assigned.Channel.Name,
metrics.WALChannelTermLabelName: strconv.FormatInt(assigned.Channel.Term, 10),
metrics.StreamingNodeLabelName: strconv.FormatInt(assigned.Node.ServerID, 10),
})
}
// AssignPChannelStatus assigns the pchannel status metric // AssignPChannelStatus assigns the pchannel status metric
func (m *channelMetrics) AssignPChannelStatus(meta *streamingpb.PChannelMeta) { func (m *channelMetrics) AssignPChannelStatus(meta *streamingpb.PChannelMeta) {
metrics.StreamingCoordPChannelInfo.DeletePartialMatch(prometheus.Labels{
metrics.WALChannelLabelName: meta.GetChannel().GetName(),
})
m.pchannelInfo.With(prometheus.Labels{ m.pchannelInfo.With(prometheus.Labels{
metrics.WALChannelLabelName: meta.GetChannel().GetName(), metrics.WALChannelLabelName: meta.GetChannel().GetName(),
metrics.WALChannelTermLabelName: strconv.FormatInt(meta.GetChannel().GetTerm(), 10), metrics.WALChannelTermLabelName: strconv.FormatInt(meta.GetChannel().GetTerm(), 10),
metrics.StreamingNodeLabelName: strconv.FormatInt(meta.GetNode().GetServerId(), 10), metrics.StreamingNodeLabelName: strconv.FormatInt(meta.GetNode().GetServerId(), 10),
}).Set(float64(meta.GetState())) metrics.WALStateLabelName: meta.GetState().String(),
}).Set(1)
} }
// UpdateAssignmentVersion updates the assignment version metric // UpdateAssignmentVersion updates the assignment version metric

View File

@ -127,23 +127,11 @@ func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeI
} }
// AssignToServerDone assigns the channel to the server done. // AssignToServerDone assigns the channel to the server done.
func (m *mutablePChannel) AssignToServerDone() []types.PChannelInfoAssigned { func (m *mutablePChannel) AssignToServerDone() {
var history []types.PChannelInfoAssigned
if m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING { if m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING {
history = make([]types.PChannelInfoAssigned, 0, len(m.inner.Histories))
for _, h := range m.inner.Histories {
history = append(history, types.PChannelInfoAssigned{
Channel: types.PChannelInfo{
Name: m.inner.Channel.Name,
Term: h.GetTerm(),
},
Node: types.NewStreamingNodeInfoFromProto(h.Node),
})
}
m.inner.Histories = make([]*streamingpb.PChannelAssignmentLog, 0) m.inner.Histories = make([]*streamingpb.PChannelAssignmentLog, 0)
m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
} }
return history
} }
// MarkAsUnavailable marks the channel as unavailable. // MarkAsUnavailable marks the channel as unavailable.

View File

@ -19,9 +19,11 @@ import (
// newBroadcastTaskManager creates a new broadcast task manager with recovery info. // newBroadcastTaskManager creates a new broadcast task manager with recovery info.
func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) (*broadcastTaskManager, []*pendingBroadcastTask) { func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) (*broadcastTaskManager, []*pendingBroadcastTask) {
logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster")) logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
metrics := newBroadcasterMetrics()
recoveryTasks := make([]*broadcastTask, 0, len(protos)) recoveryTasks := make([]*broadcastTask, 0, len(protos))
for _, proto := range protos { for _, proto := range protos {
t := newBroadcastTaskFromProto(proto) t := newBroadcastTaskFromProto(proto, metrics)
t.SetLogger(logger.With(zap.Uint64("broadcastID", t.header.BroadcastID))) t.SetLogger(logger.With(zap.Uint64("broadcastID", t.header.BroadcastID)))
recoveryTasks = append(recoveryTasks, t) recoveryTasks = append(recoveryTasks, t)
} }
@ -34,6 +36,7 @@ func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) (*broadcastTas
panic(fmt.Sprintf("unreachable: dirty recovery info in metastore, broadcast ids: [%d, %d]", oldTaskID, task.header.BroadcastID)) panic(fmt.Sprintf("unreachable: dirty recovery info in metastore, broadcast ids: [%d, %d]", oldTaskID, task.header.BroadcastID))
} }
rks[rk] = task.header.BroadcastID rks[rk] = task.header.BroadcastID
metrics.IncomingResourceKey(rk.Domain)
} }
tasks[task.header.BroadcastID] = task tasks[task.header.BroadcastID] = task
if task.task.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING { if task.task.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING {
@ -47,6 +50,7 @@ func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) (*broadcastTas
tasks: tasks, tasks: tasks,
resourceKeys: rks, resourceKeys: rks,
version: 1, version: 1,
metrics: metrics,
} }
m.SetLogger(logger) m.SetLogger(logger)
return m, pendingTasks return m, pendingTasks
@ -59,6 +63,7 @@ type broadcastTaskManager struct {
tasks map[uint64]*broadcastTask // map the broadcastID to the broadcastTaskState tasks map[uint64]*broadcastTask // map the broadcastID to the broadcastTaskState
resourceKeys map[message.ResourceKey]uint64 // map the resource key to the broadcastID resourceKeys map[message.ResourceKey]uint64 // map the resource key to the broadcastID
version int // version is used to make sure that there's no update lost for watcher. version int // version is used to make sure that there's no update lost for watcher.
metrics *broadcasterMetrics
} }
// AddTask adds a new broadcast task into the manager. // AddTask adds a new broadcast task into the manager.
@ -152,7 +157,7 @@ func (bm *broadcastTaskManager) GetBroadcastTaskByResourceKey(resourceKey messag
// addBroadcastTask adds the broadcast task into the manager. // addBroadcastTask adds the broadcast task into the manager.
func (bm *broadcastTaskManager) addBroadcastTask(msg message.BroadcastMutableMessage) (*broadcastTask, error) { func (bm *broadcastTaskManager) addBroadcastTask(msg message.BroadcastMutableMessage) (*broadcastTask, error) {
newIncomingTask := newBroadcastTaskFromBroadcastMessage(msg) newIncomingTask := newBroadcastTaskFromBroadcastMessage(msg, bm.metrics)
header := newIncomingTask.Header() header := newIncomingTask.Header()
newIncomingTask.SetLogger(bm.Logger().With(zap.Uint64("broadcastID", header.BroadcastID))) newIncomingTask.SetLogger(bm.Logger().With(zap.Uint64("broadcastID", header.BroadcastID)))
@ -167,6 +172,7 @@ func (bm *broadcastTaskManager) addBroadcastTask(msg message.BroadcastMutableMes
// setup the resource keys to make resource exclusive held. // setup the resource keys to make resource exclusive held.
for key := range header.ResourceKeys { for key := range header.ResourceKeys {
bm.resourceKeys[key] = header.BroadcastID bm.resourceKeys[key] = header.BroadcastID
bm.metrics.IncomingResourceKey(key.Domain)
} }
bm.tasks[header.BroadcastID] = newIncomingTask bm.tasks[header.BroadcastID] = newIncomingTask
return newIncomingTask, nil return newIncomingTask, nil
@ -193,6 +199,7 @@ func (bm *broadcastTaskManager) removeBroadcastTask(broadcastID uint64) {
// remove the related resource keys // remove the related resource keys
for key := range task.header.ResourceKeys { for key := range task.header.ResourceKeys {
delete(bm.resourceKeys, key) delete(bm.resourceKeys, key)
bm.metrics.GoneResourceKey(key.Domain)
} }
delete(bm.tasks, broadcastID) delete(bm.tasks, broadcastID)
} }

View File

@ -16,7 +16,8 @@ import (
) )
// newBroadcastTaskFromProto creates a new broadcast task from the proto. // newBroadcastTaskFromProto creates a new broadcast task from the proto.
func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask) *broadcastTask { func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadcasterMetrics) *broadcastTask {
m := metrics.NewBroadcastTask(proto.GetState())
msg := message.NewBroadcastMutableMessageBeforeAppend(proto.Message.Payload, proto.Message.Properties) msg := message.NewBroadcastMutableMessageBeforeAppend(proto.Message.Payload, proto.Message.Properties)
bh := msg.BroadcastHeader() bh := msg.BroadcastHeader()
return &broadcastTask{ return &broadcastTask{
@ -24,11 +25,13 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask) *broadcastTask
header: bh, header: bh,
task: proto, task: proto,
recoverPersisted: true, // the task is recovered from the recovery info, so it's persisted. recoverPersisted: true, // the task is recovered from the recovery info, so it's persisted.
metrics: m,
} }
} }
// newBroadcastTaskFromBroadcastMessage creates a new broadcast task from the broadcast message. // newBroadcastTaskFromBroadcastMessage creates a new broadcast task from the broadcast message.
func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage) *broadcastTask { func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, metrics *broadcasterMetrics) *broadcastTask {
m := metrics.NewBroadcastTask(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING)
header := msg.BroadcastHeader() header := msg.BroadcastHeader()
return &broadcastTask{ return &broadcastTask{
Binder: log.Binder{}, Binder: log.Binder{},
@ -40,6 +43,7 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage) *
AckedVchannelBitmap: make([]byte, len(header.VChannels)), AckedVchannelBitmap: make([]byte, len(header.VChannels)),
}, },
recoverPersisted: false, recoverPersisted: false,
metrics: m,
} }
} }
@ -50,6 +54,7 @@ type broadcastTask struct {
header *message.BroadcastHeader header *message.BroadcastHeader
task *streamingpb.BroadcastTask task *streamingpb.BroadcastTask
recoverPersisted bool // a flag to indicate that the task has been persisted into the recovery info and can be recovered. recoverPersisted bool // a flag to indicate that the task has been persisted into the recovery info and can be recovered.
metrics *taskMetricsGuard
} }
// Header returns the header of the broadcast task. // Header returns the header of the broadcast task.
@ -116,6 +121,10 @@ func (b *broadcastTask) Ack(ctx context.Context, vchannel string) error {
return err return err
} }
b.task = task b.task = task
b.metrics.ObserveAckAnyOne()
if isAllDone(task) {
b.metrics.ObserveAckAll()
}
return nil return nil
} }
@ -159,6 +168,7 @@ func (b *broadcastTask) BroadcastDone(ctx context.Context) error {
return err return err
} }
b.task = task b.task = task
b.metrics.ObserveBroadcastDone()
return nil return nil
} }
@ -221,5 +231,6 @@ func (b *broadcastTask) saveTask(ctx context.Context, task *streamingpb.Broadcas
return err return err
} }
logger.Info("save broadcast task done") logger.Info("save broadcast task done")
b.metrics.ToState(task.State)
return nil return nil
} }

View File

@ -0,0 +1,91 @@
package broadcaster
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// newBroadcasterMetrics creates a new broadcaster metrics.
func newBroadcasterMetrics() *broadcasterMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
}
return &broadcasterMetrics{
taskTotal: metrics.StreamingCoordBroadcasterTaskTotal.MustCurryWith(constLabel),
resourceKeyTotal: metrics.StreamingCoordResourceKeyTotal.MustCurryWith(constLabel),
broadcastDuration: metrics.StreamingCoordBroadcastDurationSeconds.With(constLabel),
ackAnyOneDuration: metrics.StreamingCoordBroadcasterAckAnyOneDurationSeconds.With(constLabel),
ackAllDuration: metrics.StreamingCoordBroadcasterAckAllDurationSeconds.With(constLabel),
}
}
// broadcasterMetrics is the metrics of the broadcaster.
type broadcasterMetrics struct {
taskTotal *prometheus.GaugeVec
resourceKeyTotal *prometheus.GaugeVec
broadcastDuration prometheus.Observer
ackAnyOneDuration prometheus.Observer
ackAllDuration prometheus.Observer
}
// fromStateToState updates the metrics when the state of the broadcast task changes.
func (m *broadcasterMetrics) fromStateToState(from streamingpb.BroadcastTaskState, to streamingpb.BroadcastTaskState) {
if from != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN {
m.taskTotal.WithLabelValues(from.String()).Dec()
}
if to != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE {
m.taskTotal.WithLabelValues(to.String()).Inc()
}
}
// NewBroadcastTask creates a new broadcast task.
func (m *broadcasterMetrics) NewBroadcastTask(state streamingpb.BroadcastTaskState) *taskMetricsGuard {
g := &taskMetricsGuard{
start: time.Now(),
state: state,
broadcasterMetrics: m,
}
g.broadcasterMetrics.fromStateToState(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN, state)
return g
}
func (m *broadcasterMetrics) IncomingResourceKey(domain messagespb.ResourceDomain) {
m.resourceKeyTotal.WithLabelValues(domain.String()).Inc()
}
func (m *broadcasterMetrics) GoneResourceKey(domain messagespb.ResourceDomain) {
m.resourceKeyTotal.WithLabelValues(domain.String()).Dec()
}
type taskMetricsGuard struct {
start time.Time
state streamingpb.BroadcastTaskState
*broadcasterMetrics
}
// ToState updates the state of the broadcast task.
func (g *taskMetricsGuard) ToState(state streamingpb.BroadcastTaskState) {
g.broadcasterMetrics.fromStateToState(g.state, state)
g.state = state
}
// ObserveBroadcastDone observes the broadcast done.
func (g *taskMetricsGuard) ObserveBroadcastDone() {
g.broadcastDuration.Observe(time.Since(g.start).Seconds())
}
// ObserverAckOne observes the ack any one.
func (g *taskMetricsGuard) ObserveAckAnyOne() {
g.ackAnyOneDuration.Observe(time.Since(g.start).Seconds())
}
// ObserverAckOne observes the ack all.
func (g *taskMetricsGuard) ObserveAckAll() {
g.ackAllDuration.Observe(time.Since(g.start).Seconds())
}

View File

@ -16,7 +16,9 @@ import (
var errBroadcastTaskIsNotDone = errors.New("broadcast task is not done") var errBroadcastTaskIsNotDone = errors.New("broadcast task is not done")
// newPendingBroadcastTask creates a new pendingBroadcastTask. // newPendingBroadcastTask creates a new pendingBroadcastTask.
func newPendingBroadcastTask(task *broadcastTask) *pendingBroadcastTask { func newPendingBroadcastTask(
task *broadcastTask,
) *pendingBroadcastTask {
msgs := task.PendingBroadcastMessages() msgs := task.PendingBroadcastMessages()
return &pendingBroadcastTask{ return &pendingBroadcastTask{
broadcastTask: task, broadcastTask: task,
@ -40,6 +42,7 @@ type pendingBroadcastTask struct {
pendingMessages []message.MutableMessage pendingMessages []message.MutableMessage
appendResult map[string]*types.AppendResult appendResult map[string]*types.AppendResult
future *syncutil.Future[*types.BroadcastAppendResult] future *syncutil.Future[*types.BroadcastAppendResult]
metrics *taskMetricsGuard
*typeutil.BackoffWithInstant *typeutil.BackoffWithInstant
} }

View File

@ -69,11 +69,12 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
} }
defer hc.lifetime.Done() defer hc.lifetime.Done()
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) { logger := log.With(zap.String("pchannel", opts.PChannel), zap.String("handler", "producer"))
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, logger, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error) {
// Check if the localWAL is assigned at local // Check if the localWAL is assigned at local
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel) localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil { if err == nil {
return localResult(localWAL), nil return localWAL, nil
} }
if !shouldUseRemoteWAL(err) { if !shouldUseRemoteWAL(err) {
return nil, err return nil, err
@ -89,7 +90,7 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
if err != nil { if err != nil {
return nil, err return nil, err
} }
return remoteResult(remoteWAL), nil return remoteWAL, nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -104,7 +105,8 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
} }
defer hc.lifetime.Done() defer hc.lifetime.Done()
c, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) { logger := log.With(zap.String("pchannel", opts.PChannel), zap.String("vchannel", opts.VChannel), zap.String("handler", "consumer"))
c, err := hc.createHandlerAfterStreamingNodeReady(ctx, logger, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error) {
// Check if the localWAL is assigned at local // Check if the localWAL is assigned at local
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel) localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil { if err == nil {
@ -117,7 +119,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
if err != nil { if err != nil {
return nil, err return nil, err
} }
return localResult(localScanner), nil return localScanner, nil
} }
if !shouldUseRemoteWAL(err) { if !shouldUseRemoteWAL(err) {
return nil, err return nil, err
@ -138,7 +140,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
if err != nil { if err != nil {
return nil, err return nil, err
} }
return remoteResult(remoteScanner), nil return remoteScanner, nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -146,25 +148,11 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
return c.(Consumer), nil return c.(Consumer), nil
} }
func localResult(result any) *handlerCreateResult { type handlerCreateFunc func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error)
return &handlerCreateResult{result: result, isLocal: true}
}
func remoteResult(result any) *handlerCreateResult {
return &handlerCreateResult{result: result, isLocal: false}
}
type handlerCreateResult struct {
result any
isLocal bool
}
type handlerCreateFunc func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error)
// createHandlerAfterStreamingNodeReady creates a handler until streaming node ready. // createHandlerAfterStreamingNodeReady creates a handler until streaming node ready.
// If streaming node is not ready, it will block until new assignment term is coming or context timeout. // If streaming node is not ready, it will block until new assignment term is coming or context timeout.
func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Context, pchannel string, create handlerCreateFunc) (any, error) { func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Context, logger *log.MLogger, pchannel string, create handlerCreateFunc) (any, error) {
logger := log.With(zap.String("pchannel", pchannel))
// TODO: backoff should be configurable. // TODO: backoff should be configurable.
backoff := backoff.NewExponentialBackOff() backoff := backoff.NewExponentialBackOff()
for { for {
@ -173,8 +161,8 @@ func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Co
// Find assignment, try to create producer on this assignment. // Find assignment, try to create producer on this assignment.
createResult, err := create(ctx, assign) createResult, err := create(ctx, assign)
if err == nil { if err == nil {
logger.Info("create handler success", zap.Any("assignment", assign), zap.Bool("isLocal", createResult.isLocal)) logger.Info("create handler success", zap.Any("assignment", assign), zap.Bool("isLocal", registry.IsLocal(createResult)))
return createResult.result, nil return createResult, nil
} }
logger.Warn("create handler failed", zap.Any("assignment", assign), zap.Error(err)) logger.Warn("create handler failed", zap.Any("assignment", assign), zap.Error(err))

View File

@ -182,6 +182,11 @@ func (p *producerImpl) IsAvailable() bool {
} }
} }
// IsLocal returns if the producer is local.
func (p *producerImpl) IsLocal() bool {
return false
}
// Available returns a channel that will be closed when the producer is unavailable. // Available returns a channel that will be closed when the producer is unavailable.
func (p *producerImpl) Available() <-chan struct{} { func (p *producerImpl) Available() <-chan struct{} {
return p.sendExitCh return p.sendExitCh

View File

@ -0,0 +1,58 @@
package flusherimpl
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
const (
flusherStateInRecovering flusherState = "in_recovery"
flusherStateInWorking flusherState = "working"
flusherStateOnClosing flusherState = "closing"
)
type flusherState = string
func newFlusherMetrics(pchannel types.PChannelInfo) *flusherMetrics {
constLabels := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name,
metrics.WALChannelTermLabelName: strconv.FormatInt(pchannel.Term, 10),
}
m := &flusherMetrics{
constLabels: constLabels,
info: metrics.WALFlusherInfo.MustCurryWith(constLabels),
timetick: metrics.WALFlusherTimeTick.With(constLabels),
state: flusherStateInRecovering,
}
m.info.WithLabelValues(flusherStateInRecovering).Set(1)
return m
}
type flusherMetrics struct {
constLabels prometheus.Labels
info *prometheus.GaugeVec
timetick prometheus.Gauge
state flusherState
}
func (m *flusherMetrics) IntoState(state flusherState) {
metrics.WALFlusherInfo.DeletePartialMatch(m.constLabels)
m.state = state
m.info.WithLabelValues(m.state).Set(1)
}
func (m *flusherMetrics) ObserveMetrics(tickTime uint64) {
m.timetick.Set(tsoutil.PhysicalTimeSeconds(tickTime))
}
func (m *flusherMetrics) Close() {
metrics.WALFlusherInfo.DeletePartialMatch(m.constLabels)
metrics.WALFlusherTimeTick.DeletePartialMatch(m.constLabels)
}

View File

@ -32,6 +32,7 @@ func RecoverWALFlusher(param interceptors.InterceptorBuildParam) *WALFlusherImpl
logger: resource.Resource().Logger().With( logger: resource.Resource().Logger().With(
log.FieldComponent("flusher"), log.FieldComponent("flusher"),
zap.String("pchannel", param.WALImpls.Channel().Name)), zap.String("pchannel", param.WALImpls.Channel().Name)),
metrics: newFlusherMetrics(param.WALImpls.Channel()),
} }
go flusher.Execute() go flusher.Execute()
return flusher return flusher
@ -42,6 +43,7 @@ type WALFlusherImpl struct {
wal *syncutil.Future[wal.WAL] wal *syncutil.Future[wal.WAL]
flusherComponents *flusherComponents flusherComponents *flusherComponents
logger *log.MLogger logger *log.MLogger
metrics *flusherMetrics
} }
// Execute starts the wal flusher. // Execute starts the wal flusher.
@ -79,6 +81,9 @@ func (impl *WALFlusherImpl) Execute() (err error) {
defer scanner.Close() defer scanner.Close()
impl.logger.Info("wal flusher start to work") impl.logger.Info("wal flusher start to work")
impl.metrics.IntoState(flusherStateInWorking)
defer impl.metrics.IntoState(flusherStateOnClosing)
for { for {
select { select {
case <-impl.notifier.Context().Done(): case <-impl.notifier.Context().Done():
@ -88,6 +93,7 @@ func (impl *WALFlusherImpl) Execute() (err error) {
impl.logger.Warn("wal flusher is closing for closed scanner channel, which is unexpected at graceful way") impl.logger.Warn("wal flusher is closing for closed scanner channel, which is unexpected at graceful way")
return nil return nil
} }
impl.metrics.ObserveMetrics(msg.TimeTick())
if err := impl.dispatch(msg); err != nil { if err := impl.dispatch(msg); err != nil {
// The error is always context canceled. // The error is always context canceled.
return nil return nil
@ -100,6 +106,7 @@ func (impl *WALFlusherImpl) Execute() (err error) {
func (impl *WALFlusherImpl) Close() { func (impl *WALFlusherImpl) Close() {
impl.notifier.Cancel() impl.notifier.Cancel()
impl.notifier.BlockUntilFinish() impl.notifier.BlockUntilFinish()
impl.metrics.Close()
} }
// buildFlusherComponents builds the components of the flusher. // buildFlusherComponents builds the components of the flusher.

View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
) )
@ -49,7 +50,7 @@ func TestOpenerAdaptor(t *testing.T) {
wal.EXPECT().Channel().Return(boo.Channel) wal.EXPECT().Channel().Return(boo.Channel)
wal.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn( wal.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil return walimplstest.NewTestMessageID(1), nil
}) })
wal.EXPECT().Close().Run(func() {}) wal.EXPECT().Close().Run(func() {})
return wal, nil return wal, nil

View File

@ -129,12 +129,14 @@ func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMe
} }
scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan) scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan)
s.logger.Info("start produce loop of scanner at mode", zap.String("mode", scanner.Mode())) s.logger.Info("start produce loop of scanner at model", zap.String("model", getScannerModel(scanner)))
for { for {
if scanner, err = scanner.Do(s.Context()); err != nil { if scanner, err = scanner.Do(s.Context()); err != nil {
return err return err
} }
s.logger.Info("switch scanner mode", zap.String("mode", scanner.Mode())) m := getScannerModel(scanner)
s.metrics.SwitchModel(m)
s.logger.Info("switch scanner model", zap.String("model", m))
} }
} }
@ -170,7 +172,9 @@ func (s *scannerAdaptorImpl) consumeEventLoop(msgChan <-chan message.ImmutableMe
// handleUpstream handles the incoming message from the upstream. // handleUpstream handles the incoming message from the upstream.
func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
// Observe the message. // Observe the message.
s.metrics.ObserveMessage(msg.MessageType(), msg.EstimateSize()) var isTailing bool
msg, isTailing = isTailingScanImmutableMessage(msg)
s.metrics.ObserveMessage(isTailing, msg.MessageType(), msg.EstimateSize())
if msg.MessageType() == message.MessageTypeTimeTick { if msg.MessageType() == message.MessageTypeTimeTick {
// If the time tick message incoming, // If the time tick message incoming,
// the reorder buffer can be consumed until latest confirmed timetick. // the reorder buffer can be consumed until latest confirmed timetick.
@ -209,15 +213,16 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
// otherwise add message into reorder buffer directly. // otherwise add message into reorder buffer directly.
if err := s.reorderBuffer.Push(msg); err != nil { if err := s.reorderBuffer.Push(msg); err != nil {
if errors.Is(err, utility.ErrTimeTickVoilation) { if errors.Is(err, utility.ErrTimeTickVoilation) {
s.metrics.ObserveTimeTickViolation(msg.MessageType()) s.metrics.ObserveTimeTickViolation(isTailing, msg.MessageType())
} }
s.logger.Warn("failed to push message into reorder buffer", s.logger.Warn("failed to push message into reorder buffer",
zap.Any("msgID", msg.MessageID()), zap.Any("msgID", msg.MessageID()),
zap.Uint64("timetick", msg.TimeTick()), zap.Uint64("timetick", msg.TimeTick()),
zap.String("vchannel", msg.VChannel()), zap.String("vchannel", msg.VChannel()),
zap.Bool("tailing", isTailing),
zap.Error(err)) zap.Error(err))
} }
// Observe the filtered message. // Observe the filtered message.
s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes()) s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes())
s.metrics.ObserveFilteredMessage(msg.MessageType(), msg.EstimateSize()) s.metrics.ObservePassedMessage(isTailing, msg.MessageType(), msg.EstimateSize())
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/vchantempstore" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/vchantempstore"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options" "github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
@ -45,9 +46,6 @@ func newSwithableScanner(
// switchableScanner is a scanner that can switch between Catchup and Tailing mode // switchableScanner is a scanner that can switch between Catchup and Tailing mode
type switchableScanner interface { type switchableScanner interface {
// Mode is Catchup or Tailing
Mode() string
// Execute make a scanner work at background. // Execute make a scanner work at background.
// When the scanner want to change the mode, it will return a new scanner with the new mode. // When the scanner want to change the mode, it will return a new scanner with the new mode.
// When error is returned, the scanner is canceled and unrecoverable forever. // When error is returned, the scanner is canceled and unrecoverable forever.
@ -79,10 +77,6 @@ type catchupScanner struct {
lastConfirmedMessageIDForOldVersion message.MessageID lastConfirmedMessageIDForOldVersion message.MessageID
} }
func (s *catchupScanner) Mode() string {
return "Catchup"
}
func (s *catchupScanner) Do(ctx context.Context) (switchableScanner, error) { func (s *catchupScanner) Do(ctx context.Context) (switchableScanner, error) {
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
@ -213,10 +207,6 @@ type tailingScanner struct {
lastConsumedMessage message.ImmutableMessage lastConsumedMessage message.ImmutableMessage
} }
func (s *tailingScanner) Mode() string {
return "Tailing"
}
func (s *tailingScanner) Do(ctx context.Context) (switchableScanner, error) { func (s *tailingScanner) Do(ctx context.Context) (switchableScanner, error) {
for { for {
msg, err := s.reader.Next(ctx) msg, err := s.reader.Next(ctx)
@ -237,9 +227,29 @@ func (s *tailingScanner) Do(ctx context.Context) (switchableScanner, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := s.HandleMessage(ctx, msg); err != nil { if err := s.HandleMessage(ctx, tailingImmutableMessage{msg}); err != nil {
return nil, err return nil, err
} }
s.lastConsumedMessage = msg s.lastConsumedMessage = msg
} }
} }
// getScannerModel returns the scanner model.
func getScannerModel(scanner switchableScanner) string {
if _, ok := scanner.(*tailingScanner); ok {
return metrics.WALScannerModelTailing
}
return metrics.WALScannerModelCatchup
}
type tailingImmutableMessage struct {
message.ImmutableMessage
}
// isTailingScanImmutableMessage checks whether the message was produced by the tailing scanner.
func isTailingScanImmutableMessage(msg message.ImmutableMessage) (message.ImmutableMessage, bool) {
if msg, ok := msg.(tailingImmutableMessage); ok {
return msg.ImmutableMessage, true
}
return msg, false
}
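The wrapper type above is how the tailing scanner tags every message it hands to HandleMessage, so downstream metrics can tell which mode produced it without adding a parameter to every call. A stripped-down sketch of the same tagging trick, with a stand-in interface instead of message.ImmutableMessage:

package example

// Message stands in for message.ImmutableMessage.
type Message interface {
	TimeTick() uint64
}

// tailingMessage embeds the original message and only exists as a marker.
type tailingMessage struct {
	Message
}

// unwrap reports whether msg was produced by the tailing path
// and returns the underlying message either way.
func unwrap(msg Message) (Message, bool) {
	if m, ok := msg.(tailingMessage); ok {
		return m.Message, true
	}
	return msg, false
}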

View File

@ -35,6 +35,10 @@ func adaptImplsToWAL(
WALImpls: basicWAL, WALImpls: basicWAL,
WAL: syncutil.NewFuture[wal.WAL](), WAL: syncutil.NewFuture[wal.WAL](),
} }
logger := resource.Resource().Logger().With(
log.FieldComponent("wal"),
zap.Any("channel", basicWAL.Channel()),
)
wal := &walAdaptorImpl{ wal := &walAdaptorImpl{
lifetime: typeutil.NewLifetime(), lifetime: typeutil.NewLifetime(),
available: make(chan struct{}), available: make(chan struct{}),
@ -51,11 +55,9 @@ func adaptImplsToWAL(
cleanup: cleanup, cleanup: cleanup,
writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()), writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()),
scanMetrics: metricsutil.NewScanMetrics(basicWAL.Channel()), scanMetrics: metricsutil.NewScanMetrics(basicWAL.Channel()),
logger: resource.Resource().Logger().With( logger: logger,
log.FieldComponent("wal"),
zap.Any("channel", basicWAL.Channel()),
),
} }
wal.writeMetrics.SetLogger(logger)
param.WAL.Set(wal) param.WAL.Set(wal)
return wal return wal
} }
@ -121,8 +123,11 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
// Setup the term of wal. // Setup the term of wal.
msg = msg.WithWALTerm(w.Channel().Term) msg = msg.WithWALTerm(w.Channel().Term)
appendMetrics := w.writeMetrics.StartAppend(msg)
ctx = utility.WithAppendMetricsContext(ctx, appendMetrics)
// Metrics for append message. // Metrics for append message.
metricsGuard := w.writeMetrics.StartAppend(msg.MessageType(), msg.EstimateSize()) metricsGuard := appendMetrics.StartAppendGuard()
// Execute the interceptor and wal append. // Execute the interceptor and wal append.
var extraAppendResult utility.ExtraAppendResult var extraAppendResult utility.ExtraAppendResult
@ -131,6 +136,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
if notPersistHint := utility.GetNotPersisted(ctx); notPersistHint != nil { if notPersistHint := utility.GetNotPersisted(ctx); notPersistHint != nil {
// do not persist the message if the hint is set. // do not persist the message if the hint is set.
appendMetrics.NotPersisted()
return notPersistHint.MessageID, nil return notPersistHint.MessageID, nil
} }
metricsGuard.StartWALImplAppend() metricsGuard.StartWALImplAppend()
@ -138,8 +144,9 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
metricsGuard.FinishWALImplAppend() metricsGuard.FinishWALImplAppend()
return msgID, err return msgID, err
}) })
metricsGuard.FinishAppend()
if err != nil { if err != nil {
metricsGuard.Finish(err) appendMetrics.Done(nil, err)
return nil, err return nil, err
} }
var extra *anypb.Any var extra *anypb.Any
@ -157,7 +164,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
TxnCtx: extraAppendResult.TxnCtx, TxnCtx: extraAppendResult.TxnCtx,
Extra: extra, Extra: extra,
} }
metricsGuard.Finish(nil) appendMetrics.Done(r, nil)
return r, nil return r, nil
} }
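The rewritten Append above creates one AppendMetrics record per call, puts it into the context so interceptors can attach their own timings, and closes it with Done(result, err); messages carrying the not-persisted hint are marked so their numbers can be dropped. A simplified, self-contained sketch of that lifecycle, where the context key and record fields are stand-ins rather than the utility package API:

package example

import (
	"context"
	"time"
)

// appendRecord is a stand-in for metricsutil.AppendMetrics.
type appendRecord struct {
	start     time.Time
	duration  time.Duration
	persisted bool
	err       error
}

type recordKey struct{}

// withRecord stashes the record so interceptors further down can reach it.
func withRecord(ctx context.Context, r *appendRecord) context.Context {
	return context.WithValue(ctx, recordKey{}, r)
}

// recordFrom retrieves the record placed by withRecord, if any.
func recordFrom(ctx context.Context) *appendRecord {
	r, _ := ctx.Value(recordKey{}).(*appendRecord)
	return r
}

// appendWithMetrics wraps a write with the start/done bookkeeping used above.
func appendWithMetrics(ctx context.Context, write func(context.Context) error) error {
	r := &appendRecord{start: time.Now(), persisted: true}
	ctx = withRecord(ctx, r)
	err := write(ctx)
	r.duration = time.Since(r.start)
	r.err = err
	// A real implementation would now push r into histograms and the slow log.
	return err
}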

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/walimplstest"
) )
func TestWalAdaptorReadFail(t *testing.T) { func TestWalAdaptorReadFail(t *testing.T) {
@ -84,7 +85,7 @@ func TestWALAdaptor(t *testing.T) {
l.EXPECT().Channel().Return(types.PChannelInfo{}) l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn( l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil return walimplstest.NewTestMessageID(1), nil
}) })
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) { l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) {
scanner := mock_walimpls.NewMockScannerImpls(t) scanner := mock_walimpls.NewMockScannerImpls(t)
@ -162,7 +163,7 @@ func TestNoInterceptor(t *testing.T) {
l.EXPECT().WALName().Return("test") l.EXPECT().WALName().Return("test")
l.EXPECT().Channel().Return(types.PChannelInfo{}) l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil return walimplstest.NewTestMessageID(1), nil
}) })
l.EXPECT().Close().Run(func() {}) l.EXPECT().Close().Run(func() {})
@ -181,7 +182,7 @@ func TestWALWithInterceptor(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t) l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Channel().Return(types.PChannelInfo{}) l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) { l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil return walimplstest.NewTestMessageID(1), nil
}) })
l.EXPECT().WALName().Return("test") l.EXPECT().WALName().Return("test")
l.EXPECT().Close().Run(func() {}) l.EXPECT().Close().Run(func() {})

View File

@ -3,6 +3,7 @@ package interceptors
import ( import (
"context" "context"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
) )
@ -15,14 +16,10 @@ type (
// NewChainedInterceptor creates a new chained interceptor. // NewChainedInterceptor creates a new chained interceptor.
func NewChainedInterceptor(interceptors ...Interceptor) InterceptorWithReady { func NewChainedInterceptor(interceptors ...Interceptor) InterceptorWithReady {
appendCalls := make([]appendInterceptorCall, 0, len(interceptors))
for _, i := range interceptors {
appendCalls = append(appendCalls, i.DoAppend)
}
return &chainedInterceptor{ return &chainedInterceptor{
closed: make(chan struct{}), closed: make(chan struct{}),
interceptors: interceptors, interceptors: interceptors,
appendCall: chainAppendInterceptors(appendCalls), appendCall: chainAppendInterceptors(interceptors),
} }
} }
@ -66,28 +63,54 @@ func (c *chainedInterceptor) Close() {
} }
// chainAppendInterceptors chains all unary client interceptors into one. // chainAppendInterceptors chains all unary client interceptors into one.
func chainAppendInterceptors(interceptorCalls []appendInterceptorCall) appendInterceptorCall { func chainAppendInterceptors(interceptors []Interceptor) appendInterceptorCall {
if len(interceptorCalls) == 0 { if len(interceptors) == 0 {
// Do nothing if no interceptors. // Do nothing if no interceptors.
return func(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) { return func(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) {
return append(ctx, msg) return append(ctx, msg)
} }
} else if len(interceptorCalls) == 1 { } else if len(interceptors) == 1 {
return interceptorCalls[0] if i, ok := interceptors[0].(InterceptorWithMetrics); ok {
return adaptAppendWithMetricCollecting(i.Name(), interceptors[0].DoAppend)
}
return interceptors[0].DoAppend
} }
return func(ctx context.Context, msg message.MutableMessage, invoker Append) (message.MessageID, error) { return func(ctx context.Context, msg message.MutableMessage, invoker Append) (message.MessageID, error) {
return interceptorCalls[0](ctx, msg, getChainAppendInvoker(interceptorCalls, 0, invoker)) if i, ok := interceptors[0].(InterceptorWithMetrics); ok {
return adaptAppendWithMetricCollecting(i.Name(), interceptors[0].DoAppend)(ctx, msg, getChainAppendInvoker(interceptors, 0, invoker))
}
return interceptors[0].DoAppend(ctx, msg, getChainAppendInvoker(interceptors, 0, invoker))
} }
} }
// getChainAppendInvoker recursively generates the chained unary invoker. // getChainAppendInvoker recursively generates the chained unary invoker.
func getChainAppendInvoker(interceptors []appendInterceptorCall, idx int, finalInvoker Append) Append { func getChainAppendInvoker(interceptors []Interceptor, idx int, finalInvoker Append) Append {
// all interceptor is called, so return the final invoker. // all interceptor is called, so return the final invoker.
if idx == len(interceptors)-1 { if idx == len(interceptors)-1 {
return finalInvoker return finalInvoker
} }
// recursively generate the chained invoker. // recursively generate the chained invoker.
return func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { return func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
return interceptors[idx+1](ctx, msg, getChainAppendInvoker(interceptors, idx+1, finalInvoker)) idx := idx + 1
if i, ok := interceptors[idx].(InterceptorWithMetrics); ok {
return adaptAppendWithMetricCollecting(i.Name(), i.DoAppend)(ctx, msg, getChainAppendInvoker(interceptors, idx, finalInvoker))
}
return interceptors[idx].DoAppend(ctx, msg, getChainAppendInvoker(interceptors, idx, finalInvoker))
}
}
// adaptAppendWithMetricCollecting adapts the append interceptor with metric collecting.
func adaptAppendWithMetricCollecting(name string, append appendInterceptorCall) appendInterceptorCall {
return func(ctx context.Context, msg message.MutableMessage, invoker Append) (message.MessageID, error) {
c := utility.MustGetAppendMetrics(ctx).StartInterceptorCollector(name)
msgID, err := append(ctx, msg, func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
c.BeforeDone()
msgID, err := invoker(ctx, msg)
c.AfterStart()
return msgID, err
})
c.AfterDone()
c.BeforeFailure(err)
return msgID, err
} }
} }
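chainAppendInterceptors now keeps the Interceptor values instead of pre-extracted DoAppend funcs so it can detect InterceptorWithMetrics and time the work each one does before it calls the inner append and after the inner append returns. A self-contained sketch of that recursive chaining and before/after split, with simplified types standing in for the wal interceptor interfaces:

package example

import (
	"context"
	"fmt"
	"time"
)

type Append func(ctx context.Context) error

type Interceptor interface {
	DoAppend(ctx context.Context, next Append) error
}

type interceptorFunc func(ctx context.Context, next Append) error

func (f interceptorFunc) DoAppend(ctx context.Context, next Append) error { return f(ctx, next) }

// timed wraps one interceptor and splits its cost into the time spent
// before invoking next and the time spent after next returned
// (the real code records both into AppendMetrics instead of printing).
func timed(name string, i Interceptor) Interceptor {
	return interceptorFunc(func(ctx context.Context, next Append) error {
		start := time.Now()
		var before, after time.Duration
		afterStarted := false
		err := i.DoAppend(ctx, func(ctx context.Context) error {
			before = time.Since(start)
			err := next(ctx)
			start = time.Now()
			afterStarted = true
			return err
		})
		if afterStarted {
			after = time.Since(start)
		}
		fmt.Printf("%s before=%v after=%v err=%v\n", name, before, after, err)
		return err
	})
}

// chain nests the interceptors around final, first interceptor outermost.
func chain(interceptors []Interceptor, final Append) Append {
	if len(interceptors) == 0 {
		return final
	}
	return func(ctx context.Context) error {
		return interceptors[0].DoAppend(ctx, chain(interceptors[1:], final))
	}
}

Wrapping each named interceptor with timed before calling chain reproduces, in miniature, what adaptAppendWithMetricCollecting does above.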

View File

@ -2,6 +2,7 @@ package interceptors_test
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"time" "time"
@ -10,12 +11,17 @@ import (
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors" "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
) )
func TestChainInterceptor(t *testing.T) { func TestChainInterceptor(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
testChainInterceptor(t, i) testChainInterceptor(t, i, false)
testChainInterceptor(t, 5, true)
} }
} }
@ -71,7 +77,7 @@ func TestChainReady(t *testing.T) {
} }
} }
func testChainInterceptor(t *testing.T, count int) { func testChainInterceptor(t *testing.T, count int, named bool) {
type record struct { type record struct {
before bool before bool
after bool after bool
@ -84,7 +90,9 @@ func testChainInterceptor(t *testing.T, count int) {
j := i j := i
appendInterceptorRecords = append(appendInterceptorRecords, record{}) appendInterceptorRecords = append(appendInterceptorRecords, record{})
if !named {
interceptor := mock_interceptors.NewMockInterceptor(t) interceptor := mock_interceptors.NewMockInterceptor(t)
interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn( interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage, f func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) { func(ctx context.Context, mm message.MutableMessage, f func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
appendInterceptorRecords[j].before = true appendInterceptorRecords[j].before = true
@ -96,21 +104,57 @@ func testChainInterceptor(t *testing.T, count int) {
appendInterceptorRecords[j].closed = true appendInterceptorRecords[j].closed = true
}) })
ips = append(ips, interceptor) ips = append(ips, interceptor)
} else {
interceptor := mock_interceptors.NewMockInterceptorWithMetrics(t)
interceptor.EXPECT().DoAppend(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage, f func(context.Context, message.MutableMessage) (message.MessageID, error)) (message.MessageID, error) {
appendInterceptorRecords[j].before = true
// time.Sleep(time.Duration(j) * 10 * time.Millisecond)
msgID, err := f(ctx, mm)
appendInterceptorRecords[j].after = true
// time.Sleep(time.Duration(j) * 20 * time.Millisecond)
return msgID, err
})
interceptor.EXPECT().Name().Return(fmt.Sprintf("interceptor-%d", j))
interceptor.EXPECT().Close().Run(func() {
appendInterceptorRecords[j].closed = true
})
ips = append(ips, interceptor)
}
} }
interceptor := interceptors.NewChainedInterceptor(ips...) interceptor := interceptors.NewChainedInterceptor(ips...)
// fast return // fast return
<-interceptor.Ready() <-interceptor.Ready()
msg, err := interceptor.DoAppend(context.Background(), nil, func(context.Context, message.MutableMessage) (message.MessageID, error) { msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().MessageType().Return(message.MessageTypeDelete).Maybe()
msg.EXPECT().EstimateSize().Return(1).Maybe()
msg.EXPECT().TxnContext().Return(nil).Maybe()
mw := metricsutil.NewWriteMetrics(types.PChannelInfo{}, "rocksmq")
m := mw.StartAppend(msg)
ctx := utility.WithAppendMetricsContext(context.Background(), m)
msgID, err := interceptor.DoAppend(ctx, msg, func(context.Context, message.MutableMessage) (message.MessageID, error) {
return nil, nil return nil, nil
}) })
assert.NoError(t, err) assert.NoError(t, err)
assert.Nil(t, msg) assert.Nil(t, msgID)
interceptor.Close() interceptor.Close()
if named {
cnt := 0
m.RangeOverInterceptors(func(name string, ims []*metricsutil.InterceptorMetrics) {
assert.NotEmpty(t, name)
for _, im := range ims {
assert.NotZero(t, im.Before)
assert.NotZero(t, im.After)
cnt++
}
})
assert.Equal(t, count, cnt)
}
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
assert.True(t, appendInterceptorRecords[i].before) assert.True(t, appendInterceptorRecords[i].before, i)
assert.True(t, appendInterceptorRecords[i].after) assert.True(t, appendInterceptorRecords[i].after, i)
assert.True(t, appendInterceptorRecords[i].closed) assert.True(t, appendInterceptorRecords[i].closed, i)
} }
} }

View File

@ -8,6 +8,10 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
) )
const (
interceptorName = "flusher"
)
var ( var (
_ interceptors.Interceptor = (*flusherAppendInterceptor)(nil) _ interceptors.Interceptor = (*flusherAppendInterceptor)(nil)
_ interceptors.InterceptorWithGracefulClose = (*flusherAppendInterceptor)(nil) _ interceptors.InterceptorWithGracefulClose = (*flusherAppendInterceptor)(nil)

View File

@ -35,7 +35,8 @@ type Interceptor interface {
// 2. unique primary key filter and build. // 2. unique primary key filter and build.
// 3. index builder. // 3. index builder.
// 4. cache sync up. // 4. cache sync up.
// AppendInterceptor should be lazy initialized and fast execution. // !!! AppendInterceptor should be lazily initialized and execute fast.
// !!! the operations performed after append should never return an error.
// Execute the append operation with interceptor. // Execute the append operation with interceptor.
DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error) DoAppend(ctx context.Context, msg message.MutableMessage, append Append) (message.MessageID, error)
@ -57,6 +58,14 @@ type InterceptorWithReady interface {
Ready() <-chan struct{} Ready() <-chan struct{}
} }
// InterceptorWithMetrics is the interceptor with metric collecting.
type InterceptorWithMetrics interface {
Interceptor
// Name returns the name of the interceptor.
Name() string
}
// Some interceptor may need to perform a graceful close operation. // Some interceptor may need to perform a graceful close operation.
type InterceptorWithGracefulClose interface { type InterceptorWithGracefulClose interface {
Interceptor Interceptor
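InterceptorWithMetrics is an opt-in capability: the chain only needs a type assertion at build time to find out whether an interceptor wants to be reported under its own name. A minimal sketch of how an interceptor opts in and how a caller would probe for it; the fallback label is my own placeholder:

package example

// namedInterceptor is how an interceptor would opt into per-name metrics:
// it simply also implements Name().
type namedInterceptor struct{}

func (namedInterceptor) Name() string { return "segment-assign" }

// labelFor probes for the optional Name method via a type assertion,
// falling back to a placeholder label for anonymous interceptors.
func labelFor(i interface{}) string {
	if named, ok := i.(interface{ Name() string }); ok {
		return named.Name()
	}
	return "unnamed"
}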

View File

@ -18,6 +18,7 @@ var (
// It's useful when the append operation wants to refresh the append context (such as the timetick belonging to the message) // It's useful when the append operation wants to refresh the append context (such as the timetick belonging to the message)
type redoAppendInterceptor struct{} type redoAppendInterceptor struct{}
// TODO: should be removed after lock-based before timetick is applied.
func (r *redoAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { func (r *redoAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) {
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {

View File

@ -22,7 +22,12 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
var _ interceptors.InterceptorWithReady = (*segmentInterceptor)(nil) const interceptorName = "segment-assign"
var (
_ interceptors.InterceptorWithMetrics = (*segmentInterceptor)(nil)
_ interceptors.InterceptorWithReady = (*segmentInterceptor)(nil)
)
// segmentInterceptor is the implementation of segment assignment interceptor. // segmentInterceptor is the implementation of segment assignment interceptor.
type segmentInterceptor struct { type segmentInterceptor struct {
@ -33,6 +38,10 @@ type segmentInterceptor struct {
assignManager *syncutil.Future[*manager.PChannelSegmentAllocManager] assignManager *syncutil.Future[*manager.PChannelSegmentAllocManager]
} }
func (impl *segmentInterceptor) Name() string {
return interceptorName
}
// Ready returns a channel that will be closed when the segment interceptor is ready. // Ready returns a channel that will be closed when the segment interceptor is ready.
func (impl *segmentInterceptor) Ready() <-chan struct{} { func (impl *segmentInterceptor) Ready() <-chan struct{} {
// Wait for segment assignment manager ready. // Wait for segment assignment manager ready.

View File

@ -55,16 +55,17 @@ func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick u
// Allocate allocates a timestamp. // Allocate allocates a timestamp.
// Concurrent safe to call with Sync and Allocate. // Concurrent safe to call with Sync and Allocate.
func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) { func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
metricsGuard := ta.metrics.StartAllocateTimeTick()
ta.mu.Lock() ta.mu.Lock()
defer ta.mu.Unlock() defer ta.mu.Unlock()
// allocate one from underlying allocator first. // allocate one from underlying allocator first.
ts, err := resource.Resource().TSOAllocator().Allocate(ctx) ts, err := resource.Resource().TSOAllocator().Allocate(ctx)
if err != nil { if err != nil {
metricsGuard.Done(0, err)
return nil, err return nil, err
} }
ta.lastAllocatedTimeTick = ts ta.lastAllocatedTimeTick = ts
ta.metrics.CountAllocateTimeTick(ts)
// create new timestampAck for ack process. // create new timestampAck for ack process.
// add ts to heap wait for ack. // add ts to heap wait for ack.
@ -74,6 +75,7 @@ func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
manager: ta, manager: ta,
} }
ta.notAckHeap.Push(acker) ta.notAckHeap.Push(acker)
metricsGuard.Done(ts, err)
return acker, nil return acker, nil
} }

View File

@ -16,7 +16,10 @@ import (
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
) )
const interceptorName = "timetick"
var ( var (
_ interceptors.InterceptorWithMetrics = (*timeTickAppendInterceptor)(nil)
_ interceptors.InterceptorWithReady = (*timeTickAppendInterceptor)(nil) _ interceptors.InterceptorWithReady = (*timeTickAppendInterceptor)(nil)
_ interceptors.InterceptorWithGracefulClose = (*timeTickAppendInterceptor)(nil) _ interceptors.InterceptorWithGracefulClose = (*timeTickAppendInterceptor)(nil)
) )
@ -27,6 +30,10 @@ type timeTickAppendInterceptor struct {
txnManager *txn.TxnManager txnManager *txn.TxnManager
} }
func (impl *timeTickAppendInterceptor) Name() string {
return interceptorName
}
// Ready implements AppendInterceptor. // Ready implements AppendInterceptor.
func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} { func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} {
return impl.operator.Ready() return impl.operator.Ready()
@ -34,16 +41,15 @@ func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} {
// Do implements AppendInterceptor. // Do implements AppendInterceptor.
func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) {
// the cursor manager should be ready since the timetick interceptor is ready.
cm, _ := impl.operator.MVCCManager(ctx)
defer func() { defer func() {
if err == nil { if err == nil {
// the cursor manager should be ready since the timetick interceptor is ready.
cm, _ := impl.operator.MVCCManager(ctx)
cm.UpdateMVCC(msg) cm.UpdateMVCC(msg)
} }
}() }()
ackManager := impl.operator.AckManager() ackManager := impl.operator.AckManager()
var txnSession *txn.TxnSession var txnSession *txn.TxnSession
if msg.MessageType() != message.MessageTypeTimeTick { if msg.MessageType() != message.MessageTypeTimeTick {
// Allocate new timestamp acker for message. // Allocate new timestamp acker for message.

View File

@ -185,6 +185,7 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
capacity := int(paramtable.Get().StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize()) capacity := int(paramtable.Get().StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
keepalive := paramtable.Get().StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse() keepalive := paramtable.Get().StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse()
impl.writeAheadBuffer = wab.NewWirteAheadBuffer( impl.writeAheadBuffer = wab.NewWirteAheadBuffer(
impl.Channel().Name,
impl.logger, impl.logger,
capacity, capacity,
keepalive, keepalive,
@ -222,6 +223,7 @@ func (impl *timeTickSyncOperator) AckManager() *ack.AckManager {
func (impl *timeTickSyncOperator) Close() { func (impl *timeTickSyncOperator) Close() {
impl.cancel() impl.cancel()
impl.metrics.Close() impl.metrics.Close()
impl.writeAheadBuffer.Close()
} }
// sendTsMsg sends first timestamp message to wal. // sendTsMsg sends first timestamp message to wal.

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"sync" "sync"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
@ -25,6 +26,7 @@ type TxnSession struct {
doneWait chan struct{} // The channel for waiting the transaction committed. doneWait chan struct{} // The channel for waiting the transaction committed.
rollback bool // The flag indicates the transaction is rollbacked. rollback bool // The flag indicates the transaction is rollbacked.
cleanupCallbacks []func() // The cleanup callbacks function for the session. cleanupCallbacks []func() // The cleanup callbacks function for the session.
metricsGuard *metricsutil.TxnMetricsGuard // The metrics guard for the session.
} }
// TxnContext returns the txn context of the session. // TxnContext returns the txn context of the session.
@ -212,6 +214,8 @@ func (s *TxnSession) cleanup() {
for _, f := range s.cleanupCallbacks { for _, f := range s.cleanupCallbacks {
f() f()
} }
s.metricsGuard.Done(s.state)
s.metricsGuard = nil
s.cleanupCallbacks = nil s.cleanupCallbacks = nil
} }

View File

@ -62,6 +62,7 @@ func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive
if m.closed != nil { if m.closed != nil {
return nil, status.NewTransactionExpired("manager closed") return nil, status.NewTransactionExpired("manager closed")
} }
metricsGuard := m.metrics.BeginTxn()
session := &TxnSession{ session := &TxnSession{
mu: sync.Mutex{}, mu: sync.Mutex{},
lastTimetick: timetick, lastTimetick: timetick,
@ -73,10 +74,9 @@ func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive
state: message.TxnStateBegin, state: message.TxnStateBegin,
doneWait: nil, doneWait: nil,
rollback: false, rollback: false,
metricsGuard: metricsGuard,
} }
m.sessions[session.TxnContext().TxnID] = session m.sessions[session.TxnContext().TxnID] = session
m.metrics.BeginTxn()
return session, nil return session, nil
} }
@ -88,7 +88,6 @@ func (m *TxnManager) CleanupTxnUntil(ts uint64) {
for id, session := range m.sessions { for id, session := range m.sessions {
if session.IsExpiredOrDone(ts) { if session.IsExpiredOrDone(ts) {
session.Cleanup() session.Cleanup()
m.metrics.Finish(session.State())
delete(m.sessions, id) delete(m.sessions, id)
} }
} }

View File

@ -10,7 +10,10 @@ import (
) )
// ErrEvicted is returned when the expected message has been evicted. // ErrEvicted is returned when the expected message has been evicted.
var ErrEvicted = errors.New("message has been evicted") var (
ErrEvicted = errors.New("message has been evicted")
ErrClosed = errors.New("write ahead buffer is closed")
)
// messageWithOffset is a message with an offset as a unique continuous identifier. // messageWithOffset is a message with an offset as a unique continuous identifier.
type messageWithOffset struct { type messageWithOffset struct {
@ -44,6 +47,24 @@ type pendingQueue struct {
keepAlive time.Duration keepAlive time.Duration
} }
// Len returns the length of the buffer.
func (q *pendingQueue) Len() int {
return len(q.buf)
}
// Size returns the size of the buffer.
func (q *pendingQueue) Size() int {
return q.size
}
// EarliestTimeTick returns the earliest time tick of the buffer.
func (q *pendingQueue) EarliestTimeTick() uint64 {
if len(q.buf) == 0 {
return q.lastTimeTick
}
return q.buf[0].Message.TimeTick()
}
// Push adds messages to the buffer. // Push adds messages to the buffer.
func (q *pendingQueue) Push(msgs []message.ImmutableMessage) { func (q *pendingQueue) Push(msgs []message.ImmutableMessage) {
now := time.Now() now := time.Now()

View File

@ -8,6 +8,7 @@ import (
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
@ -24,6 +25,7 @@ type ROWriteAheadBuffer interface {
// NewWriteAheadBuffer creates a new WriteAheadBuffer. // NewWriteAheadBuffer creates a new WriteAheadBuffer.
func NewWirteAheadBuffer( func NewWirteAheadBuffer(
pchannel string,
logger *log.MLogger, logger *log.MLogger,
capacity int, capacity int,
keepalive time.Duration, keepalive time.Duration,
@ -34,6 +36,7 @@ func NewWirteAheadBuffer(
cond: syncutil.NewContextCond(&sync.Mutex{}), cond: syncutil.NewContextCond(&sync.Mutex{}),
pendingMessages: newPendingQueue(capacity, keepalive, lastConfirmedTimeTickMessage), pendingMessages: newPendingQueue(capacity, keepalive, lastConfirmedTimeTickMessage),
lastTimeTickMessage: lastConfirmedTimeTickMessage, lastTimeTickMessage: lastConfirmedTimeTickMessage,
metrics: metricsutil.NewWriteAheadBufferMetrics(pchannel, capacity),
} }
} }
@ -41,15 +44,20 @@ func NewWirteAheadBuffer(
type WriteAheadBuffer struct { type WriteAheadBuffer struct {
logger *log.MLogger logger *log.MLogger
cond *syncutil.ContextCond cond *syncutil.ContextCond
closed bool
pendingMessages *pendingQueue // The pending message is always sorted by timetick in monotonic ascending order. pendingMessages *pendingQueue // The pending message is always sorted by timetick in monotonic ascending order.
// Only keep the persisted messages in the buffer. // Only keep the persisted messages in the buffer.
lastTimeTickMessage message.ImmutableMessage lastTimeTickMessage message.ImmutableMessage
metrics *metricsutil.WriteAheadBufferMetrics
} }
// Append appends a message to the buffer. // Append appends a message to the buffer.
func (w *WriteAheadBuffer) Append(msgs []message.ImmutableMessage, tsMsg message.ImmutableMessage) { func (w *WriteAheadBuffer) Append(msgs []message.ImmutableMessage, tsMsg message.ImmutableMessage) {
w.cond.LockAndBroadcast() w.cond.LockAndBroadcast()
defer w.cond.L.Unlock() defer w.cond.L.Unlock()
if w.closed {
return
}
if tsMsg.MessageType() != message.MessageTypeTimeTick { if tsMsg.MessageType() != message.MessageTypeTimeTick {
panic("the message is not a time tick message") panic("the message is not a time tick message")
@ -71,6 +79,13 @@ func (w *WriteAheadBuffer) Append(msgs []message.ImmutableMessage, tsMsg message
w.pendingMessages.Evict() w.pendingMessages.Evict()
} }
w.lastTimeTickMessage = tsMsg w.lastTimeTickMessage = tsMsg
w.metrics.Observe(
w.pendingMessages.Len(),
w.pendingMessages.Size(),
w.pendingMessages.EarliestTimeTick(),
w.lastTimeTickMessage.TimeTick(),
)
} }
// ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick. // ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick.
@ -89,6 +104,11 @@ func (w *WriteAheadBuffer) ReadFromExclusiveTimeTick(ctx context.Context, timeti
// createSnapshotFromOffset creates a snapshot of the buffer from the given offset. // createSnapshotFromOffset creates a snapshot of the buffer from the given offset.
func (w *WriteAheadBuffer) createSnapshotFromOffset(ctx context.Context, offset int, timeTick uint64) ([]messageWithOffset, error) { func (w *WriteAheadBuffer) createSnapshotFromOffset(ctx context.Context, offset int, timeTick uint64) ([]messageWithOffset, error) {
w.cond.L.Lock() w.cond.L.Lock()
if w.closed {
w.cond.L.Unlock()
return nil, ErrClosed
}
for { for {
msgs, err := w.pendingMessages.CreateSnapshotFromOffset(offset) msgs, err := w.pendingMessages.CreateSnapshotFromOffset(offset)
if err == nil { if err == nil {
@ -122,6 +142,11 @@ func (w *WriteAheadBuffer) createSnapshotFromOffset(ctx context.Context, offset
// createSnapshotFromTimeTick creates a snapshot of the buffer from the given time tick. // createSnapshotFromTimeTick creates a snapshot of the buffer from the given time tick.
func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeTick uint64) ([]messageWithOffset, int, error) { func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeTick uint64) ([]messageWithOffset, int, error) {
w.cond.L.Lock() w.cond.L.Lock()
if w.closed {
w.cond.L.Unlock()
return nil, 0, ErrClosed
}
for { for {
msgs, err := w.pendingMessages.CreateSnapshotFromExclusiveTimeTick(timeTick) msgs, err := w.pendingMessages.CreateSnapshotFromExclusiveTimeTick(timeTick)
if err == nil { if err == nil {
@ -156,3 +181,10 @@ func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeT
} }
} }
} }
func (w *WriteAheadBuffer) Close() {
w.cond.L.Lock()
w.metrics.Close()
w.closed = true
w.cond.L.Unlock()
}
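Close() marks the buffer as closed under the same lock that Append and the snapshot readers take, so late writers become no-ops and readers get ErrClosed instead of blocking on a buffer that will never grow again. A reduced sketch of the flag-under-lock pattern, with the standard library's sync.Cond standing in for syncutil.ContextCond; the broadcast in Close is needed here because plain sync.Cond waiters cannot be woken by context cancellation:

package example

import (
	"errors"
	"sync"
)

var errClosed = errors.New("write ahead buffer is closed")

type buffer struct {
	mu     sync.Mutex
	cond   *sync.Cond
	closed bool
	items  []int
}

func newBuffer() *buffer {
	b := &buffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Append drops writes once the buffer is closed.
func (b *buffer) Append(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.items = append(b.items, v)
	b.cond.Broadcast()
}

// WaitNonEmpty blocks until data arrives or the buffer is closed.
func (b *buffer) WaitNonEmpty() ([]int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.items) == 0 {
		if b.closed {
			return nil, errClosed
		}
		b.cond.Wait()
	}
	return b.items, nil
}

// Close wakes all waiters so they can return errClosed.
func (b *buffer) Close() {
	b.mu.Lock()
	b.closed = true
	b.cond.Broadcast()
	b.mu.Unlock()
}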

View File

@ -17,7 +17,7 @@ import (
func TestWriteAheadBufferWithOnlyTrivialTimeTick(t *testing.T) { func TestWriteAheadBufferWithOnlyTrivialTimeTick(t *testing.T) {
ctx := context.Background() ctx := context.Background()
wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(0)) wb := NewWirteAheadBuffer("pchannel", log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(0))
// Test timeout // Test timeout
ctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
@ -81,7 +81,7 @@ func TestWriteAheadBufferWithOnlyTrivialTimeTick(t *testing.T) {
func TestWriteAheadBuffer(t *testing.T) { func TestWriteAheadBuffer(t *testing.T) {
// Concurrently add messages into the buffer and sync up. // Concurrently add messages into the buffer and sync up.
// The reader should never lose any message if no eviction happens. // The reader should never lose any message if no eviction happens.
wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(1)) wb := NewWirteAheadBuffer("pchannel", log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(1))
expectedLastTimeTick := uint64(10000) expectedLastTimeTick := uint64(10000)
ch := make(chan struct{}) ch := make(chan struct{})
totalCnt := 0 totalCnt := 0
@ -183,7 +183,7 @@ func TestWriteAheadBuffer(t *testing.T) {
} }
func TestWriteAheadBufferEviction(t *testing.T) { func TestWriteAheadBufferEviction(t *testing.T) {
wb := NewWirteAheadBuffer(log.With(), 5*1024*1024, 50*time.Millisecond, createTimeTickMessage(0)) wb := NewWirteAheadBuffer("pchannel", log.With(), 5*1024*1024, 50*time.Millisecond, createTimeTickMessage(0))
msgs := make([]message.ImmutableMessage, 0) msgs := make([]message.ImmutableMessage, 0)
for i := 1; i < 100; i++ { for i := 1; i < 100; i++ {

View File

@ -0,0 +1,157 @@
package metricsutil
import (
"fmt"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
)
type InterceptorMetrics struct {
Before time.Duration
BeforeErr error
After time.Duration
}
func (im *InterceptorMetrics) String() string {
return fmt.Sprintf("before: %s, after: %s, before_err: %v", im.Before, im.After, im.BeforeErr)
}
// AppendMetrics is the metrics for append operation.
type AppendMetrics struct {
wm *WriteMetrics
bytes int
messageType message.MessageType
result *types.AppendResult
err error
appendDuration time.Duration
implAppendDuration time.Duration
interceptors map[string][]*InterceptorMetrics
persisted bool
}
type AppendMetricsGuard struct {
inner *AppendMetrics
startAppend time.Time
startImplAppend time.Time
}
// StartInterceptorCollector starts collecting the durations spent in the named interceptor.
func (m *AppendMetrics) StartInterceptorCollector(name string) *InterceptorCollectGuard {
if _, ok := m.interceptors[name]; !ok {
m.interceptors[name] = make([]*InterceptorMetrics, 0, 2)
}
im := &InterceptorMetrics{}
m.interceptors[name] = append(m.interceptors[name], im)
return &InterceptorCollectGuard{
start: time.Now(),
afterStarted: false,
interceptor: im,
}
}
// StartAppendGuard starts timing the append operation.
func (m *AppendMetrics) StartAppendGuard() *AppendMetricsGuard {
return &AppendMetricsGuard{
inner: m,
startAppend: time.Now(),
}
}
// IntoLogFields converts the metrics to log fields.
func (m *AppendMetrics) IntoLogFields() []zap.Field {
fields := []zap.Field{
zap.String("message_type", m.messageType.String()),
zap.Int("bytes", m.bytes),
zap.Duration("append_duration", m.appendDuration),
zap.Duration("impl_append_duration", m.implAppendDuration),
zap.Bool("presisted", m.persisted),
}
for name, ims := range m.interceptors {
for i, im := range ims {
fields = append(fields, zap.Any(fmt.Sprintf("%s_%d", name, i), im))
}
}
if m.err != nil {
fields = append(fields, zap.Error(m.err))
} else {
fields = append(fields, zap.String("message_id", m.result.MessageID.String()))
fields = append(fields, zap.Uint64("time_tick", m.result.TimeTick))
if m.result.TxnCtx != nil {
fields = append(fields, zap.Int64("txn_id", int64(m.result.TxnCtx.TxnID)))
}
}
return fields
}
// StartWALImplAppend starts the WAL implementation append operation.
func (m *AppendMetricsGuard) StartWALImplAppend() {
m.startImplAppend = time.Now()
}
// FinishWALImplAppend finishes the WAL implementation append operation.
func (m *AppendMetricsGuard) FinishWALImplAppend() {
m.inner.implAppendDuration = time.Since(m.startImplAppend)
}
// FinishAppend finishes timing the append operation.
func (m *AppendMetricsGuard) FinishAppend() {
m.inner.appendDuration = time.Since(m.startAppend)
}
// RangeOverInterceptors iterates over the collected interceptor metrics.
func (m *AppendMetrics) RangeOverInterceptors(f func(name string, ims []*InterceptorMetrics)) {
for name, ims := range m.interceptors {
f(name, ims)
}
}
// NotPersisted marks the message as not persisted.
func (m *AppendMetrics) NotPersisted() {
m.persisted = false
}
// Done records the final result and pushes the metrics.
func (m *AppendMetrics) Done(result *types.AppendResult, err error) {
m.err = err
m.result = result
m.wm.done(m)
}
// InterceptorCollectGuard is used to collect the metrics of interceptor.
type InterceptorCollectGuard struct {
start time.Time
afterStarted bool
interceptor *InterceptorMetrics
}
// BeforeDone marks the work before the inner append as done.
func (g *InterceptorCollectGuard) BeforeDone() {
g.interceptor.Before = time.Since(g.start)
}
// BeforeFailure records the elapsed time and error when the interceptor returned without invoking the inner append.
func (g *InterceptorCollectGuard) BeforeFailure(err error) {
if g.interceptor.Before == 0 {
// if the before duration is not set, the interceptor returned before the inner append was invoked.
g.interceptor.Before = time.Since(g.start)
g.interceptor.BeforeErr = err
}
}
// AfterStart marks the start of the work after the inner append.
func (g *InterceptorCollectGuard) AfterStart() {
g.start = time.Now()
g.afterStarted = true
}
// AfterDone marks the work after the inner append as done.
func (g *InterceptorCollectGuard) AfterDone() {
if g.afterStarted {
g.interceptor.After += time.Since(g.start)
}
}

View File

@ -1,6 +1,8 @@
package metricsutil package metricsutil
import ( import (
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
@ -13,7 +15,8 @@ import (
type TimeTickMetrics struct { type TimeTickMetrics struct {
mu syncutil.ClosableLock mu syncutil.ClosableLock
constLabel prometheus.Labels constLabel prometheus.Labels
allocatedTimeTickCounter prometheus.Counter allocatedTimeTickCounter *prometheus.CounterVec
allocatedTimeTickDuration prometheus.Observer
acknowledgedTimeTickCounterForSync prometheus.Counter acknowledgedTimeTickCounterForSync prometheus.Counter
syncTimeTickCounterForSync prometheus.Counter syncTimeTickCounterForSync prometheus.Counter
acknowledgedTimeTickCounter prometheus.Counter acknowledgedTimeTickCounter prometheus.Counter
@ -35,7 +38,8 @@ func NewTimeTickMetrics(pchannel string) *TimeTickMetrics {
return &TimeTickMetrics{ return &TimeTickMetrics{
mu: syncutil.ClosableLock{}, mu: syncutil.ClosableLock{},
constLabel: constLabel, constLabel: constLabel,
allocatedTimeTickCounter: metrics.WALAllocateTimeTickTotal.With(constLabel), allocatedTimeTickCounter: metrics.WALAllocateTimeTickTotal.MustCurryWith(constLabel),
allocatedTimeTickDuration: metrics.WALTimeTickAllocateDurationSeconds.With(constLabel),
acknowledgedTimeTickCounterForSync: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"), acknowledgedTimeTickCounterForSync: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"),
syncTimeTickCounterForSync: metrics.WALSyncTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"), syncTimeTickCounterForSync: metrics.WALSyncTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"),
acknowledgedTimeTickCounter: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("common"), acknowledgedTimeTickCounter: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("common"),
@ -49,13 +53,33 @@ func NewTimeTickMetrics(pchannel string) *TimeTickMetrics {
} }
} }
func (m *TimeTickMetrics) CountAllocateTimeTick(ts uint64) { // StartAllocateTimeTick starts to allocate time tick.
if !m.mu.LockIfNotClosed() { func (m *TimeTickMetrics) StartAllocateTimeTick() *AllocateTimeTickMetricsGuard {
return &AllocateTimeTickMetricsGuard{
start: time.Now(),
inner: m,
}
}
// AllocateTimeTickMetricsGuard is a guard for allocate time tick metrics.
type AllocateTimeTickMetricsGuard struct {
inner *TimeTickMetrics
start time.Time
}
// Done finishes the allocate time tick metrics.
func (g *AllocateTimeTickMetricsGuard) Done(ts uint64, err error) {
status := parseError(err)
if !g.inner.mu.LockIfNotClosed() {
return return
} }
m.allocatedTimeTickCounter.Inc() g.inner.allocatedTimeTickDuration.Observe(time.Since(g.start).Seconds())
m.lastAllocatedTimeTick.Set(tsoutil.PhysicalTimeSeconds(ts)) g.inner.allocatedTimeTickCounter.WithLabelValues(status).Inc()
m.mu.Unlock() if err == nil {
g.inner.lastAllocatedTimeTick.Set(tsoutil.PhysicalTimeSeconds(ts))
}
g.inner.mu.Unlock()
} }
func (m *TimeTickMetrics) CountAcknowledgeTimeTick(isSync bool) { func (m *TimeTickMetrics) CountAcknowledgeTimeTick(isSync bool) {
@ -107,7 +131,8 @@ func (m *TimeTickMetrics) UpdateLastConfirmedTimeTick(ts uint64) {
func (m *TimeTickMetrics) Close() { func (m *TimeTickMetrics) Close() {
// mark as closed and delete all labeled metrics // mark as closed and delete all labeled metrics
m.mu.Close() m.mu.Close()
metrics.WALAllocateTimeTickTotal.Delete(m.constLabel) metrics.WALAllocateTimeTickTotal.DeletePartialMatch(m.constLabel)
metrics.WALTimeTickAllocateDurationSeconds.DeletePartialMatch(m.constLabel)
metrics.WALLastAllocatedTimeTick.Delete(m.constLabel) metrics.WALLastAllocatedTimeTick.Delete(m.constLabel)
metrics.WALLastConfirmedTimeTick.Delete(m.constLabel) metrics.WALLastConfirmedTimeTick.Delete(m.constLabel)
metrics.WALAcknowledgeTimeTickTotal.DeletePartialMatch(m.constLabel) metrics.WALAcknowledgeTimeTickTotal.DeletePartialMatch(m.constLabel)
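The counter-only CountAllocateTimeTick is replaced by a guard so a single Done(ts, err) call can record both the per-status counter and the new allocation-latency histogram. A small standalone version of that guard shape, with an illustrative metric name and status values:

package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var allocateDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "example_timetick_allocate_duration_seconds",
	Help: "Latency of time tick allocation, labeled by status.",
}, []string{"status"})

func init() { prometheus.MustRegister(allocateDuration) }

// allocateGuard captures the start time when the allocation begins.
type allocateGuard struct {
	start time.Time
}

func startAllocate() *allocateGuard {
	return &allocateGuard{start: time.Now()}
}

// Done observes the latency under an "ok" or "error" status.
func (g *allocateGuard) Done(err error) {
	status := "ok"
	if err != nil {
		status = "error"
	}
	allocateDuration.WithLabelValues(status).Observe(time.Since(g.start).Seconds())
}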

View File

@ -1,6 +1,8 @@
package metricsutil package metricsutil
import ( import (
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
@ -9,6 +11,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/syncutil" "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
) )
const labelExpired = "expired"
func NewTxnMetrics(pchannel string) *TxnMetrics { func NewTxnMetrics(pchannel string) *TxnMetrics {
constLabel := prometheus.Labels{ constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(), metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
@ -18,7 +22,7 @@ func NewTxnMetrics(pchannel string) *TxnMetrics {
mu: syncutil.ClosableLock{}, mu: syncutil.ClosableLock{},
constLabel: constLabel, constLabel: constLabel,
inflightTxnGauge: metrics.WALInflightTxn.With(constLabel), inflightTxnGauge: metrics.WALInflightTxn.With(constLabel),
txnCounter: metrics.WALFinishTxn.MustCurryWith(constLabel), duration: metrics.WALTxnDurationSeconds.MustCurryWith(constLabel),
} }
} }
@ -26,28 +30,46 @@ type TxnMetrics struct {
mu syncutil.ClosableLock mu syncutil.ClosableLock
constLabel prometheus.Labels constLabel prometheus.Labels
inflightTxnGauge prometheus.Gauge inflightTxnGauge prometheus.Gauge
txnCounter *prometheus.CounterVec duration prometheus.ObserverVec
} }
func (m *TxnMetrics) BeginTxn() { func (m *TxnMetrics) BeginTxn() *TxnMetricsGuard {
if !m.mu.LockIfNotClosed() { if !m.mu.LockIfNotClosed() {
return return nil
} }
m.inflightTxnGauge.Inc() m.inflightTxnGauge.Inc()
m.mu.Unlock() m.mu.Unlock()
return &TxnMetricsGuard{
inner: m,
start: time.Now(),
}
} }
func (m *TxnMetrics) Finish(state message.TxnState) { type TxnMetricsGuard struct {
if !m.mu.LockIfNotClosed() { inner *TxnMetrics
start time.Time
}
func (g *TxnMetricsGuard) Done(state message.TxnState) {
if g == nil {
return return
} }
m.inflightTxnGauge.Dec() if !g.inner.mu.LockIfNotClosed() {
m.txnCounter.WithLabelValues(state.String()).Inc() return
m.mu.Unlock() }
g.inner.inflightTxnGauge.Dec()
s := labelExpired
if state == message.TxnStateRollbacked || state == message.TxnStateCommitted {
s = state.String()
}
g.inner.duration.WithLabelValues(s).Observe(time.Since(g.start).Seconds())
g.inner.mu.Unlock()
} }
func (m *TxnMetrics) Close() { func (m *TxnMetrics) Close() {
m.mu.Close() m.mu.Close()
metrics.WALInflightTxn.Delete(m.constLabel) metrics.WALInflightTxn.Delete(m.constLabel)
metrics.WALFinishTxn.DeletePartialMatch(m.constLabel) metrics.WALTxnDurationSeconds.DeletePartialMatch(m.constLabel)
} }
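WALFinishTxn becomes a duration histogram: the guard is created when the transaction begins and Done(state) buckets the lifetime under the final state, with anything that is neither committed nor rollbacked folded into the expired label. A compact sketch of that state mapping, using illustrative metric and state names:

package example

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var txnDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "example_wal_txn_duration_seconds",
	Help: "Transaction lifetime, labeled by final state.",
}, []string{"state"})

func init() { prometheus.MustRegister(txnDuration) }

type txnGuard struct {
	start time.Time
}

func beginTxn() *txnGuard { return &txnGuard{start: time.Now()} }

// done folds every non-terminal state into "expired", mirroring how
// CleanupTxnUntil reaps sessions that never committed or rolled back.
func (g *txnGuard) done(state string) {
	label := "expired"
	if state == "committed" || state == "rollbacked" {
		label = state
	}
	txnDuration.WithLabelValues(label).Observe(time.Since(g.start).Seconds())
}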

View File

@ -0,0 +1,57 @@
package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)
// NewWriteAheadBufferMetrics creates a new WriteAheadBufferMetrics.
func NewWriteAheadBufferMetrics(
pchannel string,
capacity int,
) *WriteAheadBufferMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
metrics.WALWriteAheadBufferCapacityBytes.With(constLabel).Set(float64(capacity))
return &WriteAheadBufferMetrics{
constLabel: constLabel,
total: metrics.WALWriteAheadBufferEntryTotal.With(constLabel),
size: metrics.WALWriteAheadBufferSizeBytes.With(constLabel),
earliestTimeTick: metrics.WALWriteAheadBufferEarliestTimeTick.With(constLabel),
latestTimeTick: metrics.WALWriteAheadBufferLatestTimeTick.With(constLabel),
}
}
type WriteAheadBufferMetrics struct {
constLabel prometheus.Labels
total prometheus.Gauge
size prometheus.Gauge
earliestTimeTick prometheus.Gauge
latestTimeTick prometheus.Gauge
}
func (m *WriteAheadBufferMetrics) Observe(
total int,
bytes int,
earliestTimeTick uint64,
latestTimeTick uint64,
) {
m.total.Set(float64(total))
m.size.Set(float64(bytes))
m.earliestTimeTick.Set(tsoutil.PhysicalTimeSeconds(earliestTimeTick))
m.latestTimeTick.Set(tsoutil.PhysicalTimeSeconds(latestTimeTick))
}
func (m *WriteAheadBufferMetrics) Close() {
metrics.WALWriteAheadBufferEntryTotal.Delete(m.constLabel)
metrics.WALWriteAheadBufferSizeBytes.Delete(m.constLabel)
metrics.WALWriteAheadBufferEarliestTimeTick.Delete(m.constLabel)
metrics.WALWriteAheadBufferLatestTimeTick.Delete(m.constLabel)
metrics.WALWriteAheadBufferCapacityBytes.Delete(m.constLabel)
}

View File

@ -14,13 +14,30 @@ func NewScanMetrics(pchannel types.PChannelInfo) *ScanMetrics {
metrics.NodeIDLabelName: paramtable.GetStringNodeID(), metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name, metrics.WALChannelLabelName: pchannel.Name,
} }
catchupLabel, tailingLabel := make(prometheus.Labels), make(prometheus.Labels)
for k, v := range constLabel {
catchupLabel[k] = v
tailingLabel[k] = v
}
catchupLabel[metrics.WALScannerModelLabelName] = metrics.WALScannerModelCatchup
tailingLabel[metrics.WALScannerModelLabelName] = metrics.WALScannerModelTailing
return &ScanMetrics{ return &ScanMetrics{
constLabel: constLabel, constLabel: constLabel,
messageBytes: metrics.WALScanMessageBytes.With(constLabel), scannerTotal: metrics.WALScannerTotal.MustCurryWith(constLabel),
passMessageBytes: metrics.WALScanPassMessageBytes.With(constLabel), tailing: underlyingScannerMetrics{
messageTotal: metrics.WALScanMessageTotal.MustCurryWith(constLabel), messageBytes: metrics.WALScanMessageBytes.With(tailingLabel),
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(constLabel), passMessageBytes: metrics.WALScanPassMessageBytes.With(tailingLabel),
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(constLabel), messageTotal: metrics.WALScanMessageTotal.MustCurryWith(tailingLabel),
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(tailingLabel),
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(tailingLabel),
},
catchup: underlyingScannerMetrics{
messageBytes: metrics.WALScanMessageBytes.With(catchupLabel),
passMessageBytes: metrics.WALScanPassMessageBytes.With(catchupLabel),
messageTotal: metrics.WALScanMessageTotal.MustCurryWith(catchupLabel),
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(catchupLabel),
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(catchupLabel),
},
txnTotal: metrics.WALScanTxnTotal.MustCurryWith(constLabel), txnTotal: metrics.WALScanTxnTotal.MustCurryWith(constLabel),
pendingQueueSize: metrics.WALScannerPendingQueueBytes.With(constLabel), pendingQueueSize: metrics.WALScannerPendingQueueBytes.With(constLabel),
timeTickBufSize: metrics.WALScannerTimeTickBufBytes.With(constLabel), timeTickBufSize: metrics.WALScannerTimeTickBufBytes.With(constLabel),
@ -30,32 +47,21 @@ func NewScanMetrics(pchannel types.PChannelInfo) *ScanMetrics {
type ScanMetrics struct { type ScanMetrics struct {
constLabel prometheus.Labels constLabel prometheus.Labels
messageBytes prometheus.Observer scannerTotal *prometheus.GaugeVec
passMessageBytes prometheus.Observer catchup underlyingScannerMetrics
messageTotal *prometheus.CounterVec tailing underlyingScannerMetrics
passMessageTotal *prometheus.CounterVec
timeTickViolationTotal *prometheus.CounterVec
txnTotal *prometheus.CounterVec txnTotal *prometheus.CounterVec
timeTickBufSize prometheus.Gauge timeTickBufSize prometheus.Gauge
txnBufSize prometheus.Gauge txnBufSize prometheus.Gauge
pendingQueueSize prometheus.Gauge pendingQueueSize prometheus.Gauge
} }
// ObserveMessage observes the message. type underlyingScannerMetrics struct {
func (m *ScanMetrics) ObserveMessage(msgType message.MessageType, bytes int) { messageBytes prometheus.Observer
m.messageBytes.Observe(float64(bytes)) passMessageBytes prometheus.Observer
m.messageTotal.WithLabelValues(msgType.String()).Inc() messageTotal *prometheus.CounterVec
} passMessageTotal *prometheus.CounterVec
timeTickViolationTotal *prometheus.CounterVec
// ObserveFilteredMessage observes the filtered message.
func (m *ScanMetrics) ObserveFilteredMessage(msgType message.MessageType, bytes int) {
m.passMessageBytes.Observe(float64(bytes))
m.passMessageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveTimeTickViolation observes the time tick violation.
func (m *ScanMetrics) ObserveTimeTickViolation(msgType message.MessageType) {
m.timeTickViolationTotal.WithLabelValues(msgType.String()).Inc()
} }
// ObserveAutoCommitTxn observes the auto commit txn. // ObserveAutoCommitTxn observes the auto commit txn.
@ -80,8 +86,10 @@ func (m *ScanMetrics) ObserveExpiredTxn() {
// NewScannerMetrics creates a new scanner metrics. // NewScannerMetrics creates a new scanner metrics.
func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics { func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics {
m.scannerTotal.WithLabelValues(metrics.WALScannerModelCatchup).Inc()
return &ScannerMetrics{ return &ScannerMetrics{
ScanMetrics: m, ScanMetrics: m,
scannerModel: metrics.WALScannerModelCatchup,
previousTxnBufSize: 0, previousTxnBufSize: 0,
previousTimeTickBufSize: 0, previousTimeTickBufSize: 0,
previousPendingQueueSize: 0, previousPendingQueueSize: 0,
@ -90,7 +98,8 @@ func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics {
// Close closes the metrics. // Close closes the metrics.
func (m *ScanMetrics) Close() { func (m *ScanMetrics) Close() {
metrics.WALScanMessageBytes.Delete(m.constLabel) metrics.WALScannerTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanMessageBytes.DeletePartialMatch(m.constLabel)
metrics.WALScanPassMessageBytes.DeletePartialMatch(m.constLabel) metrics.WALScanPassMessageBytes.DeletePartialMatch(m.constLabel)
metrics.WALScanMessageTotal.DeletePartialMatch(m.constLabel) metrics.WALScanMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanPassMessageTotal.DeletePartialMatch(m.constLabel) metrics.WALScanPassMessageTotal.DeletePartialMatch(m.constLabel)
@ -103,11 +112,48 @@ func (m *ScanMetrics) Close() {
type ScannerMetrics struct { type ScannerMetrics struct {
*ScanMetrics *ScanMetrics
scannerModel string
previousTxnBufSize int previousTxnBufSize int
previousTimeTickBufSize int previousTimeTickBufSize int
previousPendingQueueSize int previousPendingQueueSize int
} }
// SwitchModel switches the scanner model.
func (m *ScannerMetrics) SwitchModel(s string) {
m.scannerTotal.WithLabelValues(m.scannerModel).Dec()
m.scannerModel = s
m.scannerTotal.WithLabelValues(m.scannerModel).Inc()
}
// ObserveMessage observes the message.
func (m *ScannerMetrics) ObserveMessage(tailing bool, msgType message.MessageType, bytes int) {
underlying := m.catchup
if tailing {
underlying = m.tailing
}
underlying.messageBytes.Observe(float64(bytes))
underlying.messageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObservePassedMessage observes the passed message.
func (m *ScannerMetrics) ObservePassedMessage(tailing bool, msgType message.MessageType, bytes int) {
underlying := m.catchup
if tailing {
underlying = m.tailing
}
underlying.passMessageBytes.Observe(float64(bytes))
underlying.passMessageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveTimeTickViolation observes the time tick violation.
func (m *ScannerMetrics) ObserveTimeTickViolation(tailing bool, msgType message.MessageType) {
underlying := m.catchup
if tailing {
underlying = m.tailing
}
underlying.timeTickViolationTotal.WithLabelValues(msgType.String()).Inc()
}
func (m *ScannerMetrics) UpdatePendingQueueSize(size int) { func (m *ScannerMetrics) UpdatePendingQueueSize(size int) {
diff := size - m.previousPendingQueueSize diff := size - m.previousPendingQueueSize
m.pendingQueueSize.Add(float64(diff)) m.pendingQueueSize.Add(float64(diff))
@ -130,4 +176,5 @@ func (m *ScannerMetrics) Close() {
m.UpdatePendingQueueSize(0) m.UpdatePendingQueueSize(0)
m.UpdateTimeTickBufSize(0) m.UpdateTimeTickBufSize(0)
m.UpdateTimeTickBufSize(0) m.UpdateTimeTickBufSize(0)
m.scannerTotal.WithLabelValues(m.scannerModel).Dec()
} }

View File

@@ -5,8 +5,10 @@ import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap/zapcore"

	"github.com/milvus-io/milvus/internal/util/streamingutil/status"
+	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
@@ -33,10 +35,14 @@ func NewWriteMetrics(pchannel types.PChannelInfo, walName string) *WriteMetrics
		total:            metrics.WALAppendMessageTotal.MustCurryWith(constLabel),
		walDuration:      metrics.WALAppendMessageDurationSeconds.MustCurryWith(constLabel),
		walimplsDuration: metrics.WALImplsAppendMessageDurationSeconds.MustCurryWith(constLabel),
+		walBeforeInterceptorDuration: metrics.WALAppendMessageBeforeInterceptorDurationSeconds.MustCurryWith(constLabel),
+		walAfterInterceptorDuration:  metrics.WALAppendMessageAfterInterceptorDurationSeconds.MustCurryWith(constLabel),
	}
}

type WriteMetrics struct {
+	log.Binder
	walName    string
	pchannel   types.PChannelInfo
	constLabel prometheus.Labels
@@ -44,18 +50,60 @@ type WriteMetrics struct {
	total            *prometheus.CounterVec
	walDuration      prometheus.ObserverVec
	walimplsDuration prometheus.ObserverVec
+	walBeforeInterceptorDuration prometheus.ObserverVec
+	walAfterInterceptorDuration  prometheus.ObserverVec
}
-func (m *WriteMetrics) StartAppend(msgType message.MessageType, bytes int) *WriteGuard {
-	return &WriteGuard{
-		startAppend: time.Now(),
-		metrics:     m,
-		msgType:     msgType,
-		bytes:       bytes,
-	}
-}
+func (m *WriteMetrics) StartAppend(msg message.MutableMessage) *AppendMetrics {
+	return &AppendMetrics{
+		wm:           m,
+		messageType:  msg.MessageType(),
+		bytes:        msg.EstimateSize(),
+		persisted:    true,
+		interceptors: make(map[string][]*InterceptorMetrics),
+	}
+}
+
+func (m *WriteMetrics) done(appendMetrics *AppendMetrics) {
+	if !appendMetrics.persisted {
+		// ignore all the metrics if the message is not persisted.
+		return
+	}
+	status := parseError(appendMetrics.err)
+	if appendMetrics.implAppendDuration != 0 {
+		m.walimplsDuration.WithLabelValues(status).Observe(appendMetrics.implAppendDuration.Seconds())
+	}
+	m.bytes.WithLabelValues(status).Observe(float64(appendMetrics.bytes))
+	m.total.WithLabelValues(appendMetrics.messageType.String(), status).Inc()
+	m.walDuration.WithLabelValues(status).Observe(appendMetrics.appendDuration.Seconds())
+	for name, ims := range appendMetrics.interceptors {
+		for _, im := range ims {
+			if im.Before != 0 {
+				m.walBeforeInterceptorDuration.WithLabelValues(name).Observe(im.Before.Seconds())
+			}
+			if im.After != 0 {
+				m.walAfterInterceptorDuration.WithLabelValues(name).Observe(im.After.Seconds())
+			}
+		}
+	}
+	if appendMetrics.err != nil {
+		m.Logger().Warn("append message into wal failed", appendMetrics.IntoLogFields()...)
+		return
+	}
+	if appendMetrics.appendDuration >= time.Second {
+		// log slow append catch
+		m.Logger().Warn("append message into wal too slow", appendMetrics.IntoLogFields()...)
+		return
+	}
+	if m.Logger().Level().Enabled(zapcore.DebugLevel) {
+		m.Logger().Debug("append message into wal", appendMetrics.IntoLogFields()...)
+	}
+}
func (m *WriteMetrics) Close() {
+	metrics.WALAppendMessageBeforeInterceptorDurationSeconds.DeletePartialMatch(m.constLabel)
+	metrics.WALAppendMessageAfterInterceptorDurationSeconds.DeletePartialMatch(m.constLabel)
	metrics.WALAppendMessageBytes.DeletePartialMatch(m.constLabel)
	metrics.WALAppendMessageTotal.DeletePartialMatch(m.constLabel)
	metrics.WALAppendMessageDurationSeconds.DeletePartialMatch(m.constLabel)
@@ -68,40 +116,13 @@ func (m *WriteMetrics) Close() {
	)
}
-type WriteGuard struct {
-	startAppend     time.Time
-	startImplAppend time.Time
-	implCost        time.Duration
-	metrics         *WriteMetrics
-	msgType         message.MessageType
-	bytes           int
-}
-
-func (g *WriteGuard) StartWALImplAppend() {
-	g.startImplAppend = time.Now()
-}
-
-func (g *WriteGuard) FinishWALImplAppend() {
-	g.implCost = time.Since(g.startImplAppend)
-}
-
-func (g *WriteGuard) Finish(err error) {
-	status := parseError(err)
-	if g.implCost != 0 {
-		g.metrics.walimplsDuration.WithLabelValues(status).Observe(g.implCost.Seconds())
-	}
-	g.metrics.bytes.WithLabelValues(status).Observe(float64(g.bytes))
-	g.metrics.total.WithLabelValues(g.msgType.String(), status).Inc()
-	g.metrics.walDuration.WithLabelValues(status).Observe(time.Since(g.startAppend).Seconds())
-}
// parseError parses the error to status.
func parseError(err error) string {
	if err == nil {
-		return metrics.StreamingServiceClientStatusOK
+		return metrics.WALStatusOK
	}
	if status.IsCanceled(err) {
-		return metrics.StreamingServiceClientStatusCancel
+		return metrics.WALStatusCancel
	}
-	return metrics.StreamignServiceClientStatusError
+	return metrics.WALStatusError
}
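A rough sketch of the intended per-append accounting flow follows (not part of the commit). It is written as if it lived inside the same metricsutil package, because done() and the AppendMetrics fields are unexported; the real call sites and setters are in other files of this PR, not in this hunk, so the direct field writes below are illustrative only and reuse the package's existing time/message imports.

```go
// A hedged sketch, assuming it sits in the metricsutil package alongside
// WriteMetrics: one append is started, timed, and then flushed via done().
func exampleAppendAccounting(wm *WriteMetrics, msg message.MutableMessage) {
	am := wm.StartAppend(msg)

	start := time.Now()
	implStart := time.Now()
	// ... the underlying walimpls append would run here ...
	am.implAppendDuration = time.Since(implStart) // time spent in the wal implementation
	am.appendDuration = time.Since(start)         // end-to-end append time
	am.err = nil                                  // or the append error

	// done() observes bytes/total/duration, flushes per-interceptor timings,
	// and logs failed appends, slow appends (>= 1s), or a debug line.
	wm.done(am)
}
```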
View File
@@ -6,6 +6,7 @@ import (
	"google.golang.org/protobuf/reflect/protoreflect"

+	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
)
@@ -15,6 +16,7 @@ type walCtxKey int
var (
	extraAppendResultValue walCtxKey = 1
	notPersistedValue      walCtxKey = 2
+	metricsValue           walCtxKey = 3
)

// ExtraAppendResult is the extra append result.
@@ -74,3 +76,13 @@ func ReplaceAppendResultTxnContext(ctx context.Context, txnCtx *message.TxnConte
	result := ctx.Value(extraAppendResultValue)
	result.(*ExtraAppendResult).TxnCtx = txnCtx
}
+
+// WithAppendMetricsContext create a context with metrics recording.
+func WithAppendMetricsContext(ctx context.Context, m *metricsutil.AppendMetrics) context.Context {
+	return context.WithValue(ctx, metricsValue, m)
+}
+
+// MustGetAppendMetrics get append metrics from context
+func MustGetAppendMetrics(ctx context.Context) *metricsutil.AppendMetrics {
+	return ctx.Value(metricsValue).(*metricsutil.AppendMetrics)
+}
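A minimal sketch of the attach/fetch round trip for these two helpers follows (not part of the commit), written as if it lived in the same package so the existing context and metricsutil imports are available. How interceptors then fill AppendMetrics with their timings is defined elsewhere in this PR, so it is not shown here.

```go
// A hedged sketch: the append path attaches the per-append metrics object
// once, and anything running under that context can fetch the same object.
func runWithAppendMetrics(ctx context.Context, am *metricsutil.AppendMetrics, appendFn func(context.Context) error) error {
	ctx = WithAppendMetricsContext(ctx, am)

	// MustGetAppendMetrics panics if nothing was attached, which is why the
	// outer append path must always call WithAppendMetricsContext first.
	_ = MustGetAppendMetrics(ctx)

	return appendFn(ctx)
}
```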
View File
@@ -11,15 +11,26 @@ import (
const (
	subsystemStreamingServiceClient = "streaming"
	subsystemWAL                    = "wal"
+	WALAccessModelRemote   = "remote"
+	WALAccessModelLocal    = "local"
+	WALScannerModelCatchup = "catchup"
+	WALScannerModelTailing = "tailing"
	StreamingServiceClientStatusAvailable   = "available"
	StreamingServiceClientStatusUnavailable = "unavailable"
-	StreamingServiceClientStatusOK     = "ok"
-	StreamingServiceClientStatusCancel = "cancel"
-	StreamignServiceClientStatusError  = "error"
+	WALStatusOK     = "ok"
+	WALStatusCancel = "cancel"
+	WALStatusError  = "error"
+	BroadcasterTaskStateLabelName = "state"
+	ResourceKeyDomainLabelName    = "domain"
+	WALAccessModelLabelName       = "access_model"
+	WALScannerModelLabelName      = "scanner_model"
	TimeTickSyncTypeLabelName = "type"
	TimeTickAckTypeLabelName  = "type"
+	WALInterceptorLabelName  = "interceptor_name"
	WALTxnStateLabelName     = "state"
+	WALFlusherStateLabelName = "state"
+	WALStateLabelName        = "state"
	WALChannelLabelName               = channelNameLabelName
	WALSegmentSealPolicyNameLabelName = "policy"
	WALSegmentAllocStateLabelName     = "state"
@@ -41,34 +52,48 @@ var (
	secondsBuckets = prometheus.ExponentialBucketsRange(0.001, 5, 10)
	// Streaming Service Client Producer Metrics.
+	StreamingServiceClientResumingProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
+		Name: "resuming_producer_total",
+		Help: "Total of resuming producers",
+	}, WALChannelLabelName, StatusLabelName)
	StreamingServiceClientProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
		Name: "producer_total",
		Help: "Total of producers",
-	}, WALChannelLabelName, StatusLabelName)
+	}, WALChannelLabelName, WALAccessModelLabelName)
-	StreamingServiceClientProduceInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
-		Name: "produce_inflight_total",
-		Help: "Total of inflight produce request",
-	}, WALChannelLabelName)
+	StreamingServiceClientProduceTotal = newStreamingServiceClientCounterVec(prometheus.CounterOpts{
+		Name: "produce_total",
+		Help: "Total of produce message",
+	}, WALChannelLabelName, WALAccessModelLabelName, StatusLabelName)
-	StreamingServiceClientProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
+	StreamingServiceClientProduceBytes = newStreamingServiceClientCounterVec(prometheus.CounterOpts{
		Name: "produce_bytes",
+		Help: "Total of produce message",
+	}, WALChannelLabelName, WALAccessModelLabelName, StatusLabelName)
+	StreamingServiceClientSuccessProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
+		Name:    "produce_success_bytes",
		Help:    "Bytes of produced message",
		Buckets: messageBytesBuckets,
-	}, WALChannelLabelName, StatusLabelName)
+	}, WALChannelLabelName, WALAccessModelLabelName)
-	StreamingServiceClientProduceDurationSeconds = newStreamingServiceClientHistogramVec(
-		prometheus.HistogramOpts{
-			Name:    "produce_duration_seconds",
-			Help:    "Duration of client produce",
+	StreamingServiceClientSuccessProduceDurationSeconds = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
+		Name:    "produce_success_duration_seconds",
+		Help:    "Duration of produced message",
		Buckets: secondsBuckets,
-		}, WALChannelLabelName, StatusLabelName)
+	}, WALChannelLabelName, WALAccessModelLabelName)
	// Streaming Service Client Consumer Metrics.
+	StreamingServiceClientResumingConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
+		Name: "resuming_consumer_total",
+		Help: "Total of resuming consumers",
+	}, WALChannelLabelName, StatusLabelName)
	StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
		Name: "consumer_total",
		Help: "Total of consumers",
-	}, WALChannelLabelName, StatusLabelName)
+	}, WALChannelLabelName, WALAccessModelLabelName)
	StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
		Name: "consume_bytes",
@@ -76,16 +101,11 @@ var (
		Buckets: messageBytesBuckets,
	}, WALChannelLabelName)
-	StreamingServiceClientConsumeInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
-		Name: "consume_inflight_total",
-		Help: "Total of inflight consume body",
-	}, WALChannelLabelName)
	// StreamingCoord metrics
	StreamingCoordPChannelInfo = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
		Name: "pchannel_info",
		Help: "Term of pchannels",
-	}, WALChannelLabelName, WALChannelTermLabelName, StreamingNodeLabelName)
+	}, WALChannelLabelName, WALChannelTermLabelName, StreamingNodeLabelName, WALStateLabelName)
	StreamingCoordAssignmentVersion = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
		Name: "assignment_info",
@@ -97,6 +117,34 @@ var (
		Help: "Total of assignment listener",
	})
+	StreamingCoordBroadcasterTaskTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+		Name: "broadcaster_task_total",
+		Help: "Total of broadcaster task",
+	}, BroadcasterTaskStateLabelName)
+	StreamingCoordBroadcastDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
+		Name:    "broadcaster_broadcast_duration_seconds",
+		Help:    "Duration of broadcast",
+		Buckets: secondsBuckets,
+	})
+	StreamingCoordBroadcasterAckAnyOneDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
+		Name:    "broadcaster_ack_any_one_duration_seconds",
+		Help:    "Duration of acknowledge any message",
+		Buckets: secondsBuckets,
+	})
+	StreamingCoordBroadcasterAckAllDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
+		Name:    "broadcaster_ack_all_duration_seconds",
+		Help:    "Duration of acknowledge all message",
+		Buckets: secondsBuckets,
+	})
+	StreamingCoordResourceKeyTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
+		Name: "resource_key_total",
+		Help: "Total of resource key hold at streaming coord",
+	}, ResourceKeyDomainLabelName)
	// StreamingNode Producer Server Metrics.
	StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
		Name: "producer_total",
@@ -140,6 +188,11 @@ var (
	WALAllocateTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
		Name: "allocate_time_tick_total",
		Help: "Total of allocated time tick on wal",
+	}, WALChannelLabelName, StatusLabelName)
+	WALTimeTickAllocateDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
+		Name: "allocate_time_tick_duration_seconds",
+		Help: "Duration of wal allocate time tick",
	}, WALChannelLabelName)
	WALLastConfirmedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
@@ -173,9 +226,10 @@ var (
		Help: "Total of inflight txn on wal",
	}, WALChannelLabelName)
-	WALFinishTxn = newWALCounterVec(prometheus.CounterOpts{
-		Name: "finish_txn",
-		Help: "Total of finish txn on wal",
+	WALTxnDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
+		Name:    "txn_duration_seconds",
+		Help:    "Duration of wal txn",
+		Buckets: secondsBuckets,
	}, WALChannelLabelName, WALTxnStateLabelName)
	// Segment related metrics
@@ -217,6 +271,18 @@ var (
		Help: "Total of append message to wal",
	}, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName)
+	WALAppendMessageBeforeInterceptorDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
+		Name:    "interceptor_before_append_duration_seconds",
+		Help:    "Intercept duration before wal append message",
+		Buckets: secondsBuckets,
+	}, WALChannelLabelName, WALInterceptorLabelName)
+	WALAppendMessageAfterInterceptorDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
+		Name:    "interceptor_after_append_duration_seconds",
+		Help:    "Intercept duration after wal append message",
+		Buckets: secondsBuckets,
+	}, WALChannelLabelName, WALInterceptorLabelName)
	WALAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
		Name: "append_message_duration_seconds",
		Help: "Duration of wal append message",
@@ -229,38 +295,63 @@ var (
		Buckets: secondsBuckets,
	}, WALChannelLabelName, StatusLabelName)
+	WALWriteAheadBufferEntryTotal = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "write_ahead_buffer_entry_total",
+		Help: "Total of write ahead buffer entry in wal",
+	}, WALChannelLabelName)
+	WALWriteAheadBufferSizeBytes = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "write_ahead_buffer_size_bytes",
+		Help: "Size of write ahead buffer in wal",
+	}, WALChannelLabelName)
+	WALWriteAheadBufferCapacityBytes = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "write_ahead_buffer_capacity_bytes",
+		Help: "Capacity of write ahead buffer in wal",
+	}, WALChannelLabelName)
+	WALWriteAheadBufferEarliestTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "write_ahead_buffer_earliest_time_tick",
+		Help: "Earliest time tick of write ahead buffer in wal",
+	}, WALChannelLabelName)
+	WALWriteAheadBufferLatestTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "write_ahead_buffer_latest_time_tick",
+		Help: "Latest time tick of write ahead buffer in wal",
+	}, WALChannelLabelName)
	// Scanner Related Metrics
	WALScannerTotal = newWALGaugeVec(prometheus.GaugeOpts{
		Name: "scanner_total",
		Help: "Total of wal scanner on current streaming node",
-	}, WALChannelLabelName)
+	}, WALChannelLabelName, WALScannerModelLabelName)
	WALScanMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
		Name:    "scan_message_bytes",
		Help:    "Bytes of scanned message from wal",
		Buckets: messageBytesBuckets,
-	}, WALChannelLabelName)
+	}, WALChannelLabelName, WALScannerModelLabelName)
	WALScanMessageTotal = newWALCounterVec(prometheus.CounterOpts{
		Name: "scan_message_total",
		Help: "Total of scanned message from wal",
-	}, WALChannelLabelName, WALMessageTypeLabelName)
+	}, WALChannelLabelName, WALMessageTypeLabelName, WALScannerModelLabelName)
	WALScanPassMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
		Name:    "scan_pass_message_bytes",
		Help:    "Bytes of pass (not filtered) scanned message from wal",
		Buckets: messageBytesBuckets,
-	}, WALChannelLabelName)
+	}, WALChannelLabelName, WALScannerModelLabelName)
	WALScanPassMessageTotal = newWALCounterVec(prometheus.CounterOpts{
		Name: "scan_pass_message_total",
		Help: "Total of pass (not filtered) scanned message from wal",
-	}, WALChannelLabelName, WALMessageTypeLabelName)
+	}, WALChannelLabelName, WALMessageTypeLabelName, WALScannerModelLabelName)
	WALScanTimeTickViolationMessageTotal = newWALCounterVec(prometheus.CounterOpts{
		Name: "scan_time_tick_violation_message_total",
		Help: "Total of time tick violation message (dropped) from wal",
-	}, WALChannelLabelName, WALMessageTypeLabelName)
+	}, WALChannelLabelName, WALMessageTypeLabelName, WALScannerModelLabelName)
	WALScanTxnTotal = newWALCounterVec(prometheus.CounterOpts{
		Name: "scan_txn_total",
@@ -281,19 +372,30 @@ var (
		Name: "scanner_txn_buf_bytes",
		Help: "Size of txn buffer in wal scanner",
	}, WALChannelLabelName)
+	WALFlusherInfo = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "flusher_info",
+		Help: "Current info of flusher on current wal",
+	}, WALChannelLabelName, WALChannelTermLabelName, WALFlusherStateLabelName)
+	WALFlusherTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
+		Name: "flusher_time_tick",
+		Help: "the final timetick tick of flusher seen",
+	}, WALChannelLabelName, WALChannelTermLabelName)
)
// RegisterStreamingServiceClient registers streaming service client metrics
func RegisterStreamingServiceClient(registry *prometheus.Registry) {
	StreamingServiceClientRegisterOnce.Do(func() {
+		registry.MustRegister(StreamingServiceClientResumingProducerTotal)
		registry.MustRegister(StreamingServiceClientProducerTotal)
-		registry.MustRegister(StreamingServiceClientProduceInflightTotal)
+		registry.MustRegister(StreamingServiceClientProduceTotal)
		registry.MustRegister(StreamingServiceClientProduceBytes)
-		registry.MustRegister(StreamingServiceClientProduceDurationSeconds)
+		registry.MustRegister(StreamingServiceClientSuccessProduceBytes)
+		registry.MustRegister(StreamingServiceClientSuccessProduceDurationSeconds)
+		registry.MustRegister(StreamingServiceClientResumingConsumerTotal)
		registry.MustRegister(StreamingServiceClientConsumerTotal)
		registry.MustRegister(StreamingServiceClientConsumeBytes)
-		registry.MustRegister(StreamingServiceClientConsumeInflightTotal)
	})
}
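For completeness, a hedged sketch of wiring these registration helpers to a scrape endpoint (not part of the commit): the registry setup and promhttp serving below are generic Prometheus client usage, and Milvus wires its own registries elsewhere, so treat this as illustrative only.

```go
// A hedged sketch: expose the exported Register* helpers above on /metrics in
// some hypothetical process.
package example

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/milvus-io/milvus/pkg/v2/metrics"
)

func serveStreamingMetrics(addr string) error {
	registry := prometheus.NewRegistry()

	// Register whichever metric families this process actually hosts.
	metrics.RegisterStreamingServiceClient(registry)
	metrics.RegisterStreamingNode(registry)

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	return http.ListenAndServe(addr, nil)
}
```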
@@ -302,13 +404,17 @@ func registerStreamingCoord(registry *prometheus.Registry) {
	registry.MustRegister(StreamingCoordPChannelInfo)
	registry.MustRegister(StreamingCoordAssignmentVersion)
	registry.MustRegister(StreamingCoordAssignmentListenerTotal)
+	registry.MustRegister(StreamingCoordBroadcasterTaskTotal)
+	registry.MustRegister(StreamingCoordBroadcastDurationSeconds)
+	registry.MustRegister(StreamingCoordBroadcasterAckAnyOneDurationSeconds)
+	registry.MustRegister(StreamingCoordBroadcasterAckAllDurationSeconds)
+	registry.MustRegister(StreamingCoordResourceKeyTotal)
}

// RegisterStreamingNode registers streaming node metrics
func RegisterStreamingNode(registry *prometheus.Registry) {
	registry.MustRegister(StreamingNodeProducerTotal)
	registry.MustRegister(StreamingNodeProduceInflightTotal)
	registry.MustRegister(StreamingNodeConsumerTotal)
	registry.MustRegister(StreamingNodeConsumeInflightTotal)
	registry.MustRegister(StreamingNodeConsumeBytes)
@@ -321,13 +427,14 @@ func registerWAL(registry *prometheus.Registry) {
	registry.MustRegister(WALInfo)
	registry.MustRegister(WALLastAllocatedTimeTick)
	registry.MustRegister(WALAllocateTimeTickTotal)
+	registry.MustRegister(WALTimeTickAllocateDurationSeconds)
	registry.MustRegister(WALLastConfirmedTimeTick)
	registry.MustRegister(WALAcknowledgeTimeTickTotal)
	registry.MustRegister(WALSyncTimeTickTotal)
	registry.MustRegister(WALTimeTickSyncTotal)
	registry.MustRegister(WALTimeTickSyncTimeTick)
	registry.MustRegister(WALInflightTxn)
-	registry.MustRegister(WALFinishTxn)
+	registry.MustRegister(WALTxnDurationSeconds)
	registry.MustRegister(WALSegmentAllocTotal)
	registry.MustRegister(WALSegmentFlushedTotal)
	registry.MustRegister(WALSegmentBytes)
@@ -335,8 +442,15 @@ func registerWAL(registry *prometheus.Registry) {
	registry.MustRegister(WALCollectionTotal)
	registry.MustRegister(WALAppendMessageBytes)
	registry.MustRegister(WALAppendMessageTotal)
+	registry.MustRegister(WALAppendMessageBeforeInterceptorDurationSeconds)
+	registry.MustRegister(WALAppendMessageAfterInterceptorDurationSeconds)
	registry.MustRegister(WALAppendMessageDurationSeconds)
	registry.MustRegister(WALImplsAppendMessageDurationSeconds)
+	registry.MustRegister(WALWriteAheadBufferEntryTotal)
+	registry.MustRegister(WALWriteAheadBufferSizeBytes)
+	registry.MustRegister(WALWriteAheadBufferCapacityBytes)
+	registry.MustRegister(WALWriteAheadBufferEarliestTimeTick)
+	registry.MustRegister(WALWriteAheadBufferLatestTimeTick)
	registry.MustRegister(WALScannerTotal)
	registry.MustRegister(WALScanMessageBytes)
	registry.MustRegister(WALScanMessageTotal)
@@ -347,6 +461,8 @@ func registerWAL(registry *prometheus.Registry) {
	registry.MustRegister(WALScannerPendingQueueBytes)
	registry.MustRegister(WALScannerTimeTickBufBytes)
	registry.MustRegister(WALScannerTxnBufBytes)
+	registry.MustRegister(WALFlusherInfo)
+	registry.MustRegister(WALFlusherTimeTick)
}
func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
@@ -356,6 +472,13 @@ func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prom
	return prometheus.NewGaugeVec(opts, labels)
}

+func newStreamingCoordHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = typeutil.StreamingCoordRole
+	labels := mergeLabel(extra...)
+	return prometheus.NewHistogramVec(opts, labels)
+}
+
func newStreamingServiceClientGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
	opts.Namespace = milvusNamespace
	opts.Subsystem = subsystemStreamingServiceClient
@@ -363,6 +486,13 @@ func newStreamingServiceClientGaugeVec(opts prometheus.GaugeOpts, extra ...strin
	return prometheus.NewGaugeVec(opts, labels)
}

+func newStreamingServiceClientCounterVec(opts prometheus.CounterOpts, extra ...string) *prometheus.CounterVec {
+	opts.Namespace = milvusNamespace
+	opts.Subsystem = subsystemStreamingServiceClient
+	labels := mergeLabel(extra...)
+	return prometheus.NewCounterVec(opts, labels)
+}
+
func newStreamingServiceClientHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
	opts.Namespace = milvusNamespace
	opts.Subsystem = subsystemStreamingServiceClient