From 3db137f4ad2b6ca01410c17b0776259a4dae1bb6 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 24 Oct 2024 16:21:37 +0800 Subject: [PATCH] enhance: [2.4] Add metrics for querynode delete buffer info (#37081) (#37097) Cherry pick from master pr: #37081 Related to #35303 This PR add metrics for querynode delegator delete buffer information, which is related to dml quota logic. Signed-off-by: Congqi Xia --- internal/querynodev2/delegator/delegator.go | 24 ++++++++------ .../deletebuffer/list_delete_buffer.go | 33 +++++++++++++++---- .../deletebuffer/list_delete_buffer_test.go | 6 ++-- pkg/metrics/querynode_metrics.go | 26 +++++++++++++++ 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 2231253978..0eab841c28 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -792,6 +792,9 @@ func (sd *shardDelegator) Close() { // broadcast to all waitTsafe goroutine to quit sd.tsCond.Broadcast() sd.lifetime.Wait() + + metrics.QueryNodeDeleteBufferSize.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) + metrics.QueryNodeDeleteBufferRowNum.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), sd.vchannelName) } // As partition stats is an optimization for search/query which is not mandatory for milvus instance, @@ -864,16 +867,17 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)) sd := &shardDelegator{ - collectionID: collectionID, - replicaID: replicaID, - vchannelName: channel, - version: version, - collection: collection, - segmentManager: manager.Segment, - workerManager: workerManager, - lifetime: lifetime.NewLifetime(lifetime.Initializing), - distribution: NewDistribution(), - deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock), + collectionID: collectionID, + replicaID: replicaID, + vchannelName: channel, + version: version, + collection: collection, + segmentManager: manager.Segment, + workerManager: workerManager, + lifetime: lifetime.NewLifetime(lifetime.Initializing), + distribution: NewDistribution(), + deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock, + []string{fmt.Sprint(paramtable.GetNodeID()), channel}), pkOracle: pkoracle.NewPkOracle(), tsafeManager: tsafeManager, latestTsafe: atomic.NewUint64(startTs), diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go index 8991f35bc5..2a17e1ec6e 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer.go @@ -20,13 +20,16 @@ import ( "sync" "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/metrics" ) -func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64) DeleteBuffer[T] { +func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64, labels []string) DeleteBuffer[T] { return &listDeleteBuffer[T]{ safeTs: startTs, sizePerBlock: sizePerBlock, list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)}, + labels: labels, } } @@ -40,6 +43,18 @@ type listDeleteBuffer[T timed] struct { safeTs uint64 sizePerBlock int64 + + // cached metrics + rowNum int64 + size int64 + + // metrics labels + labels []string +} + +func (b *listDeleteBuffer[T]) updateMetrics() { + metrics.QueryNodeDeleteBufferRowNum.WithLabelValues(b.labels...).Set(float64(b.rowNum)) + metrics.QueryNodeDeleteBufferSize.WithLabelValues(b.labels...).Set(float64(b.size)) } func (b *listDeleteBuffer[T]) Put(entry T) { @@ -51,6 +66,11 @@ func (b *listDeleteBuffer[T]) Put(entry T) { if errors.Is(err, errBufferFull) { b.list = append(b.list, newCacheBlock[T](entry.Timestamp(), b.sizePerBlock, entry)) } + + // update metrics + b.rowNum += entry.EntryNum() + b.size += entry.Size() + b.updateMetrics() } func (b *listDeleteBuffer[T]) ListAfter(ts uint64) []T { @@ -87,9 +107,13 @@ func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) { if nextHead > 0 { for idx := 0; idx < nextHead; idx++ { + rowNum, memSize := b.list[idx].Size() + b.rowNum -= rowNum + b.size -= memSize b.list[idx] = nil } b.list = b.list[nextHead:] + b.updateMetrics() } } @@ -97,10 +121,5 @@ func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) { b.mut.RLock() defer b.mut.RUnlock() - for _, block := range b.list { - blockNum, blockSize := block.Size() - entryNum += blockNum - memorySize += blockSize - } - return entryNum, memorySize + return b.rowNum, b.size } diff --git a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go index 84f6d67bc4..0123d30add 100644 --- a/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go +++ b/internal/querynodev2/delegator/deletebuffer/list_delete_buffer_test.go @@ -29,7 +29,7 @@ type ListDeleteBufferSuite struct { } func (s *ListDeleteBufferSuite) TestNewBuffer() { - buffer := NewListDeleteBuffer[*Item](10, 1000) + buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"}) s.EqualValues(10, buffer.SafeTs()) @@ -39,7 +39,7 @@ func (s *ListDeleteBufferSuite) TestNewBuffer() { } func (s *ListDeleteBufferSuite) TestCache() { - buffer := NewListDeleteBuffer[*Item](10, 1000) + buffer := NewListDeleteBuffer[*Item](10, 1000, []string{"1", "dml-1"}) buffer.Put(&Item{ Ts: 11, Data: []BufferItem{ @@ -68,7 +68,7 @@ func (s *ListDeleteBufferSuite) TestCache() { } func (s *ListDeleteBufferSuite) TestTryDiscard() { - buffer := NewListDeleteBuffer[*Item](10, 1) + buffer := NewListDeleteBuffer[*Item](10, 1, []string{"1", "dml-1"}) buffer.Put(&Item{ Ts: 10, Data: []BufferItem{ diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 613b3e5963..41f7c2873e 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -765,6 +765,30 @@ var ( }, []string{ nodeIDLabelName, }) + + QueryNodeDeleteBufferSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "delete_buffer_size", + Help: "delegator delete buffer size (in bytes)", + }, []string{ + nodeIDLabelName, + channelNameLabelName, + }, + ) + + QueryNodeDeleteBufferRowNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "delete_buffer_row_num", + Help: "delegator delete buffer row num", + }, []string{ + nodeIDLabelName, + channelNameLabelName, + }, + ) ) // RegisterQueryNode registers QueryNode metrics @@ -832,6 +856,8 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeForwardDeleteCost) registry.MustRegister(QueryNodeSegmentPruneRatio) registry.MustRegister(QueryNodeSearchHitSegmentNum) + registry.MustRegister(QueryNodeDeleteBufferSize) + registry.MustRegister(QueryNodeDeleteBufferRowNum) // Add cgo metrics RegisterCGOMetrics(registry) }