enhance: add streaming client metrics (#36523)

issue: #33285

---------

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2024-10-08 21:25:19 +08:00 committed by GitHub
parent 8b867aae63
commit 2ec6e602d6
65 changed files with 1729 additions and 273 deletions

View File

@ -11,10 +11,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
@ -37,6 +35,7 @@ func NewResumableConsumer(factory factory, opts *ConsumerOptions) ResumableConsu
},
factory: factory,
consumeErr: syncutil.NewFuture[error](),
metrics: newConsumerMetrics(opts.PChannel),
}
go consumer.resumeLoop()
return consumer
@ -54,6 +53,7 @@ type resumableConsumerImpl struct {
mh *timeTickOrderMessageHandler
factory factory
consumeErr *syncutil.Future[error]
metrics *consumerMetrics
}
type factory = func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error)
@ -63,6 +63,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
defer func() {
// close the message handler.
rc.mh.Close()
rc.metrics.IntoUnavailable()
rc.logger.Info("resumable consumer is closed")
close(rc.resumingExitCh)
}()
@ -71,11 +72,17 @@ func (rc *resumableConsumerImpl) resumeLoop() {
deliverPolicy := rc.opts.DeliverPolicy
deliverFilters := rc.opts.DeliverFilters
// the consumer needs to resume when an error occurs, so the message handler shouldn't be closed if the internal consumer encounters a failure.
nopCloseMH := message.NopCloseHandler{
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(msg message.ImmutableMessage, handle func(message.ImmutableMessage)) {
g := rc.metrics.StartConsume(msg.EstimateSize())
handle(msg)
g.Finish()
},
}
for {
rc.metrics.IntoUnavailable()
// Get last checkpoint sent.
// Consume ordering is always time tick order now.
if rc.mh.lastConfirmedMessageID != nil {
@ -104,6 +111,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
rc.consumeErr.Set(err)
return
}
rc.metrics.IntoAvailable()
// Wait until the consumer is unavailable or context canceled.
if err := rc.waitUntilUnavailable(consumer); err != nil {
@ -114,10 +122,6 @@ func (rc *resumableConsumerImpl) resumeLoop() {
}
func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions) (consumer.Consumer, error) {
// Mark as unavailable.
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerUnAvailable).Inc()
defer metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerUnAvailable).Dec()
logger := rc.logger.With(zap.Any("deliverPolicy", opts.DeliverPolicy))
backoff := backoff.NewExponentialBackOff()
@ -145,14 +149,11 @@ func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions
// waitUntilUnavailable is used to wait until the consumer is unavailable or context canceled.
func (rc *resumableConsumerImpl) waitUntilUnavailable(consumer handler.Consumer) error {
// Mark as available.
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Inc()
defer func() {
consumer.Close()
if consumer.Error() != nil {
rc.logger.Warn("consumer is closed with error", zap.Error(consumer.Error()))
}
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Dec()
}()
select {
@ -191,6 +192,7 @@ func (rc *resumableConsumerImpl) Close() {
// force close is applied by canceling the context if graceful close fails.
rc.cancel()
<-rc.resumingExitCh
rc.metrics.Close()
}
// Done returns a channel which will be closed when scanner is finished or closed.

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/client/handler/mock_consumer"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
@ -69,3 +70,18 @@ func TestResumableConsumer(t *testing.T) {
rc.Close()
<-rc.Done()
}
func TestHandler(t *testing.T) {
ch := make(chan message.ImmutableMessage, 100)
hNop := nopCloseHandler{
Handler: message.ChanMessageHandler(ch),
}
hNop.Handle(nil)
assert.Nil(t, <-ch)
hNop.Close()
select {
case <-ch:
panic("should not be closed")
default:
}
}

View File

@ -0,0 +1,22 @@
package consumer
import "github.com/milvus-io/milvus/pkg/streaming/util/message"
// nopCloseHandler is a handler that does nothing on close.
type nopCloseHandler struct {
message.Handler
HandleInterceptor func(msg message.ImmutableMessage, handle func(message.ImmutableMessage))
}
// Handle is the callback for handling a message.
func (nch nopCloseHandler) Handle(msg message.ImmutableMessage) {
if nch.HandleInterceptor != nil {
nch.HandleInterceptor(msg, nch.Handler.Handle)
return
}
nch.Handler.Handle(msg)
}
// Close is called after all messages are handled or handling is interrupted.
func (nch nopCloseHandler) Close() {
}

View File

@ -1,9 +1,7 @@
package consumer
import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// timeTickOrderMessageHandler is a message handler that will do metrics and record the last sent message id.
@ -16,14 +14,11 @@ type timeTickOrderMessageHandler struct {
func (mh *timeTickOrderMessageHandler) Handle(msg message.ImmutableMessage) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
messageSize := msg.EstimateSize()
mh.inner.Handle(msg)
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
// Do a metric here.
metrics.StreamingServiceClientConsumeBytes.WithLabelValues(paramtable.GetStringNodeID()).Observe(float64(messageSize))
}
func (mh *timeTickOrderMessageHandler) Close() {

View File

@ -0,0 +1,79 @@
package consumer
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// newConsumerMetrics creates a new consumer metrics.
func newConsumerMetrics(pchannel string) *consumerMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
m := &consumerMetrics{
available: false,
clientTotal: metrics.StreamingServiceClientConsumerTotal.MustCurryWith(constLabel),
inflightTotal: metrics.StreamingServiceClientConsumeInflightTotal.With(constLabel),
bytes: metrics.StreamingServiceClientConsumeBytes.With(constLabel),
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
return m
}
// consumerMetrics is the metrics for consumer.
type consumerMetrics struct {
available bool
clientTotal *prometheus.GaugeVec
inflightTotal prometheus.Gauge
bytes prometheus.Observer
}
// IntoUnavailable sets the consumer metrics to unavailable.
func (m *consumerMetrics) IntoUnavailable() {
if !m.available {
return
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
m.available = false
}
// IntoAvailable sets the consumer metrics to available.
func (m *consumerMetrics) IntoAvailable() {
if m.available {
return
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
m.available = true
}
// StartConsume starts a consume operation.
func (m *consumerMetrics) StartConsume(bytes int) consumerMetricsGuard {
m.inflightTotal.Inc()
return consumerMetricsGuard{
metrics: m,
bytes: bytes,
}
}
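// Close closes the consumer metrics and drops the client availability gauge.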
func (m *consumerMetrics) Close() {
if m.available {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
} else {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}
}
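// consumerMetricsGuard is the guard for a single consume operation.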
type consumerMetricsGuard struct {
metrics *consumerMetrics
bytes int
}
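// Finish finishes the consume operation, decrementing the inflight gauge and observing the message size.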
func (g consumerMetricsGuard) Finish() {
g.metrics.inflightTotal.Dec()
g.metrics.bytes.Observe(float64(g.bytes))
}
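The consumer metrics above follow a guard pattern: StartConsume bumps the inflight gauge, Finish decrements it and observes the consumed bytes, while the availability gauge is flipped by the resume loop. A minimal usage sketch under those assumptions; the pchannel name, msg and handleMessage are placeholders, not part of this change:

// Sketch: drive the consumer metrics guard around handling one message.
m := newConsumerMetrics("by-dev-rootcoord-dml_0") // example pchannel name
m.IntoAvailable() // underlying consumer connected

g := m.StartConsume(msg.EstimateSize()) // inflight gauge ++
handleMessage(msg)                      // stand-in for the inner message handler
g.Finish()                              // inflight gauge --, observe consumed bytes

m.IntoUnavailable() // connection lost; the resume loop will reconnect
m.Close()           // drop the availability gauge when the resumable consumer closes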

View File

@ -0,0 +1,101 @@
package producer
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// newProducerMetrics creates a new producer metrics.
func newProducerMetrics(pchannel string) *producerMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
m := &producerMetrics{
available: false,
clientTotal: metrics.StreamingServiceClientProducerTotal.MustCurryWith(constLabel),
inflightTotal: metrics.StreamingServiceClientProduceInflightTotal.With(constLabel),
bytes: metrics.StreamingServiceClientProduceBytes.MustCurryWith(constLabel),
durationSeconds: metrics.StreamingServiceClientProduceDurationSeconds.MustCurryWith(constLabel),
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
return m
}
// producerMetrics is the metrics for producer.
type producerMetrics struct {
available bool
clientTotal *prometheus.GaugeVec
inflightTotal prometheus.Gauge
bytes prometheus.ObserverVec
durationSeconds prometheus.ObserverVec
}
// IntoUnavailable sets the producer metrics to unavailable.
func (m *producerMetrics) IntoUnavailable() {
if !m.available {
return
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
m.available = false
}
// IntoAvailable sets the producer metrics to available.
func (m *producerMetrics) IntoAvailable() {
if m.available {
return
}
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
m.available = true
}
// StartProduce starts the produce metrics.
func (m *producerMetrics) StartProduce(bytes int) produceMetricsGuard {
m.inflightTotal.Inc()
return produceMetricsGuard{
start: time.Now(),
bytes: bytes,
metrics: m,
}
}
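// Close closes the producer metrics and drops the client availability gauge.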
func (m *producerMetrics) Close() {
if m.available {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
} else {
m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}
}
// produceMetricsGuard is the guard for produce metrics.
type produceMetricsGuard struct {
start time.Time
bytes int
metrics *producerMetrics
}
// Finish finishes the produce metrics.
func (g produceMetricsGuard) Finish(err error) {
status := parseError(err)
g.metrics.bytes.WithLabelValues(status).Observe(float64(g.bytes))
g.metrics.durationSeconds.WithLabelValues(status).Observe(time.Since(g.start).Seconds())
g.metrics.inflightTotal.Dec()
}
// parseError parses the error to status.
func parseError(err error) string {
if err == nil {
return metrics.StreamingServiceClientStatusOK
}
if status.IsCanceled(err) {
return metrics.StreamingServiceClientStatusCancel
}
return metrics.StreamignServiceClientStatusError
}
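On the producer side the guard additionally labels the observed bytes and duration with a status derived from the produce error (ok, cancel or error). A hedged sketch of the intended call pattern; the pchannel name and produceOnce are stand-ins, not part of this change:

// Sketch: label and time a single produce call with the guard above.
pm := newProducerMetrics("by-dev-rootcoord-dml_0") // example pchannel name
pm.IntoAvailable()

g := pm.StartProduce(msg.EstimateSize()) // inflight gauge ++, start the timer
result, err := produceOnce(ctx, msg)     // stand-in for the underlying producer call
g.Finish(err)                            // observe bytes and duration with the ok/cancel/error label
_ = result

pm.Close() // drop the availability gauge on shutdown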

View File

@ -14,10 +14,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
@ -44,6 +42,7 @@ func NewResumableProducer(f factory, opts *ProducerOptions) *ResumableProducer {
producer: newProducerWithResumingError(), // lazy initialized.
cond: syncutil.NewContextCond(&sync.Mutex{}),
factory: f,
metrics: newProducerMetrics(opts.PChannel),
}
go p.resumeLoop()
return p
@ -73,14 +72,20 @@ type ResumableProducer struct {
// factory is used to create a new producer.
factory factory
metrics *producerMetrics
}
// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (*producer.ProduceResult, error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
if p.lifetime.Add(lifetime.IsWorking) != nil {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
defer p.lifetime.Done()
metricGuard := p.metrics.StartProduce(msg.EstimateSize())
defer func() {
metricGuard.Finish(err)
p.lifetime.Done()
}()
for {
// get producer.
@ -112,15 +117,18 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
func (p *ResumableProducer) resumeLoop() {
defer func() {
p.logger.Info("stop resuming")
p.metrics.IntoUnavailable()
close(p.resumingExitCh)
}()
for {
p.metrics.IntoUnavailable()
producer, err := p.createNewProducer()
p.producer.SwapProducer(producer, err)
if err != nil {
return
}
p.metrics.IntoAvailable()
// Wait until the new producer is unavailable, trigger a new swap operation.
if err := p.waitUntilUnavailable(producer); err != nil {
@ -132,10 +140,6 @@ func (p *ResumableProducer) resumeLoop() {
// waitUntilUnavailable is used to wait until the producer is unavailable or context canceled.
func (p *ResumableProducer) waitUntilUnavailable(producer handler.Producer) error {
// Mark as available.
metrics.StreamingServiceClientProducerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Inc()
defer metrics.StreamingServiceClientProducerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Dec()
select {
case <-p.stopResumingCh:
return errGracefulShutdown
@ -204,4 +208,5 @@ func (p *ResumableProducer) Close() {
// force close is applied by canceling the context if graceful close fails.
p.cancel()
<-p.resumingExitCh
p.metrics.Close()
}

View File

@ -75,6 +75,7 @@ func TestResumableProducer(t *testing.T) {
})
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().EstimateSize().Return(100)
id, err := rp.Produce(context.Background(), msg)
assert.NotNil(t, id)
assert.NoError(t, err)

View File

@ -7,7 +7,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -19,7 +18,7 @@ var ErrChannelNotExist = errors.New("channel not exist")
// RecoverChannelManager creates a new channel manager.
func RecoverChannelManager(ctx context.Context, incomingChannel ...string) (*ChannelManager, error) {
channels, err := recoverFromConfigurationAndMeta(ctx, incomingChannel...)
channels, metrics, err := recoverFromConfigurationAndMeta(ctx, incomingChannel...)
if err != nil {
return nil, err
}
@ -31,19 +30,24 @@ func RecoverChannelManager(ctx context.Context, incomingChannel ...string) (*Cha
Global: globalVersion, // the global version should keep increasing globally; it's ok to use the node id.
Local: 0,
},
metrics: metrics,
}, nil
}
// recoverFromConfigurationAndMeta recovers the channel manager from configuration and meta.
func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...string) (map[string]*PChannelMeta, error) {
func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...string) (map[string]*PChannelMeta, *channelMetrics, error) {
// Recover metrics.
metrics := newPChannelMetrics()
// Get all channels from meta.
channelMetas, err := resource.Resource().StreamingCatalog().ListPChannel(ctx)
if err != nil {
return nil, err
return nil, metrics, err
}
channels := make(map[string]*PChannelMeta, len(channelMetas))
for _, channel := range channelMetas {
metrics.AssignPChannelStatus(channel)
channels[channel.GetChannel().GetName()] = newPChannelMetaFromProto(channel)
}
@ -53,7 +57,7 @@ func recoverFromConfigurationAndMeta(ctx context.Context, incomingChannel ...str
channels[newChannel] = newPChannelMeta(newChannel)
}
}
return channels, nil
return channels, metrics, nil
}
// ChannelManager manages the channels.
@ -63,6 +67,7 @@ type ChannelManager struct {
cond *syncutil.ContextCond
channels map[string]*PChannelMeta
version typeutil.VersionInt64Pair
metrics *channelMetrics
}
// CurrentPChannelsView returns the current view of pchannels.
@ -102,10 +107,10 @@ func (cm *ChannelManager) AssignPChannels(ctx context.Context, pChannelToStreami
if err != nil {
return nil, err
}
updates := make(map[string]*PChannelMeta, len(pChannelMetas))
for _, pchannel := range pChannelMetas {
updates[pchannel.GetChannel().GetName()] = newPChannelMetaFromProto(pchannel)
cm.metrics.AssignPChannelStatus(pchannel)
}
return updates, nil
}
@ -119,6 +124,7 @@ func (cm *ChannelManager) AssignPChannelsDone(ctx context.Context, pChannels []s
defer cm.cond.L.Unlock()
// modified channels.
histories := make([]types.PChannelInfoAssigned, 0, len(pChannels))
pChannelMetas := make([]*streamingpb.PChannelMeta, 0, len(pChannels))
for _, channelName := range pChannels {
pchannel, ok := cm.channels[channelName]
@ -126,11 +132,22 @@ func (cm *ChannelManager) AssignPChannelsDone(ctx context.Context, pChannels []s
return ErrChannelNotExist
}
mutablePChannel := pchannel.CopyForWrite()
mutablePChannel.AssignToServerDone()
histories = append(histories, mutablePChannel.AssignToServerDone()...)
pChannelMetas = append(pChannelMetas, mutablePChannel.IntoRawMeta())
}
return cm.updatePChannelMeta(ctx, pChannelMetas)
if err := cm.updatePChannelMeta(ctx, pChannelMetas); err != nil {
return err
}
// Update metrics.
for _, history := range histories {
cm.metrics.RemovePChannelStatus(history)
}
for _, pchannel := range pChannelMetas {
cm.metrics.AssignPChannelStatus(pchannel)
}
return nil
}
// MarkAsUnavailable marks the pchannels as unavailable.
@ -150,7 +167,13 @@ func (cm *ChannelManager) MarkAsUnavailable(ctx context.Context, pChannels []typ
pChannelMetas = append(pChannelMetas, mutablePChannel.IntoRawMeta())
}
return cm.updatePChannelMeta(ctx, pChannelMetas)
if err := cm.updatePChannelMeta(ctx, pChannelMetas); err != nil {
return err
}
for _, pchannel := range pChannelMetas {
cm.metrics.AssignPChannelStatus(pchannel)
}
return nil
}
// updatePChannelMeta updates the pchannel metas.
@ -168,9 +191,7 @@ func (cm *ChannelManager) updatePChannelMeta(ctx context.Context, pChannelMetas
}
cm.version.Local++
// update metrics.
metrics.StreamingCoordAssignmentVersion.WithLabelValues(
paramtable.GetStringNodeID(),
).Set(float64(cm.version.Local))
cm.metrics.UpdateAssignmentVersion(cm.version.Local)
return nil
}

View File

@ -0,0 +1,48 @@
package channel
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
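// newPChannelMetrics creates the channel assignment metrics for the streaming coord.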
func newPChannelMetrics() *channelMetrics {
constLabel := prometheus.Labels{metrics.NodeIDLabelName: paramtable.GetStringNodeID()}
return &channelMetrics{
pchannelInfo: metrics.StreamingCoordPChannelInfo.MustCurryWith(constLabel),
assignmentVersion: metrics.StreamingCoordAssignmentVersion.With(constLabel),
}
}
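// channelMetrics records the pchannel assignment status and the assignment version on the streaming coord.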
type channelMetrics struct {
pchannelInfo *prometheus.GaugeVec
assignmentVersion prometheus.Gauge
}
// RemovePChannelStatus removes the pchannel status metric
func (m *channelMetrics) RemovePChannelStatus(assigned types.PChannelInfoAssigned) {
m.pchannelInfo.Delete(prometheus.Labels{
metrics.WALChannelLabelName: assigned.Channel.Name,
metrics.WALChannelTermLabelName: strconv.FormatInt(assigned.Channel.Term, 10),
metrics.StreamingNodeLabelName: strconv.FormatInt(assigned.Node.ServerID, 10),
})
}
// AssignPChannelStatus assigns the pchannel status metric
func (m *channelMetrics) AssignPChannelStatus(meta *streamingpb.PChannelMeta) {
m.pchannelInfo.With(prometheus.Labels{
metrics.WALChannelLabelName: meta.GetChannel().GetName(),
metrics.WALChannelTermLabelName: strconv.FormatInt(meta.GetChannel().GetTerm(), 10),
metrics.StreamingNodeLabelName: strconv.FormatInt(meta.GetNode().GetServerId(), 10),
}).Set(float64(meta.GetState()))
}
// UpdateAssignmentVersion updates the assignment version metric
func (m *channelMetrics) UpdateAssignmentVersion(version int64) {
m.assignmentVersion.Set(float64(version))
}
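The coord-side metrics expose one gauge series per (channel, term, serving node) with the pchannel meta state as its value, plus a local assignment version gauge. A sketch of the expected call pattern; meta, version and clearedHistories are assumed inputs, not part of this change:

// Sketch: publish assignment state for a pchannel meta and bump the version.
m := newPChannelMetrics()
m.AssignPChannelStatus(meta)       // gauge labeled by channel, term and serverID; value is the meta state
m.UpdateAssignmentVersion(version) // local assignment version after a successful meta update

// When AssignPChannelsDone clears an assignment history entry,
// the corresponding labeled series is deleted.
for _, history := range clearedHistories { // assumed []types.PChannelInfoAssigned
    m.RemovePChannelStatus(history)
}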

View File

@ -127,11 +127,23 @@ func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeI
}
// AssignToServerDone marks the channel assignment to the server as done and returns the cleared assignment histories.
func (m *mutablePChannel) AssignToServerDone() {
func (m *mutablePChannel) AssignToServerDone() []types.PChannelInfoAssigned {
var history []types.PChannelInfoAssigned
if m.inner.State == streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNING {
history = make([]types.PChannelInfoAssigned, 0, len(m.inner.Histories))
for _, h := range m.inner.Histories {
history = append(history, types.PChannelInfoAssigned{
Channel: types.PChannelInfo{
Name: m.inner.Channel.Name,
Term: h.GetTerm(),
},
Node: types.NewStreamingNodeInfoFromProto(h.Node),
})
}
m.inner.Histories = make([]*streamingpb.PChannelAssignmentLog, 0)
m.inner.State = streamingpb.PChannelMetaState_PCHANNEL_META_STATE_ASSIGNED
}
return history
}
// MarkAsUnavailable marks the channel as unavailable.

View File

@ -1,6 +1,8 @@
package service
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service/discover"
"github.com/milvus-io/milvus/pkg/metrics"
@ -15,7 +17,8 @@ func NewAssignmentService(
balancer balancer.Balancer,
) streamingpb.StreamingCoordAssignmentServiceServer {
return &assignmentServiceImpl{
balancer: balancer,
balancer: balancer,
listenerTotal: metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()),
}
}
@ -25,13 +28,14 @@ type AssignmentService interface {
// assignmentServiceImpl is the implementation of the assignment service.
type assignmentServiceImpl struct {
balancer balancer.Balancer
balancer balancer.Balancer
listenerTotal prometheus.Gauge
}
// AssignmentDiscover watches the state of all log nodes.
func (s *assignmentServiceImpl) AssignmentDiscover(server streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverServer) error {
metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc()
defer metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()).Dec()
s.listenerTotal.Inc()
defer s.listenerTotal.Dec()
return discover.NewAssignmentDiscoverServer(s.balancer, server).Execute()
}

View File

@ -19,6 +19,7 @@ package idalloc
import (
"context"
"sync"
"time"
"github.com/milvus-io/milvus/internal/types"
)
@ -57,11 +58,15 @@ type Allocator interface {
// Sync expires the local allocator messages and
// syncs the local allocator with the remote allocator.
Sync()
// SyncIfExpired syncs the local allocator with the remote allocator if the duration since the last sync operation is greater than expire.
SyncIfExpired(expire time.Duration)
}
type allocatorImpl struct {
mu sync.Mutex
remoteAllocator remoteBatchAllocator
lastSyncTime time.Time
localAllocator *localAllocator
}
@ -87,6 +92,15 @@ func (ta *allocatorImpl) Sync() {
ta.localAllocator.exhausted()
}
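// SyncIfExpired expires the local allocator if the duration since the last sync is greater than expire.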
func (ta *allocatorImpl) SyncIfExpired(expire time.Duration) {
ta.mu.Lock()
defer ta.mu.Unlock()
if time.Since(ta.lastSyncTime) > expire {
ta.localAllocator.exhausted()
}
}
// allocateRemote allocates timestamp from remote root coordinator.
func (ta *allocatorImpl) allocateRemote(ctx context.Context) (uint64, error) {
// Update local allocator from remote.
@ -95,6 +109,7 @@ func (ta *allocatorImpl) allocateRemote(ctx context.Context) (uint64, error) {
return 0, err
}
ta.localAllocator.update(start, count)
ta.lastSyncTime = time.Now()
// Get from local again.
return ta.localAllocator.allocateOne()
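SyncIfExpired only marks the local allocator exhausted when the last remote sync is older than expire, so the next allocation refreshes the cached range from the remote allocator. A small sketch of a periodic caller; the allocator value and the intervals are assumptions:

// Sketch: periodically invalidate a stale local allocation cache.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
    // If more than 200ms has passed since the last remote sync,
    // the local cache is marked exhausted and the next allocation
    // goes back to the remote allocator.
    allocator.SyncIfExpired(200 * time.Millisecond)
}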

View File

@ -4,9 +4,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/service/handler/consumer"
"github.com/milvus-io/milvus/internal/streamingnode/server/service/handler/producer"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ HandlerService = (*handlerServiceImpl)(nil)
@ -32,9 +30,6 @@ type handlerServiceImpl struct {
// Produce creates a new producer for the channel on this log node.
func (hs *handlerServiceImpl) Produce(streamServer streamingpb.StreamingNodeHandlerService_ProduceServer) error {
metrics.StreamingNodeProducerTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc()
defer metrics.StreamingNodeProducerTotal.WithLabelValues(paramtable.GetStringNodeID()).Dec()
p, err := producer.CreateProduceServer(hs.walManager, streamServer)
if err != nil {
return err
@ -44,9 +39,6 @@ func (hs *handlerServiceImpl) Produce(streamServer streamingpb.StreamingNodeHand
// Consume creates a new consumer for the channel on this log node.
func (hs *handlerServiceImpl) Consume(streamServer streamingpb.StreamingNodeHandlerService_ConsumeServer) error {
metrics.StreamingNodeConsumerTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc()
defer metrics.StreamingNodeConsumerTotal.WithLabelValues(paramtable.GetStringNodeID()).Dec()
c, err := consumer.CreateConsumeServer(hs.walManager, streamServer)
if err != nil {
return err

View File

@ -2,7 +2,6 @@ package consumer
import (
"io"
"strconv"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -12,12 +11,10 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// CreateConsumeServer creates a new consumer.
@ -55,11 +52,13 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb
}
return nil, errors.Wrap(err, "at send created")
}
metrics := newConsumerMetrics(l.Channel().Name)
return &ConsumeServer{
scanner: scanner,
consumeServer: consumeServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log.
closeCh: make(chan struct{}),
metrics: metrics,
}, nil
}
@ -69,6 +68,7 @@ type ConsumeServer struct {
consumeServer *consumeGrpcServerHelper
logger *log.MLogger
closeCh chan struct{}
metrics *consumerMetrics
}
// Execute executes the consumer.
@ -83,7 +83,9 @@ func (c *ConsumeServer) Execute() error {
// 1. the stream is broken.
// 2. recv arm recv close signal.
// 3. scanner is quit with expected error.
return c.sendLoop()
err := c.sendLoop()
c.metrics.Close()
return err
}
// sendLoop sends the message to client.
@ -141,9 +143,13 @@ func (c *ConsumeServer) sendLoop() (err error) {
}
}
func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) error {
func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) (err error) {
metricsGuard := c.metrics.StartConsume(msg.EstimateSize())
defer func() {
metricsGuard.Finish(err)
}()
// Send the consumed message to the client and update metrics.
messageSize := msg.EstimateSize()
if err := c.consumeServer.SendConsumeMessage(&streamingpb.ConsumeMessageReponse{
Message: &messagespb.ImmutableMessage{
Id: &messagespb.MessageID{
@ -155,11 +161,6 @@ func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) error
}); err != nil {
return status.NewInner("send consume message failed: %s", err.Error())
}
metrics.StreamingNodeConsumeBytes.WithLabelValues(
paramtable.GetStringNodeID(),
c.scanner.Channel().Name,
strconv.FormatInt(c.scanner.Channel().Term, 10),
).Observe(float64(messageSize))
return nil
}

View File

@ -92,6 +92,7 @@ func TestConsumeServerRecvArm(t *testing.T) {
},
logger: log.With(),
closeCh: make(chan struct{}),
metrics: newConsumerMetrics("test"),
}
recvCh := make(chan *streamingpb.ConsumeRequest)
grpcConsumerServer.EXPECT().Recv().RunAndReturn(func() (*streamingpb.ConsumeRequest, error) {
@ -137,13 +138,13 @@ func TestConsumerServeSendArm(t *testing.T) {
logger: log.With(),
scanner: scanner,
closeCh: make(chan struct{}),
metrics: newConsumerMetrics("test"),
}
ctx, cancel := context.WithCancel(context.Background())
grpcConsumerServer.EXPECT().Context().Return(ctx)
grpcConsumerServer.EXPECT().Send(mock.Anything).RunAndReturn(func(cr *streamingpb.ConsumeResponse) error { return nil }).Times(7)
scanCh := make(chan message.ImmutableMessage, 5)
scanner.EXPECT().Channel().Return(types.PChannelInfo{})
scanner.EXPECT().Chan().Return(scanCh)
scanner.EXPECT().Close().Return(nil).Times(3)

View File

@ -0,0 +1,58 @@
package consumer
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// newConsumerMetrics creates a new consumer metrics.
func newConsumerMetrics(pchannel string) *consumerMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
m := &consumerMetrics{
scannerTotal: metrics.StreamingNodeConsumerTotal.With(constLabel),
inflightTotal: metrics.StreamingNodeConsumeInflightTotal.With(constLabel),
bytes: metrics.StreamingNodeConsumeBytes.With(constLabel),
}
m.scannerTotal.Inc()
return m
}
// consumerMetrics is the metrics for consumer.
type consumerMetrics struct {
scannerTotal prometheus.Gauge
inflightTotal prometheus.Gauge
bytes prometheus.Observer
}
// StartConsume starts a consume operation.
func (m *consumerMetrics) StartConsume(bytes int) consumerMetricsGuard {
m.inflightTotal.Inc()
return consumerMetricsGuard{
metrics: m,
bytes: bytes,
}
}
// Close closes the consumer metrics.
func (m *consumerMetrics) Close() {
m.scannerTotal.Dec()
}
// consumerMetricsGuard is a guard for consumer metrics.
type consumerMetricsGuard struct {
metrics *consumerMetrics
bytes int
}
// Finish finishes the consume operation.
func (g consumerMetricsGuard) Finish(err error) {
g.metrics.inflightTotal.Dec()
if err == nil {
g.metrics.bytes.Observe(float64(g.bytes))
}
}

View File

@ -0,0 +1,52 @@
package producer
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// newProducerMetrics creates a new producer metrics.
func newProducerMetrics(pchannel types.PChannelInfo) *producerMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name,
}
pm := &producerMetrics{
produceTotal: metrics.StreamingNodeProducerTotal.With(constLabel),
inflightTotal: metrics.StreamingNodeProduceInflightTotal.With(constLabel),
}
pm.produceTotal.Inc()
return pm
}
// producerMetrics is the metrics for producer.
type producerMetrics struct {
produceTotal prometheus.Gauge
inflightTotal prometheus.Gauge
}
// StartProduce starts the produce metrics.
func (m *producerMetrics) StartProduce() produceMetricsGuard {
m.inflightTotal.Inc()
return produceMetricsGuard{
metrics: m,
}
}
// Close closes the producer metrics.
func (m *producerMetrics) Close() {
m.produceTotal.Dec()
}
// produceMetricsGuard is the guard for produce metrics.
type produceMetricsGuard struct {
metrics *producerMetrics
}
// Finish finishes the produce metrics.
func (g produceMetricsGuard) Finish(err error) {
g.metrics.inflightTotal.Dec()
}

View File

@ -2,9 +2,7 @@ package producer
import (
"io"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
@ -14,12 +12,10 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/service/contextutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/messagespb"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// CreateProduceServer creates a new producer.
@ -47,12 +43,14 @@ func CreateProduceServer(walManager walmanager.Manager, streamServer streamingpb
}); err != nil {
return nil, errors.Wrap(err, "at send created")
}
metrics := newProducerMetrics(l.Channel())
return &ProduceServer{
wal: l,
produceServer: produceServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)),
produceMessageCh: make(chan *streamingpb.ProduceMessageResponse),
appendWG: sync.WaitGroup{},
metrics: metrics,
}, nil
}
@ -63,6 +61,7 @@ type ProduceServer struct {
logger *log.MLogger
produceMessageCh chan *streamingpb.ProduceMessageResponse // All processing message results should be sent from this channel.
appendWG sync.WaitGroup
metrics *producerMetrics
}
// Execute starts the producer.
@ -79,7 +78,9 @@ func (p *ProduceServer) Execute() error {
// the loop will be blocked until:
// 1. the stream is broken.
// 2. recv arm recv closed and all response is sent.
return p.sendLoop()
err := p.sendLoop()
p.metrics.Close()
return err
}
// sendLoop sends the message to client.
@ -175,22 +176,23 @@ func (p *ProduceServer) handleProduce(req *streamingpb.ProduceMessageRequest) {
p.appendWG.Add(1)
p.logger.Debug("recv produce message from client", zap.Int64("requestID", req.RequestId))
// Update metrics.
msg := message.NewMutableMessage(req.GetMessage().GetPayload(), req.GetMessage().GetProperties())
metricsGuard := p.metrics.StartProduce()
if err := p.validateMessage(msg); err != nil {
p.logger.Warn("produce message validation failed", zap.Int64("requestID", req.RequestId), zap.Error(err))
p.sendProduceResult(req.RequestId, nil, err)
metricsGuard.Finish(err)
p.appendWG.Done()
return
}
// Append message to wal.
// Concurrent append request can be executed concurrently.
messageSize := msg.EstimateSize()
now := time.Now()
p.wal.AppendAsync(p.produceServer.Context(), msg, func(appendResult *wal.AppendResult, err error) {
defer func() {
metricsGuard.Finish(err)
p.appendWG.Done()
p.updateMetrics(messageSize, time.Since(now).Seconds(), err)
}()
p.sendProduceResult(req.RequestId, appendResult, err)
})
@ -241,22 +243,3 @@ func (p *ProduceServer) sendProduceResult(reqID int64, appendResult *wal.AppendR
return
}
}
// updateMetrics updates the metrics.
func (p *ProduceServer) updateMetrics(messageSize int, cost float64, err error) {
name := p.wal.Channel().Name
term := strconv.FormatInt(p.wal.Channel().Term, 10)
metrics.StreamingNodeProduceBytes.WithLabelValues(paramtable.GetStringNodeID(), name, term, getStatusLabel(err)).Observe(float64(messageSize))
metrics.StreamingNodeProduceDurationSeconds.WithLabelValues(paramtable.GetStringNodeID(), name, term, getStatusLabel(err)).Observe(cost)
}
// getStatusLabel returns the status label of error.
func getStatusLabel(err error) string {
if status.IsCanceled(err) {
return metrics.CancelLabel
}
if err != nil {
return metrics.FailLabel
}
return metrics.SuccessLabel
}

View File

@ -212,6 +212,7 @@ func TestProduceServerRecvArm(t *testing.T) {
logger: log.With(),
produceMessageCh: make(chan *streamingpb.ProduceMessageResponse, 10),
appendWG: sync.WaitGroup{},
metrics: newProducerMetrics(l.Channel()),
}
// Test send arm

View File

@ -9,10 +9,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -59,12 +57,11 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.
// wrap the wal into walExtend with cleanup function and interceptors.
wal := adaptImplsToWAL(l, o.interceptorBuilders, func() {
o.walInstances.Remove(id)
log.Info("wal deleted from allocator")
log.Info("wal deleted from opener")
})
o.walInstances.Insert(id, wal)
log.Info("new wal created")
metrics.StreamingNodeWALTotal.WithLabelValues(paramtable.GetStringNodeID()).Inc()
return wal, nil
}

View File

@ -45,6 +45,7 @@ func TestOpenerAdaptor(t *testing.T) {
func(ctx context.Context, boo *walimpls.OpenOption) (walimpls.WALImpls, error) {
wal := mock_walimpls.NewMockWALImpls(t)
wal.EXPECT().WALName().Return("mock_wal")
wal.EXPECT().Channel().Return(boo.Channel)
wal.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
@ -81,6 +82,8 @@ func TestOpenerAdaptor(t *testing.T) {
for {
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().WithWALTerm(mock.Anything).Return(msg).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
msg.EXPECT().EstimateSize().Return(1).Maybe()
msgID, err := wal.Append(context.Background(), msg)
time.Sleep(time.Millisecond * 10)

View File

@ -7,6 +7,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -15,7 +16,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var _ wal.Scanner = (*scannerAdaptorImpl)(nil)
@ -25,6 +25,7 @@ func newScannerAdaptor(
name string,
l walimpls.WALImpls,
readOption wal.ReadOption,
scanMetrics *metricsutil.ScannerMetrics,
cleanup func(),
) wal.Scanner {
if readOption.MesasgeHandler == nil {
@ -38,11 +39,12 @@ func newScannerAdaptor(
readOption: readOption,
filterFunc: options.GetFilterFunc(readOption.MessageFilter),
reorderBuffer: utility.NewReOrderBuffer(),
pendingQueue: typeutil.NewMultipartQueue[message.ImmutableMessage](),
txnBuffer: utility.NewTxnBuffer(logger),
pendingQueue: utility.NewPendingQueue(),
txnBuffer: utility.NewTxnBuffer(logger, scanMetrics),
cleanup: cleanup,
ScannerHelper: helper.NewScannerHelper(name),
lastTimeTickInfo: inspector.TimeTickInfo{},
metrics: scanMetrics,
}
go s.executeConsume()
return s
@ -55,11 +57,12 @@ type scannerAdaptorImpl struct {
innerWAL walimpls.WALImpls
readOption wal.ReadOption
filterFunc func(message.ImmutableMessage) bool
reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now.
pendingQueue *typeutil.MultipartQueue[message.ImmutableMessage] //
txnBuffer *utility.TxnBuffer // txn buffer for txn message.
reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now.
pendingQueue *utility.PendingQueue
txnBuffer *utility.TxnBuffer // txn buffer for txn message.
cleanup func()
lastTimeTickInfo inspector.TimeTickInfo
metrics *metricsutil.ScannerMetrics
}
// Channel returns the channel assignment info of the wal.
@ -79,6 +82,7 @@ func (s *scannerAdaptorImpl) Close() error {
if s.cleanup != nil {
s.cleanup()
}
s.metrics.Close()
return err
}
@ -112,6 +116,7 @@ func (s *scannerAdaptorImpl) executeConsume() {
}
if handleResult.MessageHandled {
s.pendingQueue.UnsafeAdvance()
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
}
if handleResult.Incoming != nil {
s.handleUpstream(handleResult.Incoming)
@ -141,17 +146,22 @@ func (s *scannerAdaptorImpl) getUpstream(scanner walimpls.ScannerImpls) <-chan m
}
func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
// Observe the message.
s.metrics.ObserveMessage(msg.MessageType(), msg.EstimateSize())
if msg.MessageType() == message.MessageTypeTimeTick {
// If a time tick message is incoming,
// the reorder buffer can be consumed up to the latest confirmed timetick.
messages := s.reorderBuffer.PopUtilTimeTick(msg.TimeTick())
s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes())
// Some txn messages need to be held until confirmed, so we handle them in the txn buffer.
msgs := s.txnBuffer.HandleImmutableMessages(messages, msg.TimeTick())
s.metrics.UpdateTxnBufSize(s.txnBuffer.Bytes())
// Push the confirmed messages into pending queue for consuming.
// and push forward timetick info.
s.pendingQueue.Add(msgs)
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
s.lastTimeTickInfo = inspector.TimeTickInfo{
MessageID: msg.MessageID(),
TimeTick: msg.TimeTick(),
@ -167,12 +177,16 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
}
// otherwise add message into reorder buffer directly.
if err := s.reorderBuffer.Push(msg); err != nil {
s.metrics.ObserveTimeTickViolation(msg.MessageType())
s.logger.Warn("failed to push message into reorder buffer",
zap.Any("msgID", msg.MessageID()),
zap.Uint64("timetick", msg.TimeTick()),
zap.String("vchannel", msg.VChannel()),
zap.Error(err))
}
// Observe the filtered message.
s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes())
s.metrics.ObserveFilteredMessage(msg.MessageType(), msg.EstimateSize())
}
func (s *scannerAdaptorImpl) handleTimeTickUpdated(timeTickNotifier *inspector.TimeTickNotifier) {
@ -189,5 +203,6 @@ func (s *scannerAdaptorImpl) handleTimeTickUpdated(timeTickNotifier *inspector.T
return
}
s.pendingQueue.AddOne(msg.IntoImmutableMessage(s.lastTimeTickInfo.MessageID))
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -25,6 +26,7 @@ func TestScannerAdaptorReadError(t *testing.T) {
DeliverPolicy: options.DeliverPolicyAll(),
MessageFilter: nil,
},
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
func() {})
defer s.Close()

View File

@ -7,6 +7,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
@ -44,8 +45,10 @@ func adaptImplsToWAL(
channel: basicWAL.Channel(),
idAllocator: typeutil.NewIDAllocator(),
},
scanners: typeutil.NewConcurrentMap[int64, wal.Scanner](),
cleanup: cleanup,
scanners: typeutil.NewConcurrentMap[int64, wal.Scanner](),
cleanup: cleanup,
writeMetrics: metricsutil.NewWriteMetrics(basicWAL.Channel(), basicWAL.WALName()),
scanMetrics: metricsutil.NewScanMetrics(basicWAL.Channel()),
}
param.WAL.Set(wal)
return wal
@ -61,6 +64,8 @@ type walAdaptorImpl struct {
scannerRegistry scannerRegistry
scanners *typeutil.ConcurrentMap[int64, wal.Scanner]
cleanup func()
writeMetrics *metricsutil.WriteMetrics
scanMetrics *metricsutil.ScanMetrics
}
func (w *walAdaptorImpl) WALName() string {
@ -88,6 +93,9 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
// Setup the term of wal.
msg = msg.WithWALTerm(w.Channel().Term)
// Metrics for append message.
metricsGuard := w.writeMetrics.StartAppend(msg.MessageType(), msg.EstimateSize())
// Execute the interceptor and wal append.
var extraAppendResult utility.ExtraAppendResult
ctx = utility.WithExtraAppendResult(ctx, &extraAppendResult)
@ -98,9 +106,13 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
// only used by time tick sync operator.
return notPersistHint.MessageID, nil
}
return w.inner.Append(ctx, msg)
metricsGuard.StartWALImplAppend()
msgID, err := w.inner.Append(ctx, msg)
metricsGuard.FinishWALImplAppend()
return msgID, err
})
if err != nil {
metricsGuard.Finish(err)
return nil, err
}
@ -111,6 +123,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
TxnCtx: extraAppendResult.TxnCtx,
Extra: extraAppendResult.Extra,
}
metricsGuard.Finish(nil)
return r, nil
}
@ -148,9 +161,8 @@ func (w *walAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Sca
name,
w.inner,
opts,
func() {
w.scanners.Remove(id)
})
w.scanMetrics.NewScannerMetrics(),
func() { w.scanners.Remove(id) })
w.scanners.Insert(id, s)
return s, nil
}
@ -198,6 +210,10 @@ func (w *walAdaptorImpl) Close() {
logger.Info("call wal cleanup function...")
w.cleanup()
logger.Info("wal closed")
// close all metrics.
w.scanMetrics.Close()
w.writeMetrics.Close()
}
type interceptorBuildResult struct {

View File

@ -28,6 +28,7 @@ import (
func TestWalAdaptorReadFail(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t)
expectedErr := errors.New("test")
l.EXPECT().WALName().Return("test")
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) {
@ -52,6 +53,7 @@ func TestWALAdaptor(t *testing.T) {
// Create a mock WAL implementation
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().WALName().Return("test")
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
@ -74,6 +76,8 @@ func TestWALAdaptor(t *testing.T) {
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().WithWALTerm(mock.Anything).Return(msg).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
msg.EXPECT().EstimateSize().Return(1).Maybe()
_, err := lAdapted.Append(context.Background(), msg)
assert.NoError(t, err)
lAdapted.AppendAsync(context.Background(), msg, func(mi *wal.AppendResult, err error) {
@ -128,6 +132,7 @@ func assertShutdownError(t *testing.T, err error) {
func TestNoInterceptor(t *testing.T) {
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().WALName().Return("test")
l.EXPECT().Channel().Return(types.PChannelInfo{})
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
@ -138,6 +143,8 @@ func TestNoInterceptor(t *testing.T) {
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().WithWALTerm(mock.Anything).Return(msg).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
msg.EXPECT().EstimateSize().Return(1).Maybe()
_, err := lWithInterceptors.Append(context.Background(), msg)
assert.NoError(t, err)
lWithInterceptors.Close()
@ -149,6 +156,7 @@ func TestWALWithInterceptor(t *testing.T) {
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (message.MessageID, error) {
return nil, nil
})
l.EXPECT().WALName().Return("test")
l.EXPECT().Close().Run(func() {})
b := mock_interceptors.NewMockInterceptorBuilder(t)
@ -170,6 +178,8 @@ func TestWALWithInterceptor(t *testing.T) {
// Interceptor is not ready, so the append/read will be blocked until timeout.
msg := mock_message.NewMockMutableMessage(t)
msg.EXPECT().WithWALTerm(mock.Anything).Return(msg).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
msg.EXPECT().EstimateSize().Return(1).Maybe()
_, err := lWithInterceptors.Append(ctx, msg)
assert.ErrorIs(t, err, context.DeadlineExceeded)

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
@ -27,6 +28,7 @@ func newPartitionSegmentManager(
collectionID int64,
paritionID int64,
segments []*segmentAllocManager,
metrics *metricsutil.SegmentAssignMetrics,
) *partitionSegmentManager {
return &partitionSegmentManager{
mu: sync.Mutex{},
@ -40,6 +42,7 @@ func newPartitionSegmentManager(
collectionID: collectionID,
paritionID: paritionID,
segments: segments,
metrics: metrics,
}
}
@ -53,6 +56,7 @@ type partitionSegmentManager struct {
paritionID int64
segments []*segmentAllocManager // there will be very few segments in this list.
fencedAssignTimeTick uint64 // the time tick that the assign operation is fenced.
metrics *metricsutil.SegmentAssignMetrics
}
func (m *partitionSegmentManager) CollectionID() int64 {
@ -80,7 +84,7 @@ func (m *partitionSegmentManager) SealAllSegmentsAndFenceUntil(timeTick uint64)
m.mu.Lock()
defer m.mu.Unlock()
segmentManagers := m.collectShouldBeSealedWithPolicy(func(segmentMeta *segmentAllocManager) bool { return true })
segmentManagers := m.collectShouldBeSealedWithPolicy(func(segmentMeta *segmentAllocManager) (policy.PolicyName, bool) { return policy.PolicyNameFenced, true })
// fence the assign operation until the incoming time tick or latest assigned timetick.
// The new incoming assignment request will be fenced.
// So all insert operations before the fenced time tick cannot be added to the growing segment (no more inserts can be applied to it).
@ -107,7 +111,7 @@ func (m *partitionSegmentManager) CollectionMustSealed(segmentID int64) *segment
var target *segmentAllocManager
m.segments = lo.Filter(m.segments, func(segment *segmentAllocManager, _ int) bool {
if segment.inner.GetSegmentId() == segmentID {
target = segment
target = segment.WithSealPolicy(policy.PolicyNameForce)
return false
}
return true
@ -116,13 +120,13 @@ func (m *partitionSegmentManager) CollectionMustSealed(segmentID int64) *segment
}
// collectShouldBeSealedWithPolicy collects all segments that should be sealed by policy.
func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates func(segmentMeta *segmentAllocManager) bool) []*segmentAllocManager {
func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates func(segmentMeta *segmentAllocManager) (policy.PolicyName, bool)) []*segmentAllocManager {
shouldBeSealedSegments := make([]*segmentAllocManager, 0, len(m.segments))
segments := make([]*segmentAllocManager, 0, len(m.segments))
for _, segment := range m.segments {
// An already sealed segment may come from recovery.
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_SEALED {
shouldBeSealedSegments = append(shouldBeSealedSegments, segment)
shouldBeSealedSegments = append(shouldBeSealedSegments, segment.WithSealPolicy(policy.PolicyNameRecover))
m.logger.Info("segment has been sealed, remove it from assignment",
zap.Int64("segmentID", segment.GetSegmentID()),
zap.String("state", segment.GetState().String()),
@ -132,10 +136,16 @@ func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates fun
}
// A growing segment hit by the seal policy should be removed from the assignment manager.
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING &&
predicates(segment) {
shouldBeSealedSegments = append(shouldBeSealedSegments, segment)
continue
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
if policyName, shouldBeSealed := predicates(segment); shouldBeSealed {
shouldBeSealedSegments = append(shouldBeSealedSegments, segment.WithSealPolicy(policyName))
m.logger.Info("segment should be sealed by policy",
zap.Int64("segmentID", segment.GetSegmentID()),
zap.String("policy", string(policyName)),
zap.Any("stat", segment.GetStat()),
)
continue
}
}
segments = append(segments, segment)
}
@ -159,7 +169,7 @@ func (m *partitionSegmentManager) CollectDirtySegmentsAndClear() []*segmentAlloc
}
// CollectAllCanBeSealedAndClear collects all segments that can be sealed and clears the manager.
func (m *partitionSegmentManager) CollectAllCanBeSealedAndClear() []*segmentAllocManager {
func (m *partitionSegmentManager) CollectAllCanBeSealedAndClear(policy policy.PolicyName) []*segmentAllocManager {
m.mu.Lock()
defer m.mu.Unlock()
@ -167,7 +177,7 @@ func (m *partitionSegmentManager) CollectAllCanBeSealedAndClear() []*segmentAllo
for _, segment := range m.segments {
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING ||
segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_SEALED {
canBeSealed = append(canBeSealed, segment)
canBeSealed = append(canBeSealed, segment.WithSealPolicy(policy))
}
}
m.segments = make([]*segmentAllocManager, 0)
@ -175,20 +185,20 @@ func (m *partitionSegmentManager) CollectAllCanBeSealedAndClear() []*segmentAllo
}
// hitSealPolicy checks if the segment should be sealed by policy.
func (m *partitionSegmentManager) hitSealPolicy(segmentMeta *segmentAllocManager) bool {
func (m *partitionSegmentManager) hitSealPolicy(segmentMeta *segmentAllocManager) (policy.PolicyName, bool) {
stat := segmentMeta.GetStat()
for _, p := range policy.GetSegmentAsyncSealPolicy() {
if result := p.ShouldBeSealed(stat); result.ShouldBeSealed {
m.logger.Info("segment should be sealed by policy",
zap.Int64("segmentID", segmentMeta.GetSegmentID()),
zap.String("policy", result.PolicyName),
zap.String("policy", string(result.PolicyName)),
zap.Any("stat", stat),
zap.Any("extraInfo", result.ExtraInfo),
)
return true
return result.PolicyName, true
}
}
return false
return "", false
}
// allocNewGrowingSegment allocates a new growing segment.
@ -258,8 +268,9 @@ func (m *partitionSegmentManager) createNewPendingSegment(ctx context.Context) (
if err != nil {
return nil, errors.Wrap(err, "failed to allocate segment id")
}
meta := newSegmentAllocManager(m.pchannel, m.collectionID, m.paritionID, int64(segmentID), m.vchannel)
meta := newSegmentAllocManager(m.pchannel, m.collectionID, m.paritionID, int64(segmentID), m.vchannel, m.metrics)
tx := meta.BeginModification()
tx.IntoPending()
if err := tx.Commit(ctx); err != nil {
return nil, errors.Wrap(err, "failed to commit segment assignment modification")
}

View File

@ -7,6 +7,8 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
@ -19,6 +21,7 @@ func buildNewPartitionManagers(
pchannel types.PChannelInfo,
rawMetas []*streamingpb.SegmentAssignmentMeta,
collectionInfos []*rootcoordpb.CollectionInfoOnPChannel,
metrics *metricsutil.SegmentAssignMetrics,
) (*partitionSegmentManagers, []*segmentAllocManager) {
// create a map to check if the partition exists.
partitionExist := make(map[int64]struct{}, len(collectionInfos))
@ -35,11 +38,11 @@ func buildNewPartitionManagers(
waitForSealed := make([]*segmentAllocManager, 0)
metaMaps := make(map[int64][]*segmentAllocManager)
for _, rawMeta := range rawMetas {
m := newSegmentAllocManagerFromProto(pchannel, rawMeta)
m := newSegmentAllocManagerFromProto(pchannel, rawMeta, metrics)
if _, ok := partitionExist[rawMeta.GetPartitionId()]; !ok {
// the related collection or partition does not exist,
// so it should be sealed right now.
waitForSealed = append(waitForSealed, m)
waitForSealed = append(waitForSealed, m.WithSealPolicy(policy.PolicyNamePartitionNotFound))
continue
}
if _, ok := metaMaps[rawMeta.GetPartitionId()]; !ok {
@ -64,19 +67,23 @@ func buildNewPartitionManagers(
collectionID,
partition.GetPartitionId(),
segmentManagers,
metrics,
))
if ok {
panic("partition manager already exists when buildNewPartitionManagers in segment assignment service, there's a bug in system")
}
}
}
return &partitionSegmentManagers{
m := &partitionSegmentManagers{
mu: sync.Mutex{},
logger: log.With(zap.Any("pchannel", pchannel)),
pchannel: pchannel,
managers: managers,
collectionInfos: collectionInfoMap,
}, waitForSealed
metrics: metrics,
}
m.updateMetrics()
return m, waitForSealed
}
// partitionSegmentManagers is a collection of partition managers.
@ -87,6 +94,7 @@ type partitionSegmentManagers struct {
pchannel types.PChannelInfo
managers *typeutil.ConcurrentMap[int64, *partitionSegmentManager] // map partitionID to partition manager
collectionInfos map[int64]*rootcoordpb.CollectionInfoOnPChannel // map collectionID to collectionInfo
metrics *metricsutil.SegmentAssignMetrics
}
// NewCollection creates a new partition manager.
@ -109,6 +117,7 @@ func (m *partitionSegmentManagers) NewCollection(collectionID int64, vchannel st
collectionID,
partitionID,
make([]*segmentAllocManager, 0),
m.metrics,
)); loaded {
m.logger.Warn("partition already exists when NewCollection in segment assignment service, it's may be a bug in system",
zap.Int64("collectionID", collectionID),
@ -116,6 +125,11 @@ func (m *partitionSegmentManagers) NewCollection(collectionID int64, vchannel st
)
}
}
m.logger.Info("collection created in segment assignment service",
zap.Int64("collectionID", collectionID),
zap.String("vchannel", vchannel),
zap.Int64s("partitionIDs", partitionID))
m.updateMetrics()
}
// NewPartition creates a new partition manager.
@ -140,12 +154,18 @@ func (m *partitionSegmentManagers) NewPartition(collectionID int64, partitionID
collectionID,
partitionID,
make([]*segmentAllocManager, 0),
m.metrics,
)); loaded {
m.logger.Warn(
"partition already exists when NewPartition in segment assignment service, it's may be a bug in system",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID))
}
m.logger.Info("partition created in segment assignment service",
zap.Int64("collectionID", collectionID),
zap.String("vchannel", m.collectionInfos[collectionID].Vchannel),
zap.Int64("partitionID", partitionID))
m.updateMetrics()
}
// Get gets a partition manager from the partition managers.
@ -171,13 +191,27 @@ func (m *partitionSegmentManagers) RemoveCollection(collectionID int64) []*segme
delete(m.collectionInfos, collectionID)
needSealed := make([]*segmentAllocManager, 0)
partitionIDs := make([]int64, 0, len(collectionInfo.Partitions))
segmentIDs := make([]int64, 0, len(collectionInfo.Partitions))
for _, partition := range collectionInfo.Partitions {
pm, ok := m.managers.Get(partition.PartitionId)
if ok {
needSealed = append(needSealed, pm.CollectAllCanBeSealedAndClear()...)
segments := pm.CollectAllCanBeSealedAndClear(policy.PolicyNameCollectionRemoved)
partitionIDs = append(partitionIDs, partition.PartitionId)
for _, segment := range segments {
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}
needSealed = append(needSealed, segments...)
m.managers.Remove(partition.PartitionId)
}
m.managers.Remove(partition.PartitionId)
}
m.logger.Info(
"collection removed in segment assignment service",
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
zap.Int64s("segmentIDs", segmentIDs),
)
m.updateMetrics()
return needSealed
}
@ -206,7 +240,19 @@ func (m *partitionSegmentManagers) RemovePartition(collectionID int64, partition
zap.Int64("partitionID", partitionID))
return nil
}
return pm.CollectAllCanBeSealedAndClear()
segments := pm.CollectAllCanBeSealedAndClear(policy.PolicyNamePartitionRemoved)
segmentIDs := make([]int64, 0, len(segments))
for _, segment := range segments {
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}
m.logger.Info(
"partition removed in segment assignment service",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64s("segmentIDs", segmentIDs),
)
m.updateMetrics()
return segments
}
// SealAllSegmentsAndFenceUntil seals all segments and fence assign until timetick.
@ -221,6 +267,7 @@ func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int
}
sealedSegments := make([]*segmentAllocManager, 0)
segmentIDs := make([]int64, 0)
// collect all partitions
for _, partition := range collectionInfo.Partitions {
// Seal all segments and fence assign to the partition manager.
@ -232,8 +279,17 @@ func (m *partitionSegmentManagers) SealAllSegmentsAndFenceUntil(collectionID int
return nil, errors.New("partition not found")
}
newSealedSegments := pm.SealAllSegmentsAndFenceUntil(timetick)
for _, segment := range newSealedSegments {
segmentIDs = append(segmentIDs, segment.GetSegmentID())
}
sealedSegments = append(sealedSegments, newSealedSegments...)
}
m.logger.Info(
"all segments sealed and fence assign until timetick in segment assignment service",
zap.Int64("collectionID", collectionID),
zap.Uint64("timetick", timetick),
zap.Int64s("segmentIDs", segmentIDs),
)
return sealedSegments, nil
}
@ -245,6 +301,11 @@ func (m *partitionSegmentManagers) Range(f func(pm *partitionSegmentManager)) {
})
}
func (m *partitionSegmentManagers) updateMetrics() {
m.metrics.UpdatePartitionCount(m.managers.Len())
m.metrics.UpdateCollectionCount(len(m.collectionInfos))
}
// newCollectionInfo creates a new collection info.
func newCollectionInfo(collectionID int64, vchannel string, partitionIDs []int64) *rootcoordpb.CollectionInfoOnPChannel {
info := &rootcoordpb.CollectionInfoOnPChannel{

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -37,7 +38,8 @@ func RecoverPChannelSegmentAllocManager(
if err := merr.CheckRPCCall(resp, err); err != nil {
return nil, errors.Wrap(err, "failed to get pchannel info from rootcoord")
}
managers, waitForSealed := buildNewPartitionManagers(pchannel, rawMetas, resp.GetCollections())
metrics := metricsutil.NewSegmentAssignMetrics(pchannel.Name)
managers, waitForSealed := buildNewPartitionManagers(pchannel, rawMetas, resp.GetCollections(), metrics)
// PChannelSegmentAllocManager is the segment assign manager of determined pchannel.
logger := log.With(zap.Any("pchannel", pchannel))
@ -47,7 +49,8 @@ func RecoverPChannelSegmentAllocManager(
logger: logger,
pchannel: pchannel,
managers: managers,
helper: newSealQueue(logger, wal, waitForSealed),
helper: newSealQueue(logger, wal, waitForSealed, metrics),
metrics: metrics,
}, nil
}
@ -59,7 +62,8 @@ type PChannelSegmentAllocManager struct {
pchannel types.PChannelInfo
managers *partitionSegmentManagers
// There should always
helper *sealQueue
helper *sealQueue
metrics *metricsutil.SegmentAssignMetrics
}
// Channel returns the pchannel info.
@ -200,7 +204,16 @@ func (m *PChannelSegmentAllocManager) MustSealSegments(ctx context.Context, info
for _, info := range infos {
if pm, err := m.managers.Get(info.CollectionID, info.PartitionID); err == nil {
m.helper.AsyncSeal(pm.CollectionMustSealed(info.SegmentID))
if segment := pm.CollectionMustSealed(info.SegmentID); segment != nil {
m.helper.AsyncSeal(segment)
} else {
m.logger.Info(
"segment not found when trigger must seal, may be already sealed",
zap.Int64("collectionID", info.CollectionID),
zap.Int64("partitionID", info.PartitionID),
zap.Int64("segmentID", info.SegmentID),
)
}
}
}
m.helper.SealAllWait(ctx)
@ -273,4 +286,6 @@ func (m *PChannelSegmentAllocManager) Close(ctx context.Context) {
resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(segment.GetSegmentID())
}
}
m.metrics.Close()
}

View File

@ -95,7 +95,7 @@ func TestSegmentAllocManager(t *testing.T) {
assert.True(t, m.IsNoWaitSeal()) // result2 is acked, so new seal segment will be sealed right away.
// interactive with txn
txnManager := txn.NewTxnManager()
txnManager := txn.NewTxnManager(types.PChannelInfo{Name: "test"})
txn, err := txnManager.BeginNewTxn(context.Background(), tsoutil.GetCurrentTime(), time.Second)
assert.NoError(t, err)
txn.BeginDone()

View File

@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -15,13 +16,19 @@ import (
)
// newSealQueue creates a new seal helper queue.
func newSealQueue(logger *log.MLogger, wal *syncutil.Future[wal.WAL], waitForSealed []*segmentAllocManager) *sealQueue {
func newSealQueue(
logger *log.MLogger,
wal *syncutil.Future[wal.WAL],
waitForSealed []*segmentAllocManager,
metrics *metricsutil.SegmentAssignMetrics,
) *sealQueue {
return &sealQueue{
cond: syncutil.NewContextCond(&sync.Mutex{}),
logger: logger,
wal: wal,
waitForSealed: waitForSealed,
waitCounter: len(waitForSealed),
metrics: metrics,
}
}
@ -33,10 +40,21 @@ type sealQueue struct {
waitForSealed []*segmentAllocManager
waitCounter int // counts the segments that are really waiting to be sealed; it is not equal to the length of waitForSealed.
// some segments may be in sealing process.
metrics *metricsutil.SegmentAssignMetrics
}
// AsyncSeal adds a segment into the queue, and will be sealed at next time.
func (q *sealQueue) AsyncSeal(manager ...*segmentAllocManager) {
if q.logger.Level().Enabled(zap.DebugLevel) {
for _, m := range manager {
q.logger.Debug("segment is added into seal queue",
zap.Int("collectionID", int(m.GetCollectionID())),
zap.Int("partitionID", int(m.GetPartitionID())),
zap.Int("segmentID", int(m.GetSegmentID())),
zap.String("policy", string(m.SealPolicy())))
}
}
q.cond.LockAndBroadcast()
defer q.cond.L.Unlock()
@ -106,7 +124,17 @@ func (q *sealQueue) tryToSealSegments(ctx context.Context, segments ...*segmentA
if err := tx.Commit(ctx); err != nil {
q.logger.Warn("flushed segment failed at commit, maybe sent repeated flush message into wal", zap.Int64("segmentID", segment.GetSegmentID()), zap.Error(err))
undone = append(undone, segment)
continue
}
q.metrics.ObserveSegmentFlushed(
string(segment.SealPolicy()),
int64(segment.GetStat().Insert.BinarySize))
q.logger.Info("segment has been flushed",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
zap.String("vchannel", segment.GetVChannel()),
zap.Int64("segmentID", segment.GetSegmentID()),
zap.String("sealPolicy", string(segment.SealPolicy())))
}
}
}
@ -124,11 +152,18 @@ func (q *sealQueue) transferSegmentStateIntoSealed(ctx context.Context, segments
undone := make([]*segmentAllocManager, 0)
sealedSegments := make(map[int64]map[string][]*segmentAllocManager)
for _, segment := range segments {
logger := q.logger.With(
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
zap.String("vchannel", segment.GetVChannel()),
zap.Int64("segmentID", segment.GetSegmentID()),
zap.String("sealPolicy", string(segment.SealPolicy())))
if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
tx := segment.BeginModification()
tx.IntoSealed()
if err := tx.Commit(ctx); err != nil {
q.logger.Warn("seal segment failed at commit", zap.Int64("segmentID", segment.GetSegmentID()), zap.Error(err))
logger.Warn("seal segment failed at commit", zap.Error(err))
undone = append(undone, segment)
continue
}
@ -142,14 +177,14 @@ func (q *sealQueue) transferSegmentStateIntoSealed(ctx context.Context, segments
ackSem := segment.AckSem()
if ackSem > 0 {
undone = append(undone, segment)
q.logger.Info("segment has been sealed, but there are flying acks, delay it", zap.Int64("segmentID", segment.GetSegmentID()), zap.Int32("ackSem", ackSem))
logger.Info("segment has been sealed, but there are flying acks, delay it", zap.Int32("ackSem", ackSem))
continue
}
txnSem := segment.TxnSem()
if txnSem > 0 {
undone = append(undone, segment)
q.logger.Info("segment has been sealed, but there are flying txns, delay it", zap.Int64("segmentID", segment.GetSegmentID()), zap.Int32("txnSem", txnSem))
logger.Info("segment has been sealed, but there are flying txns, delay it", zap.Int32("txnSem", txnSem))
continue
}
@ -161,6 +196,7 @@ func (q *sealQueue) transferSegmentStateIntoSealed(ctx context.Context, segments
sealedSegments[segment.GetCollectionID()][segment.GetVChannel()] = make([]*segmentAllocManager, 0)
}
sealedSegments[segment.GetCollectionID()][segment.GetVChannel()] = append(sealedSegments[segment.GetCollectionID()][segment.GetVChannel()], segment)
logger.Info("segment has been mark as sealed, can be flushed")
}
return undone, sealedSegments
}
@ -190,23 +226,3 @@ func (m *sealQueue) sendFlushSegmentsMessageIntoWAL(ctx context.Context, collect
m.logger.Info("send flush message into wal", zap.Int64("collectionID", collectionID), zap.String("vchannel", vchannel), zap.Int64s("segmentIDs", segmentIDs), zap.Any("msgID", msgID))
return nil
}
// createNewFlushMessage creates a new flush message.
func (m *sealQueue) createNewFlushMessage(
collectionID int64,
vchannel string,
segmentIDs []int64,
) (message.MutableMessage, error) {
// Create a flush message.
msg, err := message.NewFlushMessageBuilderV2().
WithVChannel(vchannel).
WithHeader(&message.FlushMessageHeader{}).
WithBody(&message.FlushMessageBody{
CollectionId: collectionID,
SegmentId: segmentIDs,
}).BuildMutable()
if err != nil {
return nil, errors.Wrap(err, "at create new flush message")
}
return msg, nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/policy"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -22,6 +23,7 @@ const dirtyThreshold = 30 * 1024 * 1024 // 30MB
func newSegmentAllocManagerFromProto(
pchannel types.PChannelInfo,
inner *streamingpb.SegmentAssignmentMeta,
metrics *metricsutil.SegmentAssignMetrics,
) *segmentAllocManager {
stat := stats.NewSegmentStatFromProto(inner.Stat)
// Growing segment's stat should be registered to stats manager.
@ -36,6 +38,7 @@ func newSegmentAllocManagerFromProto(
}, inner.GetSegmentId(), stat)
stat = nil
}
metrics.UpdateGrowingSegmentState(streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN, inner.GetState())
return &segmentAllocManager{
pchannel: pchannel,
inner: inner,
@ -43,6 +46,7 @@ func newSegmentAllocManagerFromProto(
ackSem: atomic.NewInt32(0),
txnSem: atomic.NewInt32(0),
dirtyBytes: 0,
metrics: metrics,
}
}
@ -53,6 +57,7 @@ func newSegmentAllocManager(
partitionID int64,
segmentID int64,
vchannel string,
metrics *metricsutil.SegmentAssignMetrics,
) *segmentAllocManager {
return &segmentAllocManager{
pchannel: pchannel,
@ -61,13 +66,14 @@ func newSegmentAllocManager(
PartitionId: partitionID,
SegmentId: segmentID,
Vchannel: vchannel,
State: streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING,
State: streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN,
Stat: nil,
},
immutableStat: nil, // immutable stat can be seen after sealed.
ackSem: atomic.NewInt32(0),
dirtyBytes: 0,
txnSem: atomic.NewInt32(0),
metrics: metrics,
}
}
@ -92,6 +98,19 @@ type segmentAllocManager struct {
ackSem *atomic.Int32 // the ackSem is increased when segment allocRows, decreased when the segment is acked.
dirtyBytes uint64 // records the dirty bytes that didn't persist.
txnSem *atomic.Int32 // the running txn count of the segment.
metrics *metricsutil.SegmentAssignMetrics
sealPolicy policy.PolicyName
}
// WithSealPolicy sets the seal policy of the segment assignment meta.
func (s *segmentAllocManager) WithSealPolicy(policy policy.PolicyName) *segmentAllocManager {
s.sealPolicy = policy
return s
}
// SealPolicy returns the seal policy of the segment assignment meta.
func (s *segmentAllocManager) SealPolicy() policy.PolicyName {
return s.sealPolicy
}
// GetCollectionID returns the collection id of the segment assignment meta.
@ -210,6 +229,13 @@ type mutableSegmentAssignmentMeta struct {
modifiedCopy *streamingpb.SegmentAssignmentMeta
}
func (m *mutableSegmentAssignmentMeta) IntoPending() {
if m.modifiedCopy.State != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN {
panic("tranfer state to pending from non-unknown state")
}
m.modifiedCopy.State = streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING
}
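A minimal sketch (assumed context: the same package, error handling elided) of the state progression the new guard enforces, UNKNOWN to PENDING to GROWING, with every transition reported through UpdateGrowingSegmentState on Commit:
// Sketch only: drive a fresh segment through PENDING into GROWING.
func pendingThenGrowing(ctx context.Context, s *segmentAllocManager, limitation *policy.SegmentLimitation) error {
	tx := s.BeginModification()
	tx.IntoPending() // UNKNOWN -> PENDING; panics if the segment is in any other state.
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	tx = s.BeginModification()
	tx.IntoGrowing(limitation) // PENDING -> GROWING.
	return tx.Commit(ctx)
}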
// IntoGrowing transfers the segment assignment meta into growing state.
func (m *mutableSegmentAssignmentMeta) IntoGrowing(limitation *policy.SegmentLimitation) {
if m.modifiedCopy.State != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING {
@ -263,6 +289,7 @@ func (m *mutableSegmentAssignmentMeta) Commit(ctx context.Context) error {
// if the state transferred from growing into others, remove the stats from stats manager.
m.original.immutableStat = resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(m.original.GetSegmentID())
}
m.original.metrics.UpdateGrowingSegmentState(m.original.GetState(), m.modifiedCopy.GetState())
m.original.inner = m.modifiedCopy
return nil
}

View File

@ -1,14 +0,0 @@
package policy
import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
func GetGlobalAsyncSealPolicy() []GlobalAsyncSealPolicy {
// TODO: dynamic policy can be applied here in future.
return []GlobalAsyncSealPolicy{}
}
// GlobalAsyncSealPolicy is the policy to check if a global segment should be sealed or not.
type GlobalAsyncSealPolicy interface {
// ShouldSealed checks if the segment should be sealed, and return the reason string.
ShouldSealed(m stats.StatsManager)
}

View File

@ -7,6 +7,17 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type PolicyName string
var (
PolicyNamePartitionNotFound PolicyName = "partition_not_found"
PolicyNamePartitionRemoved PolicyName = "partition_removed"
PolicyNameCollectionRemoved PolicyName = "collection_removed"
PolicyNameRecover PolicyName = "recover"
PolicyNameFenced PolicyName = "fenced"
PolicyNameForce PolicyName = "force"
)
// GetSegmentAsyncSealPolicy returns the segment async seal policy.
func GetSegmentAsyncSealPolicy() []SegmentAsyncSealPolicy {
// TODO: dynamic policy can be applied here in future.
@ -20,7 +31,7 @@ func GetSegmentAsyncSealPolicy() []SegmentAsyncSealPolicy {
// SealPolicyResult is the result of the seal policy.
type SealPolicyResult struct {
PolicyName string
PolicyName PolicyName
ShouldBeSealed bool
ExtraInfo interface{}
}
@ -40,7 +51,7 @@ type sealByCapacity struct{}
// ShouldBeSealed checks if the segment should be sealed, and return the reason string.
func (p *sealByCapacity) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult {
return SealPolicyResult{
PolicyName: "seal_by_capacity",
PolicyName: "by_capacity",
ShouldBeSealed: stats.ReachLimit,
ExtraInfo: nil,
}
@ -59,7 +70,7 @@ func (p *sealByBinlogFileNumber) ShouldBeSealed(stats *stats.SegmentStats) SealP
limit := paramtable.Get().DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()
shouldBeSealed := stats.BinLogCounter >= uint64(limit)
return SealPolicyResult{
PolicyName: "seal_by_binlog_file_number",
PolicyName: "binlog_file_number",
ShouldBeSealed: shouldBeSealed,
ExtraInfo: &sealByBinlogFileNumberExtraInfo{
BinLogFileNumberLimit: limit,
@ -80,7 +91,7 @@ func (p *sealByLifetime) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyRes
lifetime := paramtable.Get().DataCoordCfg.SegmentMaxLifetime.GetAsDuration(time.Second)
shouldBeSealed := time.Since(stats.CreateTime) > lifetime
return SealPolicyResult{
PolicyName: "seal_by_lifetime",
PolicyName: "by_lifetime",
ShouldBeSealed: shouldBeSealed,
ExtraInfo: sealByLifetimeExtraInfo{
MaxLifeTime: lifetime,
@ -104,7 +115,7 @@ func (p *sealByIdleTime) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyRes
shouldBeSealed := stats.Insert.BinarySize > minSize && time.Since(stats.LastModifiedTime) > idleTime
return SealPolicyResult{
PolicyName: "seal_by_idle_time",
PolicyName: "by_idle_time",
ShouldBeSealed: shouldBeSealed,
ExtraInfo: sealByIdleTimeExtraInfo{
IdleTime: idleTime,

View File

@ -88,3 +88,12 @@ func (ad *AckDetails) EarliestLastConfirmedMessageID() message.MessageID {
func (ad *AckDetails) Clear() {
ad.detail = nil
}
// Range iterates over all AckDetails.
func (ad *AckDetails) Range(fn func(detail *AckDetail) bool) {
for _, detail := range ad.detail {
if !fn(detail) {
break
}
}
}
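A small usage sketch for Range (assumed context: a *metricsutil.TimeTickMetrics like the sync operator uses below); returning false from the callback stops iteration early:
// Sketch only: count every acknowledged detail, split by sync/non-sync.
func countAcknowledged(details *AckDetails, m *metricsutil.TimeTickMetrics) {
	details.Range(func(detail *AckDetail) bool {
		m.CountSyncTimeTick(detail.IsSync)
		return true // continue over all details
	})
}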

View File

@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -44,7 +45,7 @@ func TestAck(t *testing.T) {
)
resource.InitForTest(t, resource.OptRootCoordClient(rc))
ackManager := NewAckManager(0, nil)
ackManager := NewAckManager(0, nil, metricsutil.NewTimeTickMetrics("test"))
ackers := map[uint64]*Acker{}
for i := 0; i < 10; i++ {
@ -161,7 +162,7 @@ func TestAckManager(t *testing.T) {
)
resource.InitForTest(t, resource.OptRootCoordClient(rc))
ackManager := NewAckManager(0, walimplstest.NewTestMessageID(0))
ackManager := NewAckManager(0, walimplstest.NewTestMessageID(0), metricsutil.NewTimeTickMetrics("test"))
// Test Concurrent Collect.
wg := sync.WaitGroup{}

View File

@ -3,8 +3,10 @@ package ack
import (
"context"
"sync"
"time"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -22,12 +24,14 @@ type AckManager struct {
// It is used to detect the concurrent operation to find the last confirmed message id.
acknowledgedDetails sortedDetails // All ack details whose time tick is less than lastConfirmedTimeTick are kept here temporarily until a sync operation happens.
lastConfirmedManager *lastConfirmedManager // The last confirmed message id manager.
metrics *metricsutil.TimeTickMetrics
}
// NewAckManager creates a new timestampAckHelper.
func NewAckManager(
lastConfirmedTimeTick uint64,
lastConfirmedMessageID message.MessageID,
metrics *metricsutil.TimeTickMetrics,
) *AckManager {
return &AckManager{
cond: syncutil.NewContextCond(&sync.Mutex{}),
@ -36,6 +40,7 @@ func NewAckManager(
ackHeap: typeutil.NewHeap[*Acker](&ackersOrderByEndTimestamp{}),
lastConfirmedTimeTick: lastConfirmedTimeTick,
lastConfirmedManager: newLastConfirmedManager(lastConfirmedMessageID),
metrics: metrics,
}
}
@ -65,6 +70,7 @@ func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
return nil, err
}
ta.lastAllocatedTimeTick = ts
ta.metrics.CountAllocateTimeTick(ts)
// create new timestampAck for ack process.
// add ts to heap wait for ack.
@ -81,7 +87,7 @@ func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
// Concurrent safe to call with Allocate.
func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail, error) {
// local timestamp may out of date, sync the underlying allocator before get last all acknowledged.
resource.Resource().TSOAllocator().Sync()
resource.Resource().TSOAllocator().SyncIfExpired(50 * time.Millisecond)
// Allocate may not have been called for a long time, so the recorder may be out of date.
// Do an Allocate and Ack to sync the recorder up with the latest time of the internal timetick.TimestampAllocator.
@ -107,6 +113,7 @@ func (ta *AckManager) ack(acker *Acker) {
acker.acknowledged = true
acker.detail.EndTimestamp = ta.lastAllocatedTimeTick
ta.ackHeap.Push(acker)
ta.metrics.CountAcknowledgeTimeTick(acker.ackDetail().IsSync)
ta.popUntilLastAllAcknowledged()
}
@ -127,6 +134,7 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
// update last confirmed time tick.
ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp
ta.metrics.UpdateLastConfirmedTimeTick(ta.lastConfirmedTimeTick)
// pop all EndTimestamp is less than lastConfirmedTimeTick.
// All the messages which EndTimetick less than lastConfirmedTimeTick have been committed into wal.

View File

@ -26,6 +26,6 @@ func (b *interceptorBuilder) Build(param interceptors.InterceptorBuildParam) int
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
return &timeTickAppendInterceptor{
operator: operator,
txnManager: txn.NewTxnManager(),
txnManager: txn.NewTxnManager(param.WALImpls.Channel()),
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -36,6 +37,7 @@ func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTick
ackDetails: ack.NewAckDetails(),
sourceID: paramtable.GetNodeID(),
timeTickNotifier: inspector.NewTimeTickNotifier(),
metrics: metricsutil.NewTimeTickMetrics(param.WALImpls.Channel().Name),
}
}
@ -52,6 +54,7 @@ type timeTickSyncOperator struct {
ackDetails *ack.AckDetails // all acknowledged details, all acked messages but not sent to wal will be kept here.
sourceID int64 // the current node id.
timeTickNotifier *inspector.TimeTickNotifier // used to notify the time tick change.
metrics *metricsutil.TimeTickMetrics
}
// Channel returns the pchannel info.
@ -143,7 +146,11 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
continue
}
// initialize ack manager.
impl.ackManager = ack.NewAckManager(ts, msgID)
impl.ackManager = ack.NewAckManager(ts, msgID, impl.metrics)
impl.logger.Info(
"send first time tick success",
zap.Uint64("timestamp", ts),
zap.String("messageID", msgID.String()))
break
}
// interceptor is ready now.
@ -174,6 +181,7 @@ func (impl *timeTickSyncOperator) AckManager() *ack.AckManager {
// Close close the time tick sync operator.
func (impl *timeTickSyncOperator) Close() {
impl.cancel()
impl.metrics.Close()
}
// sendTsMsg sends first timestamp message to wal.
@ -222,6 +230,12 @@ func (impl *timeTickSyncOperator) sendPersistentTsMsg(ctx context.Context,
)
}
// metrics updates
impl.metrics.CountPersistentTimeTickSync(ts)
impl.ackDetails.Range(func(detail *ack.AckDetail) bool {
impl.metrics.CountSyncTimeTick(detail.IsSync)
return true
})
// Ack details has been committed to wal, clear it.
impl.ackDetails.Clear()
// Update last time tick message id and time tick.
@ -255,6 +269,12 @@ func (impl *timeTickSyncOperator) sendNoPersistentTsMsg(ctx context.Context, ts
)
}
// metrics updates.
impl.metrics.CountMemoryTimeTickSync(ts)
impl.ackDetails.Range(func(detail *ack.AckDetail) bool {
impl.metrics.CountSyncTimeTick(detail.IsSync)
return true
})
// Ack details has been committed to wal, clear it.
impl.ackDetails.Clear()
// Only update time tick.

View File

@ -108,6 +108,14 @@ func (s *TxnSession) IsExpiredOrDone(ts uint64) bool {
return s.isExpiredOrDone(ts)
}
// State returns the state of the session.
func (s *TxnSession) State() message.TxnState {
s.mu.Lock()
defer s.mu.Unlock()
return s.state
}
// isExpiredOrDone checks if the session is expired or done.
func (s *TxnSession) isExpiredOrDone(ts uint64) bool {
// A timeout txn or rollbacked/committed txn should be cleared.

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -26,7 +27,7 @@ func TestSession(t *testing.T) {
resource.InitForTest(t)
ctx := context.Background()
m := NewTxnManager()
m := NewTxnManager(types.PChannelInfo{Name: "test"})
session, err := m.BeginNewTxn(ctx, 0, 10*time.Millisecond)
assert.NotNil(t, session)
assert.NoError(t, err)
@ -111,7 +112,7 @@ func TestSession(t *testing.T) {
func TestManager(t *testing.T) {
resource.InitForTest(t)
m := NewTxnManager()
m := NewTxnManager(types.PChannelInfo{Name: "test"})
wg := &sync.WaitGroup{}

View File

@ -8,19 +8,22 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// NewTxnManager creates a new transaction manager.
func NewTxnManager() *TxnManager {
func NewTxnManager(pchannel types.PChannelInfo) *TxnManager {
return &TxnManager{
mu: sync.Mutex{},
sessions: make(map[message.TxnID]*TxnSession),
closed: nil,
metrics: metricsutil.NewTxnMetrics(pchannel.Name),
}
}
@ -31,6 +34,7 @@ type TxnManager struct {
mu sync.Mutex
sessions map[message.TxnID]*TxnSession
closed lifetime.SafeChan
metrics *metricsutil.TxnMetrics
}
// BeginNewTxn starts a new transaction with a session.
@ -70,6 +74,7 @@ func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive
}
m.sessions[session.TxnContext().TxnID] = session
m.metrics.BeginTxn()
return session, nil
}
@ -81,6 +86,7 @@ func (m *TxnManager) CleanupTxnUntil(ts uint64) {
for id, session := range m.sessions {
if session.IsExpiredOrDone(ts) {
session.Cleanup()
m.metrics.Finish(session.State())
delete(m.sessions, id)
}
}
@ -105,6 +111,8 @@ func (m *TxnManager) GetSessionOfTxn(id message.TxnID) (*TxnSession, error) {
// GracefulClose waits for all transactions to be cleaned up.
func (m *TxnManager) GracefulClose(ctx context.Context) error {
defer m.metrics.Close()
m.mu.Lock()
if m.closed == nil {
m.closed = lifetime.NewSafeChan()

View File

@ -0,0 +1,65 @@
package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
return &SegmentAssignMetrics{
constLabel: constLabel,
allocTotal: metrics.WALSegmentAllocTotal.MustCurryWith(constLabel),
segmentBytes: metrics.WALSegmentBytes.With(constLabel),
flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel),
partitionTotal: metrics.WALPartitionTotal.With(constLabel),
collectionTotal: metrics.WALCollectionTotal.With(constLabel),
}
}
// SegmentAssignMetrics is the metrics of the segment assignment.
type SegmentAssignMetrics struct {
constLabel prometheus.Labels
allocTotal *prometheus.GaugeVec
segmentBytes prometheus.Observer
flushedTotal *prometheus.CounterVec
partitionTotal prometheus.Gauge
collectionTotal prometheus.Gauge
}
// UpdateGrowingSegmentState updates the metrics of the segment assignment state.
func (m *SegmentAssignMetrics) UpdateGrowingSegmentState(from streamingpb.SegmentAssignmentState, to streamingpb.SegmentAssignmentState) {
if from != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN {
m.allocTotal.WithLabelValues(from.String()).Dec()
}
if to != streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_FLUSHED {
m.allocTotal.WithLabelValues(to.String()).Inc()
}
}
func (m *SegmentAssignMetrics) ObserveSegmentFlushed(policy string, bytes int64) {
m.segmentBytes.Observe(float64(bytes))
m.flushedTotal.WithLabelValues(policy).Inc()
}
func (m *SegmentAssignMetrics) UpdatePartitionCount(cnt int) {
m.partitionTotal.Set(float64(cnt))
}
func (m *SegmentAssignMetrics) UpdateCollectionCount(cnt int) {
m.collectionTotal.Set(float64(cnt))
}
func (m *SegmentAssignMetrics) Close() {
metrics.WALSegmentAllocTotal.DeletePartialMatch(m.constLabel)
metrics.WALSegmentFlushedTotal.DeletePartialMatch(m.constLabel)
metrics.WALPartitionTotal.Delete(m.constLabel)
metrics.WALCollectionTotal.Delete(m.constLabel)
}
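For orientation, a hedged lifecycle sketch of this type; the pchannel name, policy label, and values below are examples, not taken from this diff:
// Sketch only: one SegmentAssignMetrics per pchannel, shared by its managers.
func exampleSegmentAssignMetricsLifecycle() {
	m := NewSegmentAssignMetrics("example-pchannel") // example channel name
	m.UpdateGrowingSegmentState(
		streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_UNKNOWN,
		streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_PENDING,
	)
	m.UpdateCollectionCount(1)
	m.UpdatePartitionCount(2)
	m.ObserveSegmentFlushed("by_capacity", 64<<20) // example policy label and byte size
	m.Close() // drops every labeled series owned by this pchannel
}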

View File

@ -0,0 +1,121 @@
package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
// TimeTickMetrics is the metrics for time tick.
type TimeTickMetrics struct {
mu syncutil.ClosableLock
constLabel prometheus.Labels
allocatedTimeTickCounter prometheus.Counter
acknowledgedTimeTickCounterForSync prometheus.Counter
syncTimeTickCounterForSync prometheus.Counter
acknowledgedTimeTickCounter prometheus.Counter
syncTimeTickCounter prometheus.Counter
lastAllocatedTimeTick prometheus.Gauge
lastConfirmedTimeTick prometheus.Gauge
persistentTimeTickSyncCounter prometheus.Counter
persistentTimeTickSync prometheus.Gauge
nonPersistentTimeTickSyncCounter prometheus.Counter
nonPersistentTimeTickSync prometheus.Gauge
}
// NewTimeTickMetrics creates a new time tick metrics.
func NewTimeTickMetrics(pchannel string) *TimeTickMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
return &TimeTickMetrics{
mu: syncutil.ClosableLock{},
constLabel: constLabel,
allocatedTimeTickCounter: metrics.WALAllocateTimeTickTotal.With(constLabel),
acknowledgedTimeTickCounterForSync: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"),
syncTimeTickCounterForSync: metrics.WALSyncTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("sync"),
acknowledgedTimeTickCounter: metrics.WALAcknowledgeTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("common"),
syncTimeTickCounter: metrics.WALSyncTimeTickTotal.MustCurryWith(constLabel).WithLabelValues("common"),
lastAllocatedTimeTick: metrics.WALLastAllocatedTimeTick.With(constLabel),
lastConfirmedTimeTick: metrics.WALLastConfirmedTimeTick.With(constLabel),
persistentTimeTickSyncCounter: metrics.WALTimeTickSyncTotal.MustCurryWith(constLabel).WithLabelValues("persistent"),
persistentTimeTickSync: metrics.WALTimeTickSyncTimeTick.MustCurryWith(constLabel).WithLabelValues("persistent"),
nonPersistentTimeTickSyncCounter: metrics.WALTimeTickSyncTotal.MustCurryWith(constLabel).WithLabelValues("memory"),
nonPersistentTimeTickSync: metrics.WALTimeTickSyncTimeTick.MustCurryWith(constLabel).WithLabelValues("memory"),
}
}
func (m *TimeTickMetrics) CountAllocateTimeTick(ts uint64) {
if !m.mu.LockIfNotClosed() {
return
}
m.allocatedTimeTickCounter.Inc()
m.lastAllocatedTimeTick.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountAcknowledgeTimeTick(isSync bool) {
if !m.mu.LockIfNotClosed() {
return
}
if isSync {
m.acknowledgedTimeTickCounterForSync.Inc()
} else {
m.acknowledgedTimeTickCounter.Inc()
}
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountSyncTimeTick(isSync bool) {
if !m.mu.LockIfNotClosed() {
return
}
if isSync {
m.syncTimeTickCounterForSync.Inc()
} else {
m.syncTimeTickCounter.Inc()
}
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountMemoryTimeTickSync(ts uint64) {
if !m.mu.LockIfNotClosed() {
return
}
m.nonPersistentTimeTickSyncCounter.Inc()
m.nonPersistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountPersistentTimeTickSync(ts uint64) {
if !m.mu.LockIfNotClosed() {
return
}
m.persistentTimeTickSyncCounter.Inc()
m.persistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
func (m *TimeTickMetrics) UpdateLastConfirmedTimeTick(ts uint64) {
if !m.mu.LockIfNotClosed() {
return
}
m.lastConfirmedTimeTick.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
func (m *TimeTickMetrics) Close() {
// mark as closed and delete all labeled metrics
m.mu.Close()
metrics.WALAllocateTimeTickTotal.Delete(m.constLabel)
metrics.WALLastAllocatedTimeTick.Delete(m.constLabel)
metrics.WALLastConfirmedTimeTick.Delete(m.constLabel)
metrics.WALAcknowledgeTimeTickTotal.DeletePartialMatch(m.constLabel)
metrics.WALSyncTimeTickTotal.DeletePartialMatch(m.constLabel)
metrics.WALTimeTickSyncTimeTick.DeletePartialMatch(m.constLabel)
metrics.WALTimeTickSyncTotal.DeletePartialMatch(m.constLabel)
}
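The LockIfNotClosed/Close pairing used throughout this file is a small guard: once the metrics are closed and their label series deleted, later updates become no-ops instead of recreating the series. A hedged, stand-alone sketch of the pattern (the wrapper type below is invented for illustration):
// Sketch only: a gauge wrapper guarded by syncutil.ClosableLock.
type closableGauge struct {
	mu    syncutil.ClosableLock
	gauge prometheus.Gauge
}

func (g *closableGauge) Set(v float64) {
	if !g.mu.LockIfNotClosed() {
		return // closed: silently drop the update
	}
	g.gauge.Set(v)
	g.mu.Unlock()
}

func (g *closableGauge) Close() {
	g.mu.Close() // every later Set becomes a no-op
}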

View File

@ -0,0 +1,53 @@
package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
func NewTxnMetrics(pchannel string) *TxnMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel,
}
return &TxnMetrics{
mu: syncutil.ClosableLock{},
constLabel: constLabel,
inflightTxnGauge: metrics.WALInflightTxn.With(constLabel),
txnCounter: metrics.WALFinishTxn.MustCurryWith(constLabel),
}
}
type TxnMetrics struct {
mu syncutil.ClosableLock
constLabel prometheus.Labels
inflightTxnGauge prometheus.Gauge
txnCounter *prometheus.CounterVec
}
func (m *TxnMetrics) BeginTxn() {
if !m.mu.LockIfNotClosed() {
return
}
m.inflightTxnGauge.Inc()
m.mu.Unlock()
}
func (m *TxnMetrics) Finish(state message.TxnState) {
if !m.mu.LockIfNotClosed() {
return
}
m.inflightTxnGauge.Dec()
m.txnCounter.WithLabelValues(state.String()).Inc()
m.mu.Unlock()
}
func (m *TxnMetrics) Close() {
m.mu.Close()
metrics.WALInflightTxn.Delete(m.constLabel)
metrics.WALFinishTxn.DeletePartialMatch(m.constLabel)
}
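A hedged pairing sketch of how the transaction manager above is expected to drive these counters; the helper below is illustrative only:
// Sketch only: each BeginTxn must eventually be matched by Finish with the
// transaction's terminal state, keeping the in-flight gauge balanced.
func trackTxnLifecycle(m *TxnMetrics, run func() message.TxnState) {
	m.BeginTxn()    // in-flight +1
	state := run()  // commit, rollback, or expiration happens here
	m.Finish(state) // in-flight -1, finished counter +1 labeled by state
}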

View File

@ -0,0 +1,133 @@
package metricsutil
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func NewScanMetrics(pchannel types.PChannelInfo) *ScanMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name,
}
return &ScanMetrics{
constLabel: constLabel,
messageBytes: metrics.WALScanMessageBytes.With(constLabel),
passMessageBytes: metrics.WALScanPassMessageBytes.With(constLabel),
messageTotal: metrics.WALScanMessageTotal.MustCurryWith(constLabel),
passMessageTotal: metrics.WALScanPassMessageTotal.MustCurryWith(constLabel),
timeTickViolationTotal: metrics.WALScanTimeTickViolationMessageTotal.MustCurryWith(constLabel),
txnTotal: metrics.WALScanTxnTotal.MustCurryWith(constLabel),
pendingQueueSize: metrics.WALScannerPendingQueueBytes.With(constLabel),
timeTickBufSize: metrics.WALScannerTimeTickBufBytes.With(constLabel),
txnBufSize: metrics.WALScannerTxnBufBytes.With(constLabel),
}
}
type ScanMetrics struct {
constLabel prometheus.Labels
messageBytes prometheus.Observer
passMessageBytes prometheus.Observer
messageTotal *prometheus.CounterVec
passMessageTotal *prometheus.CounterVec
timeTickViolationTotal *prometheus.CounterVec
txnTotal *prometheus.CounterVec
timeTickBufSize prometheus.Gauge
txnBufSize prometheus.Gauge
pendingQueueSize prometheus.Gauge
}
// ObserveMessage observes the message.
func (m *ScanMetrics) ObserveMessage(msgType message.MessageType, bytes int) {
m.messageBytes.Observe(float64(bytes))
m.messageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveFilteredMessage observes the filtered message.
func (m *ScanMetrics) ObserveFilteredMessage(msgType message.MessageType, bytes int) {
m.passMessageBytes.Observe(float64(bytes))
m.passMessageTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveTimeTickViolation observes the time tick violation.
func (m *ScanMetrics) ObserveTimeTickViolation(msgType message.MessageType) {
m.timeTickViolationTotal.WithLabelValues(msgType.String()).Inc()
}
// ObserveAutoCommitTxn observes the auto commit txn.
func (m *ScanMetrics) ObserveAutoCommitTxn() {
m.txnTotal.WithLabelValues("autocommit").Inc()
}
// ObserveTxn observes the txn.
func (m *ScanMetrics) ObserveTxn(state message.TxnState) {
m.txnTotal.WithLabelValues(state.String()).Inc()
}
// ObserveErrorTxn observes the error txn.
func (m *ScanMetrics) ObserveErrorTxn() {
m.txnTotal.WithLabelValues("error").Inc()
}
// ObserveExpiredTxn observes the expired txn.
func (m *ScanMetrics) ObserveExpiredTxn() {
m.txnTotal.WithLabelValues("expired").Inc()
}
// NewScannerMetrics creates a new scanner metrics.
func (m *ScanMetrics) NewScannerMetrics() *ScannerMetrics {
return &ScannerMetrics{
ScanMetrics: m,
previousTxnBufSize: 0,
previousTimeTickBufSize: 0,
previousPendingQueueSize: 0,
}
}
// Close closes the metrics.
func (m *ScanMetrics) Close() {
metrics.WALScanMessageBytes.Delete(m.constLabel)
metrics.WALScanPassMessageBytes.DeletePartialMatch(m.constLabel)
metrics.WALScanMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanPassMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanTimeTickViolationMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALScanTxnTotal.DeletePartialMatch(m.constLabel)
metrics.WALScannerTimeTickBufBytes.Delete(m.constLabel)
metrics.WALScannerTxnBufBytes.Delete(m.constLabel)
metrics.WALScannerPendingQueueBytes.Delete(m.constLabel)
}
type ScannerMetrics struct {
*ScanMetrics
previousTxnBufSize int
previousTimeTickBufSize int
previousPendingQueueSize int
}
func (m *ScannerMetrics) UpdatePendingQueueSize(size int) {
diff := size - m.previousPendingQueueSize
m.pendingQueueSize.Add(float64(diff))
m.previousPendingQueueSize = size
}
func (m *ScannerMetrics) UpdateTxnBufSize(size int) {
diff := size - m.previousTxnBufSize
m.txnBufSize.Add(float64(diff))
m.previousTxnBufSize = size
}
func (m *ScannerMetrics) UpdateTimeTickBufSize(size int) {
diff := size - m.previousTimeTickBufSize
m.timeTickBufSize.Add(float64(diff))
m.previousTimeTickBufSize = size
}
func (m *ScannerMetrics) Close() {
m.UpdatePendingQueueSize(0)
m.UpdateTimeTickBufSize(0)
m.UpdateTxnBufSize(0)
}

View File

@ -0,0 +1,107 @@
package metricsutil
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// NewWriteMetrics creates a new WriteMetrics.
func NewWriteMetrics(pchannel types.PChannelInfo, walName string) *WriteMetrics {
constLabel := prometheus.Labels{
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
metrics.WALChannelLabelName: pchannel.Name,
}
metrics.WALInfo.WithLabelValues(
paramtable.GetStringNodeID(),
pchannel.Name,
strconv.FormatInt(pchannel.Term, 10),
walName).Set(1)
return &WriteMetrics{
walName: walName,
pchannel: pchannel,
constLabel: constLabel,
bytes: metrics.WALAppendMessageBytes.MustCurryWith(constLabel),
total: metrics.WALAppendMessageTotal.MustCurryWith(constLabel),
walDuration: metrics.WALAppendMessageDurationSeconds.MustCurryWith(constLabel),
walimplsDuration: metrics.WALImplsAppendMessageDurationSeconds.MustCurryWith(constLabel),
}
}
type WriteMetrics struct {
walName string
pchannel types.PChannelInfo
constLabel prometheus.Labels
bytes prometheus.ObserverVec
total *prometheus.CounterVec
walDuration prometheus.ObserverVec
walimplsDuration prometheus.ObserverVec
}
func (m *WriteMetrics) StartAppend(msgType message.MessageType, bytes int) *WriteGuard {
return &WriteGuard{
startAppend: time.Now(),
metrics: m,
msgType: msgType,
bytes: bytes,
}
}
func (m *WriteMetrics) Close() {
metrics.WALAppendMessageBytes.DeletePartialMatch(m.constLabel)
metrics.WALAppendMessageTotal.DeletePartialMatch(m.constLabel)
metrics.WALAppendMessageDurationSeconds.DeletePartialMatch(m.constLabel)
metrics.WALImplsAppendMessageDurationSeconds.DeletePartialMatch(m.constLabel)
metrics.WALInfo.DeleteLabelValues(
paramtable.GetStringNodeID(),
m.pchannel.Name,
strconv.FormatInt(m.pchannel.Term, 10),
m.walName,
)
}
type WriteGuard struct {
startAppend time.Time
startImplAppend time.Time
implCost time.Duration
metrics *WriteMetrics
msgType message.MessageType
bytes int
}
func (g *WriteGuard) StartWALImplAppend() {
g.startImplAppend = time.Now()
}
func (g *WriteGuard) FinishWALImplAppend() {
g.implCost = time.Since(g.startImplAppend)
}
func (g *WriteGuard) Finish(err error) {
status := parseError(err)
if g.implCost != 0 {
g.metrics.walimplsDuration.WithLabelValues(status).Observe(g.implCost.Seconds())
}
g.metrics.bytes.WithLabelValues(status).Observe(float64(g.bytes))
g.metrics.total.WithLabelValues(g.msgType.String(), status).Inc()
g.metrics.walDuration.WithLabelValues(status).Observe(time.Since(g.startAppend).Seconds())
}
// parseError parses the error to status.
func parseError(err error) string {
if err == nil {
return metrics.StreamingServiceClientStatusOK
}
if status.IsCanceled(err) {
return metrics.StreamingServiceClientStatusCancel
}
return metrics.StreamignServiceClientStatusError
}
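A hedged sketch of the append path WriteGuard is built for; doAppend stands in for the underlying walimpls write and is not part of this diff:
// Sketch only: wrap a single append so both the end-to-end and the
// walimpls-only durations are observed with the same result status.
func appendWithMetrics(m *WriteMetrics, msgType message.MessageType, bytes int, doAppend func() error) error {
	g := m.StartAppend(msgType, bytes)
	g.StartWALImplAppend()
	err := doAppend()
	g.FinishWALImplAppend()
	g.Finish(err) // bytes, totals and durations labeled ok/cancel/error via parseError
	return err
}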

View File

@ -0,0 +1,38 @@
package utility
import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type PendingQueue struct {
*typeutil.MultipartQueue[message.ImmutableMessage]
bytes int
}
func NewPendingQueue() *PendingQueue {
return &PendingQueue{
MultipartQueue: typeutil.NewMultipartQueue[message.ImmutableMessage](),
}
}
func (q *PendingQueue) Bytes() int {
return q.bytes
}
func (q *PendingQueue) Add(msg []message.ImmutableMessage) {
for _, m := range msg {
q.bytes += m.EstimateSize()
}
q.MultipartQueue.Add(msg)
}
func (q *PendingQueue) AddOne(msg message.ImmutableMessage) {
q.bytes += msg.EstimateSize()
q.MultipartQueue.AddOne(msg)
}
func (q *PendingQueue) UnsafeAdvance() {
q.bytes -= q.MultipartQueue.Next().EstimateSize()
q.MultipartQueue.UnsafeAdvance()
}

View File

@ -11,6 +11,7 @@ import (
type ReOrderByTimeTickBuffer struct {
messageHeap typeutil.Heap[message.ImmutableMessage]
lastPopTimeTick uint64
bytes int
}
// NewReOrderBuffer creates a new ReOrderBuffer.
@ -28,6 +29,7 @@ func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error {
return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick)
}
r.messageHeap.Push(msg)
r.bytes += msg.EstimateSize()
return nil
}
@ -36,6 +38,7 @@ func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error {
func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.ImmutableMessage {
var res []message.ImmutableMessage
for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
r.bytes -= r.messageHeap.Peek().EstimateSize()
res = append(res, r.messageHeap.Pop())
}
r.lastPopTimeTick = timetick
@ -46,3 +49,7 @@ func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.Imm
func (r *ReOrderByTimeTickBuffer) Len() int {
return r.messageHeap.Len()
}
func (r *ReOrderByTimeTickBuffer) Bytes() int {
return r.bytes
}
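Similarly, a hedged sketch of how the buffer's new byte accounting is meant to feed the scanner gauge during a reorder step:
// Sketch only: push one message, pop everything up to the given time tick,
// then report the bytes still held by the buffer.
func reorderStep(buf *ReOrderByTimeTickBuffer, msg message.ImmutableMessage, timetick uint64, m *metricsutil.ScannerMetrics) []message.ImmutableMessage {
	_ = buf.Push(msg)                      // bytes += msg.EstimateSize() when accepted
	ready := buf.PopUtilTimeTick(timetick) // bytes -= size of every popped message
	m.UpdateTimeTickBufSize(buf.Bytes())
	return ready
}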

View File

@ -14,6 +14,7 @@ func TestReOrderByTimeTickBuffer(t *testing.T) {
timeticks := rand.Perm(25)
for i, timetick := range timeticks {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().EstimateSize().Return(1)
msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
buf.Push(msg)
assert.Equal(t, i+1, buf.Len())

View File

@ -3,15 +3,17 @@ package utility
import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// NewTxnBuffer creates a new txn buffer.
func NewTxnBuffer(logger *log.MLogger) *TxnBuffer {
func NewTxnBuffer(logger *log.MLogger, metrics *metricsutil.ScannerMetrics) *TxnBuffer {
return &TxnBuffer{
logger: logger,
builders: make(map[message.TxnID]*message.ImmutableTxnMessageBuilder),
metrics: metrics,
}
}
@ -19,6 +21,12 @@ func NewTxnBuffer(logger *log.MLogger) *TxnBuffer {
type TxnBuffer struct {
logger *log.MLogger
builders map[message.TxnID]*message.ImmutableTxnMessageBuilder
metrics *metricsutil.ScannerMetrics
bytes int
}
func (b *TxnBuffer) Bytes() int {
return b.bytes
}
// HandleImmutableMessages handles immutable messages.
@ -29,6 +37,7 @@ func (b *TxnBuffer) HandleImmutableMessages(msgs []message.ImmutableMessage, ts
for _, msg := range msgs {
// Not a txn message, can be consumed right now.
if msg.TxnContext() == nil {
b.metrics.ObserveAutoCommitTxn()
result = append(result, msg)
continue
}
@ -69,6 +78,7 @@ func (b *TxnBuffer) handleBeginTxn(msg message.ImmutableMessage) {
return
}
b.builders[beginMsg.TxnContext().TxnID] = message.NewImmutableTxnMessageBuilder(beginMsg)
b.bytes += beginMsg.EstimateSize()
}
// handleCommitTxn handles commit txn message.
@ -93,9 +103,11 @@ func (b *TxnBuffer) handleCommitTxn(msg message.ImmutableMessage) message.Immuta
}
// build the txn message and remove it from buffer.
b.bytes -= builder.EstimateSize()
txnMsg, err := builder.Build(commitMsg)
delete(b.builders, commitMsg.TxnContext().TxnID)
if err != nil {
b.metrics.ObserveErrorTxn()
b.logger.Warn(
"failed to build txn message, it's a critical error, some data is lost",
zap.Int64("txnID", int64(commitMsg.TxnContext().TxnID)),
@ -108,6 +120,7 @@ func (b *TxnBuffer) handleCommitTxn(msg message.ImmutableMessage) message.Immuta
zap.Int64("txnID", int64(commitMsg.TxnContext().TxnID)),
zap.Any("messageID", commitMsg.MessageID()),
)
b.metrics.ObserveTxn(message.TxnStateCommitted)
return txnMsg
}
@ -127,8 +140,12 @@ func (b *TxnBuffer) handleRollbackTxn(msg message.ImmutableMessage) {
zap.Int64("txnID", int64(rollbackMsg.TxnContext().TxnID)),
zap.Any("messageID", rollbackMsg.MessageID()),
)
// just drop the txn from buffer.
delete(b.builders, rollbackMsg.TxnContext().TxnID)
if builder, ok := b.builders[rollbackMsg.TxnContext().TxnID]; ok {
// just drop the txn from buffer.
delete(b.builders, rollbackMsg.TxnContext().TxnID)
b.bytes -= builder.EstimateSize()
b.metrics.ObserveTxn(message.TxnStateRollbacked)
}
}
// handleTxnBodyMessage handles txn body message.
@ -143,6 +160,7 @@ func (b *TxnBuffer) handleTxnBodyMessage(msg message.ImmutableMessage) {
return
}
builder.Add(msg)
b.bytes += msg.EstimateSize()
}
// clearExpiredTxn clears the expired txn.
@ -150,6 +168,8 @@ func (b *TxnBuffer) clearExpiredTxn(ts uint64) {
for txnID, builder := range b.builders {
if builder.ExpiredTimeTick() <= ts {
delete(b.builders, txnID)
b.bytes -= builder.EstimateSize()
b.metrics.ObserveExpiredTxn()
if b.logger.Level().Enabled(zap.DebugLevel) {
b.logger.Debug(
"the txn is expired, so drop the txn from buffer",

View File

@ -7,8 +7,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -17,7 +19,7 @@ import (
var idAllocator = typeutil.NewIDAllocator()
func TestTxnBuffer(t *testing.T) {
b := NewTxnBuffer(log.With())
b := NewTxnBuffer(log.With(), metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics())
baseTso := tsoutil.GetCurrentTime()

View File

@ -363,6 +363,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(GarbageCollectorRunCount)
registry.MustRegister(DataCoordTaskExecuteLatency)
registry.MustRegister(TaskNum)
registerStreamingCoord(registry)
}
func CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID int64) {

View File

@ -88,7 +88,6 @@ const (
collectionIDLabelName = "collection_id"
partitionIDLabelName = "partition_id"
channelNameLabelName = "channel_name"
channelTermLabelName = "channel_term"
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
collectionName = "collection_name"

View File

@ -39,6 +39,8 @@ func TestRegisterMetrics(t *testing.T) {
RegisterStorageMetrics(r)
RegisterMsgStreamMetrics(r)
RegisterCGOMetrics(r)
RegisterStreamingServiceClient(r)
RegisterStreamingNode(r)
})
}

View File

@ -447,6 +447,8 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyReqInQueueLatency)
registry.MustRegister(MaxInsertRate)
RegisterStreamingServiceClient(registry)
}
func CleanupProxyDBMetrics(nodeID int64, dbName string) {

View File

@ -834,6 +834,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeSearchHitSegmentNum)
// Add cgo metrics
RegisterCGOMetrics(registry)
RegisterStreamingServiceClient(registry)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {

View File

@ -275,6 +275,8 @@ func RegisterRootCoord(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeMemoryHighWaterLevel)
registry.MustRegister(DiskQuota)
RegisterStreamingServiceClient(registry)
}
func CleanupRootCoordDBMetrics(dbName string) {

View File

@ -9,55 +9,87 @@ import (
)
const (
subsystemStreamingServiceClient = "streaming"
StreamingServiceClientProducerAvailable = "available"
StreamingServiceClientProducerUnAvailable = "unavailable"
subsystemStreamingServiceClient = "streaming"
subsystemWAL = "wal"
StreamingServiceClientStatusAvailable = "available"
StreamingServiceClientStatusUnavailable = "unavailable"
StreamingServiceClientStatusOK = "ok"
StreamingServiceClientStatusCancel = "cancel"
StreamignServiceClientStatusError = "error"
TimeTickSyncTypeLabelName = "type"
TimeTickAckTypeLabelName = "type"
WALTxnStateLabelName = "state"
WALChannelLabelName = channelNameLabelName
WALSegmentSealPolicyNameLabelName = "policy"
WALSegmentAllocStateLabelName = "state"
WALMessageTypeLabelName = "message_type"
WALChannelTermLabelName = "term"
WALNameLabelName = "wal_name"
WALTxnTypeLabelName = "txn_type"
StatusLabelName = statusLabelName
StreamingNodeLabelName = "streaming_node"
NodeIDLabelName = nodeIDLabelName
)
var (
logServiceClientRegisterOnce sync.Once
StreamingServiceClientRegisterOnce sync.Once
// from 64 bytes to 5MB
bytesBuckets = prometheus.ExponentialBucketsRange(64, 5242880, 10)
// from 64 bytes to 8MB
messageBytesBuckets = prometheus.ExponentialBucketsRange(64, 8388608, 10)
// from 1ms to 5s
secondsBuckets = prometheus.ExponentialBucketsRange(0.001, 5, 10)
// Client side metrics
// Streaming Service Client Producer Metrics.
StreamingServiceClientProducerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers",
}, statusLabelName)
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
}, statusLabelName)
StreamingServiceClientProduceInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "produce_inflight_total",
Help: "Total of inflight produce request",
}, WALChannelLabelName)
StreamingServiceClientProduceBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "produce_bytes",
Help: "Bytes of produced message",
Buckets: bytesBuckets,
}, statusLabelName)
StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: bytesBuckets,
})
Buckets: messageBytesBuckets,
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientProduceDurationSeconds = newStreamingServiceClientHistogramVec(
prometheus.HistogramOpts{
Name: "produce_duration_seconds",
Help: "Duration of client produce",
Buckets: secondsBuckets,
},
statusLabelName,
)
}, WALChannelLabelName, StatusLabelName)
// Streaming Service Client Consumer Metrics.
StreamingServiceClientConsumerTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
}, WALChannelLabelName, StatusLabelName)
StreamingServiceClientConsumeBytes = newStreamingServiceClientHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
StreamingServiceClientConsumeInflightTotal = newStreamingServiceClientGaugeVec(prometheus.GaugeOpts{
Name: "consume_inflight_total",
Help: "Total of inflight consume body",
}, WALChannelLabelName)
// StreamingCoord metrics
StreamingCoordPChannelTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "pchannel_total",
Help: "Total of pchannels",
StreamingCoordPChannelInfo = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "pchannel_info",
Help: "Term of pchannels",
}, WALChannelLabelName, WALChannelTermLabelName, StreamingNodeLabelName)
StreamingCoordAssignmentVersion = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_info",
Help: "Info of assignment",
})
StreamingCoordAssignmentListenerTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
@ -65,71 +97,256 @@ var (
Help: "Total of assignment listener",
})
StreamingCoordAssignmentVersion = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
Name: "assignment_info",
Help: "Info of assignment",
})
// StreamingNode metrics
StreamingNodeWALTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "wal_total",
Help: "Total of wal",
})
// StreamingNode Producer Server Metrics.
StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "producer_total",
Help: "Total of producers",
})
Help: "Total of producers on current streaming node",
}, WALChannelLabelName)
StreamingNodeProduceInflightTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "produce_inflight_total",
Help: "Total of inflight produce request",
}, WALChannelLabelName)
// StreamingNode Consumer Server Metrics.
StreamingNodeConsumerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "consumer_total",
Help: "Total of consumers",
})
Help: "Total of consumers on current streaming node",
}, WALChannelLabelName)
StreamingNodeProduceBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "produce_bytes",
Help: "Bytes of produced message",
Buckets: bytesBuckets,
}, channelNameLabelName, channelTermLabelName, statusLabelName)
StreamingNodeConsumeInflightTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
Name: "consume_inflight_total",
Help: "Total of inflight consume body",
}, WALChannelLabelName)
StreamingNodeConsumeBytes = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "consume_bytes",
Help: "Bytes of consumed message",
Buckets: bytesBuckets,
}, channelNameLabelName, channelTermLabelName)
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
StreamingNodeProduceDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "produce_duration_seconds",
Help: "Duration of producing message",
// WAL metrics
WALInfo = newWALGaugeVec(prometheus.GaugeOpts{
Name: "info",
Help: "current info of wal on current streaming node",
}, WALChannelLabelName, WALChannelTermLabelName, WALNameLabelName)
// TimeTick related metrics
WALLastAllocatedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "last_allocated_time_tick",
Help: "Current max allocated time tick of wal",
}, WALChannelLabelName)
WALAllocateTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "allocate_time_tick_total",
Help: "Total of allocated time tick on wal",
}, WALChannelLabelName)
WALLastConfirmedTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "last_confirmed_time_tick",
Help: "Current max confirmed time tick of wal",
}, WALChannelLabelName)
WALAcknowledgeTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "acknowledge_time_tick_total",
Help: "Total of acknowledge time tick on wal",
}, WALChannelLabelName, TimeTickAckTypeLabelName)
WALSyncTimeTickTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "sync_time_tick_total",
Help: "Total of sync time tick on wal",
}, WALChannelLabelName, TimeTickAckTypeLabelName)
WALTimeTickSyncTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "sync_total",
Help: "Total of time tick sync sent",
}, WALChannelLabelName, TimeTickSyncTypeLabelName)
WALTimeTickSyncTimeTick = newWALGaugeVec(prometheus.GaugeOpts{
Name: "sync_time_tick",
Help: "Max time tick of time tick sync sent",
}, WALChannelLabelName, TimeTickSyncTypeLabelName)
// Txn Related Metrics
WALInflightTxn = newWALGaugeVec(prometheus.GaugeOpts{
Name: "inflight_txn",
Help: "Total of inflight txn on wal",
}, WALChannelLabelName)
WALFinishTxn = newWALCounterVec(prometheus.CounterOpts{
Name: "finish_txn",
Help: "Total of finish txn on wal",
}, WALChannelLabelName, WALTxnStateLabelName)
// Segment related metrics
WALSegmentAllocTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_segment_alloc_total",
Help: "Total of segment alloc on wal",
}, WALChannelLabelName, WALSegmentAllocStateLabelName)
WALSegmentFlushedTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "segment_assign_flushed_segment_total",
Help: "Total of segment sealed on wal",
}, WALChannelLabelName, WALSegmentSealPolicyNameLabelName)
WALSegmentBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "segment_assign_segment_bytes",
Help: "Bytes of segment alloc on wal",
Buckets: prometheus.ExponentialBucketsRange(5242880, 1073741824, 10), // 5MB -> 1024MB
}, WALChannelLabelName)
WALPartitionTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_partition_total",
Help: "Total of partition on wal",
}, WALChannelLabelName)
WALCollectionTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "segment_assign_collection_total",
Help: "Total of collection on wal",
}, WALChannelLabelName)
// Append Related Metrics
WALAppendMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "append_message_bytes",
Help: "Bytes of append message to wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName, StatusLabelName)
WALAppendMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "append_message_total",
Help: "Total of append message to wal",
}, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName)
WALAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "append_message_duration_seconds",
Help: "Duration of wal append message",
Buckets: secondsBuckets,
}, channelNameLabelName, channelTermLabelName, statusLabelName)
}, WALChannelLabelName, StatusLabelName)
WALImplsAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
Name: "impls_append_message_duration_seconds",
Help: "Duration of wal impls append message",
Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName)
// Scanner Related Metrics
WALScannerTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_total",
Help: "Total of wal scanner on current streaming node",
}, WALChannelLabelName)
WALScanMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "scan_message_bytes",
Help: "Bytes of scanned message from wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
WALScanMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_message_total",
Help: "Total of scanned message from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanPassMessageBytes = newWALHistogramVec(prometheus.HistogramOpts{
Name: "scan_pass_message_bytes",
Help: "Bytes of pass (not filtered) scanned message from wal",
Buckets: messageBytesBuckets,
}, WALChannelLabelName)
WALScanPassMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_pass_message_total",
Help: "Total of pass (not filtered) scanned message from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanTimeTickViolationMessageTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_time_tick_violation_message_total",
Help: "Total of time tick violation message (dropped) from wal",
}, WALChannelLabelName, WALMessageTypeLabelName)
WALScanTxnTotal = newWALCounterVec(prometheus.CounterOpts{
Name: "scan_txn_total",
Help: "Total of scanned txn from wal",
}, WALChannelLabelName, WALTxnStateLabelName)
WALScannerPendingQueueBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_pending_queue_bytes",
Help: "Size of pending queue in wal scanner",
}, WALChannelLabelName)
WALScannerTimeTickBufBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_time_tick_buf_bytes",
Help: "Size of time tick buffer in wal scanner",
}, WALChannelLabelName)
WALScannerTxnBufBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "scanner_txn_buf_bytes",
Help: "Size of txn buffer in wal scanner",
}, WALChannelLabelName)
)
// RegisterStreamingServiceClient registers streaming service client metrics
func RegisterStreamingServiceClient(registry *prometheus.Registry) {
logServiceClientRegisterOnce.Do(func() {
StreamingServiceClientRegisterOnce.Do(func() {
registry.MustRegister(StreamingServiceClientProducerTotal)
registry.MustRegister(StreamingServiceClientConsumerTotal)
registry.MustRegister(StreamingServiceClientProduceInflightTotal)
registry.MustRegister(StreamingServiceClientProduceBytes)
registry.MustRegister(StreamingServiceClientConsumeBytes)
registry.MustRegister(StreamingServiceClientProduceDurationSeconds)
registry.MustRegister(StreamingServiceClientConsumerTotal)
registry.MustRegister(StreamingServiceClientConsumeBytes)
registry.MustRegister(StreamingServiceClientConsumeInflightTotal)
})
}
// RegisterStreamingCoord registers log service metrics
func RegisterStreamingCoord(registry *prometheus.Registry) {
registry.MustRegister(StreamingCoordPChannelTotal)
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
// registerStreamingCoord registers streaming coord metrics
func registerStreamingCoord(registry *prometheus.Registry) {
registry.MustRegister(StreamingCoordPChannelInfo)
registry.MustRegister(StreamingCoordAssignmentVersion)
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
}
// RegisterStreamingNode registers log service metrics
// RegisterStreamingNode registers streaming node metrics
func RegisterStreamingNode(registry *prometheus.Registry) {
registry.MustRegister(StreamingNodeWALTotal)
registry.MustRegister(StreamingNodeProducerTotal)
registry.MustRegister(StreamingNodeProduceInflightTotal)
registry.MustRegister(StreamingNodeConsumerTotal)
registry.MustRegister(StreamingNodeProduceBytes)
registry.MustRegister(StreamingNodeConsumeInflightTotal)
registry.MustRegister(StreamingNodeConsumeBytes)
registry.MustRegister(StreamingNodeProduceDurationSeconds)
registerWAL(registry)
}
// registerWAL registers wal metrics
func registerWAL(registry *prometheus.Registry) {
registry.MustRegister(WALInfo)
registry.MustRegister(WALLastAllocatedTimeTick)
registry.MustRegister(WALAllocateTimeTickTotal)
registry.MustRegister(WALLastConfirmedTimeTick)
registry.MustRegister(WALAcknowledgeTimeTickTotal)
registry.MustRegister(WALSyncTimeTickTotal)
registry.MustRegister(WALTimeTickSyncTotal)
registry.MustRegister(WALTimeTickSyncTimeTick)
registry.MustRegister(WALInflightTxn)
registry.MustRegister(WALFinishTxn)
registry.MustRegister(WALSegmentAllocTotal)
registry.MustRegister(WALSegmentFlushedTotal)
registry.MustRegister(WALSegmentBytes)
registry.MustRegister(WALPartitionTotal)
registry.MustRegister(WALCollectionTotal)
registry.MustRegister(WALAppendMessageBytes)
registry.MustRegister(WALAppendMessageTotal)
registry.MustRegister(WALAppendMessageDurationSeconds)
registry.MustRegister(WALImplsAppendMessageDurationSeconds)
registry.MustRegister(WALScannerTotal)
registry.MustRegister(WALScanMessageBytes)
registry.MustRegister(WALScanMessageTotal)
registry.MustRegister(WALScanPassMessageBytes)
registry.MustRegister(WALScanPassMessageTotal)
registry.MustRegister(WALScanTimeTickViolationMessageTotal)
registry.MustRegister(WALScanTxnTotal)
registry.MustRegister(WALScannerPendingQueueBytes)
registry.MustRegister(WALScannerTimeTickBufBytes)
registry.MustRegister(WALScannerTxnBufBytes)
}
func newStreamingCoordGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
@ -167,9 +384,30 @@ func newStreamingNodeHistogramVec(opts prometheus.HistogramOpts, extra ...string
return prometheus.NewHistogramVec(opts, labels)
}
func newWALGaugeVec(opts prometheus.GaugeOpts, extra ...string) *prometheus.GaugeVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewGaugeVec(opts, labels)
}
func newWALCounterVec(opts prometheus.CounterOpts, extra ...string) *prometheus.CounterVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewCounterVec(opts, labels)
}
func newWALHistogramVec(opts prometheus.HistogramOpts, extra ...string) *prometheus.HistogramVec {
opts.Namespace = milvusNamespace
opts.Subsystem = subsystemWAL
labels := mergeLabel(extra...)
return prometheus.NewHistogramVec(opts, labels)
}
func mergeLabel(extra ...string) []string {
labels := make([]string, 0, 1+len(extra))
labels = append(labels, nodeIDLabelName)
labels = append(labels, NodeIDLabelName)
labels = append(labels, extra...)
return labels
}
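Because mergeLabel prepends the node ID label, every vector built by the constructors above expects the node ID as its first label value. A minimal sketch of feeding the client-side consume_bytes histogram, assuming the streaming-client constructors use mergeLabel like the WAL ones; the package name and recordConsumedBytes helper are illustrative, not part of this change:
package example
import (
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// recordConsumedBytes observes the size of a consumed message on a pchannel;
// the node ID always comes first because mergeLabel prepends it.
func recordConsumedBytes(pchannel string, msg message.ImmutableMessage) {
	metrics.StreamingServiceClientConsumeBytes.
		WithLabelValues(paramtable.GetStringNodeID(), pchannel).
		Observe(float64(msg.EstimateSize()))
}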

View File

@ -192,6 +192,15 @@ func (b *ImmutableTxnMessageBuilder) Add(msg ImmutableMessage) *ImmutableTxnMess
return b
}
// EstimateSize estimates the size of the txn message.
func (b *ImmutableTxnMessageBuilder) EstimateSize() int {
size := b.begin.EstimateSize()
for _, m := range b.messages {
size += m.EstimateSize()
}
return size
}
// Build builds a txn message.
func (b *ImmutableTxnMessageBuilder) Build(commit ImmutableCommitTxnMessageV2) (ImmutableTxnMessage, error) {
msg, err := newImmutableTxnMesasgeFromWAL(b.begin, b.messages, commit)

View File

@ -23,12 +23,3 @@ func (cmh ChanMessageHandler) Handle(msg ImmutableMessage) {
func (cmh ChanMessageHandler) Close() {
close(cmh)
}
// NopCloseHandler is a handler that do nothing when close.
type NopCloseHandler struct {
Handler
}
// Close is called after all messages are handled or handling is interrupted.
func (nch NopCloseHandler) Close() {
}

View File

@ -14,17 +14,4 @@ func TestMessageHandler(t *testing.T) {
h.Close()
_, ok := <-ch
assert.False(t, ok)
ch = make(chan ImmutableMessage, 100)
hNop := NopCloseHandler{
Handler: ChanMessageHandler(ch),
}
hNop.Handle(nil)
assert.Nil(t, <-ch)
hNop.Close()
select {
case <-ch:
panic("should not be closed")
default:
}
}

View File

@ -220,6 +220,15 @@ func (m *immutableTxnMessageImpl) Begin() ImmutableMessage {
return m.begin
}
// EstimateSize returns the estimated size of current message.
func (m *immutableTxnMessageImpl) EstimateSize() int {
size := 0
for _, msg := range m.messages {
size += msg.EstimateSize()
}
return size
}
// RangeOver iterates over the underlying messages in the transaction message.
func (m *immutableTxnMessageImpl) RangeOver(fn func(ImmutableMessage) error) error {
for _, msg := range m.messages {

View File

@ -0,0 +1,27 @@
package syncutil
import "sync"
// ClosableLock is a mutex that can be permanently closed; once closed it can
// never be acquired again.
type ClosableLock struct {
mu sync.Mutex
closed bool
}
// LockIfNotClosed acquires the lock and returns true, or returns false
// without holding the lock if Close has already been called.
func (l *ClosableLock) LockIfNotClosed() bool {
l.mu.Lock()
if l.closed {
l.mu.Unlock()
return false
}
return true
}
// Unlock releases the lock acquired by LockIfNotClosed.
func (l *ClosableLock) Unlock() {
l.mu.Unlock()
}
// Close marks the lock as closed; any later LockIfNotClosed call fails.
func (l *ClosableLock) Close() {
l.mu.Lock()
l.closed = true
l.mu.Unlock()
}
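A minimal usage sketch of the new lock (the tryWork function and the surrounding program are illustrative, not taken from this change): work runs only while the lock is still open, and anything attempted after Close is rejected.
package main
import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/syncutil"
)
// tryWork runs the guarded section only if the lock has not been closed yet.
func tryWork(l *syncutil.ClosableLock) bool {
	if !l.LockIfNotClosed() {
		return false // already closed, refuse new work
	}
	defer l.Unlock()
	// ... guarded work goes here ...
	return true
}
func main() {
	l := &syncutil.ClosableLock{}
	fmt.Println(tryWork(l)) // true: lock is still open
	l.Close()
	fmt.Println(tryWork(l)) // false: closed, work is rejected
}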

View File

@ -55,6 +55,11 @@ func PhysicalTime(ts uint64) time.Time {
return physicalTime
}
// PhysicalTimeSeconds returns the physical time in seconds
func PhysicalTimeSeconds(ts uint64) float64 {
return float64(ts>>logicalBits) / 1000
}
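Since the physical part of a hybrid timestamp is a Unix millisecond value, the new helper makes it easy to turn a time tick into a float lag suitable for a seconds-based gauge. A minimal sketch, assuming the package's existing ComposeTSByTime helper; timeTickLagSeconds is an illustrative name, not part of this change:
package main
import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
// timeTickLagSeconds reports how far a time tick trails the wall clock,
// in a form that can be fed straight into a seconds-based gauge.
func timeTickLagSeconds(timeTick uint64) float64 {
	return float64(time.Now().UnixMilli())/1000 - tsoutil.PhysicalTimeSeconds(timeTick)
}
func main() {
	// Compose a hybrid timestamp from "now" with logical part 0.
	ts := tsoutil.ComposeTSByTime(time.Now(), 0)
	fmt.Println(timeTickLagSeconds(ts)) // close to 0
}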
// ParseHybridTs parses the ts to (physical, logical), physical part is of utc-timestamp format.
func ParseHybridTs(ts uint64) (int64, int64) {
logical := ts & logicalBitsMask