diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 9ee6c46b19..c7e6bc6753 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -41,10 +41,12 @@ import ( "github.com/milvus-io/milvus/internal/util/quota" rlinternal "github.com/milvus-io/milvus/internal/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -276,10 +278,18 @@ func (q *QuotaCenter) Start() { }() } +func (q *QuotaCenter) watchQuotaAndLimit() { + pt := paramtable.Get() + pt.Watch(pt.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key, config.NewHandler(pt.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key, func(event *config.Event) { + metrics.QueryNodeMemoryHighWaterLevel.Set(pt.QuotaConfig.QueryNodeMemoryHighWaterLevel.GetAsFloat()) + })) +} + // run starts the service of QuotaCenter. func (q *QuotaCenter) run() { interval := Params.QuotaConfig.QuotaCenterCollectInterval.GetAsDuration(time.Second) log.Info("Start QuotaCenter", zap.Duration("collectInterval", interval)) + q.watchQuotaAndLimit() ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/internal/util/quota/quota_constant.go b/internal/util/quota/quota_constant.go index 0302e1fddc..7f36800bfe 100644 --- a/internal/util/quota/quota_constant.go +++ b/internal/util/quota/quota_constant.go @@ -25,7 +25,9 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -82,6 +84,20 @@ func initLimitConfigMaps() { internalpb.RateType_DQLQuery: "aConfig.DQLMaxQueryRatePerPartition, }, } + + pt := paramtable.Get() + pt.Watch(quotaConfig.DMLMaxInsertRate.Key, config.NewHandler(quotaConfig.DMLMaxInsertRate.Key, func(event *config.Event) { + metrics.MaxInsertRate.WithLabelValues(paramtable.GetStringNodeID(), "cluster").Set(quotaConfig.DMLMaxInsertRate.GetAsFloat()) + })) + pt.Watch(quotaConfig.DMLMaxInsertRatePerDB.Key, config.NewHandler(quotaConfig.DMLMaxInsertRatePerDB.Key, func(event *config.Event) { + metrics.MaxInsertRate.WithLabelValues(paramtable.GetStringNodeID(), "db").Set(quotaConfig.DMLMaxInsertRatePerDB.GetAsFloat()) + })) + pt.Watch(quotaConfig.DMLMaxInsertRatePerCollection.Key, config.NewHandler(quotaConfig.DMLMaxInsertRatePerCollection.Key, func(event *config.Event) { + metrics.MaxInsertRate.WithLabelValues(paramtable.GetStringNodeID(), "collection").Set(quotaConfig.DMLMaxInsertRatePerCollection.GetAsFloat()) + })) + pt.Watch(quotaConfig.DMLMaxInsertRatePerPartition.Key, config.NewHandler(quotaConfig.DMLMaxInsertRatePerPartition.Key, func(event *config.Event) { + metrics.MaxInsertRate.WithLabelValues(paramtable.GetStringNodeID(), "partition").Set(quotaConfig.DMLMaxInsertRatePerPartition.GetAsFloat()) + })) }) } diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index c22bbed15d..0ea7776981 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -381,6 +381,14 @@ var ( Help: "latency which request waits in the queue", Buckets: buckets, // unit: ms }, []string{nodeIDLabelName, functionLabelName}) + + MaxInsertRate = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "max_insert_rate", + Help: "max insert rate", + }, []string{"node_id", "scope"}) ) // RegisterProxy registers Proxy metrics @@ -437,6 +445,8 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySlowQueryCount) registry.MustRegister(ProxyReportValue) registry.MustRegister(ProxyReqInQueueLatency) + + registry.MustRegister(MaxInsertRate) } func CleanupProxyDBMetrics(nodeID int64, dbName string) { diff --git a/pkg/metrics/rootcoord_metrics.go b/pkg/metrics/rootcoord_metrics.go index 4ff9b8c606..217ccefeb0 100644 --- a/pkg/metrics/rootcoord_metrics.go +++ b/pkg/metrics/rootcoord_metrics.go @@ -218,6 +218,14 @@ var ( indexName, isVectorIndex, }) + + QueryNodeMemoryHighWaterLevel = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.RootCoordRole, + Name: "qn_mem_high_water_level", + Help: "querynode memory high water level", + }) ) // RegisterRootCoord registers RootCoord metrics @@ -256,6 +264,8 @@ func RegisterRootCoord(registry *prometheus.Registry) { registry.MustRegister(RootCoordNumEntities) registry.MustRegister(RootCoordIndexedNumEntities) + + registry.MustRegister(QueryNodeMemoryHighWaterLevel) } func CleanupRootCoordDBMetrics(dbName string) {