From 55202ce7d901bc20e651566a7b100218a1aff1ee Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 7 Apr 2023 15:34:29 +0800 Subject: [PATCH] Support show knowhere prometheus metrics (#23102) Signed-off-by: yudong.cai --- cmd/roles/roles.go | 14 ++- internal/core/src/segcore/CMakeLists.txt | 1 + internal/core/src/segcore/metrics_c.cpp | 24 +++++ internal/core/src/segcore/metrics_c.h | 23 ++++ pkg/metrics/c_registry.go | 131 +++++++++++++++++++++++ pkg/metrics/metrics_test.go | 22 ++-- pkg/metrics/milvus_registry.go | 52 +++++++++ 7 files changed, 247 insertions(+), 20 deletions(-) create mode 100644 internal/core/src/segcore/metrics_c.cpp create mode 100644 internal/core/src/segcore/metrics_c.h create mode 100644 pkg/metrics/c_registry.go create mode 100644 pkg/metrics/milvus_registry.go diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index fa931a160f..44b84ba5c9 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -47,14 +47,12 @@ import ( ) // all milvus related metrics is in a separate registry -var Registry *prometheus.Registry +var Registry *metrics.MilvusRegistry func init() { - Registry = prometheus.NewRegistry() - Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - Registry.MustRegister(prometheus.NewGoCollector()) - metrics.RegisterEtcdMetrics(Registry) - metrics.RegisterMq(Registry) + Registry = metrics.NewMilvusRegistry() + metrics.RegisterEtcdMetrics(Registry.GoRegistry) + metrics.RegisterMq(Registry.GoRegistry) } func stopRocksmq() { @@ -95,7 +93,7 @@ func runComponent[T component](ctx context.Context, wg.Wait() healthz.Register(role) - metricRegister(Registry) + metricRegister(Registry.GoRegistry) return role } @@ -193,7 +191,7 @@ func (mr *MilvusRoles) setupLogger() { } // Register serves prometheus http service -func setupPrometheusHTTPServer(r *prometheus.Registry) { +func setupPrometheusHTTPServer(r *metrics.MilvusRegistry) { http.Register(&http.Handler{ Path: "/metrics", Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}), diff --git a/internal/core/src/segcore/CMakeLists.txt b/internal/core/src/segcore/CMakeLists.txt index 3fa8a8e560..fcc478e61c 100644 --- a/internal/core/src/segcore/CMakeLists.txt +++ b/internal/core/src/segcore/CMakeLists.txt @@ -26,6 +26,7 @@ set(SEGCORE_FILES FieldIndexing.cpp InsertRecord.cpp Reduce.cpp + metrics_c.cpp plan_c.cpp reduce_c.cpp load_index_c.cpp diff --git a/internal/core/src/segcore/metrics_c.cpp b/internal/core/src/segcore/metrics_c.cpp new file mode 100644 index 0000000000..e9d0b6a844 --- /dev/null +++ b/internal/core/src/segcore/metrics_c.cpp @@ -0,0 +1,24 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include + +#include "knowhere/comp/prometheus_client.h" +#include "segcore/metrics_c.h" + +char* +GetKnowhereMetrics() { + auto str = knowhere::prometheusClient->GetMetrics(); + auto len = str.length(); + char* res = (char*)malloc(len + 1); + memcpy(res, str.data(), len); + return res; +} diff --git a/internal/core/src/segcore/metrics_c.h b/internal/core/src/segcore/metrics_c.h new file mode 100644 index 0000000000..8036e94861 --- /dev/null +++ b/internal/core/src/segcore/metrics_c.h @@ -0,0 +1,23 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +char* +GetKnowhereMetrics(); + +#ifdef __cplusplus +} +#endif diff --git a/pkg/metrics/c_registry.go b/pkg/metrics/c_registry.go new file mode 100644 index 0000000000..85b69bede1 --- /dev/null +++ b/pkg/metrics/c_registry.go @@ -0,0 +1,131 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +/* +#cgo pkg-config: milvus_segcore milvus_common + +#include +#include "segcore/metrics_c.h" + +*/ +import "C" +import ( + "github.com/milvus-io/milvus/pkg/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" + "sort" + "strings" + "sync" + "unsafe" + + dto "github.com/prometheus/client_model/go" +) + +// metricSorter is a sortable slice of *dto.Metric. +type metricSorter []*dto.Metric + +func (s metricSorter) Len() int { + return len(s) +} + +func (s metricSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s metricSorter) Less(i, j int) bool { + if len(s[i].Label) != len(s[j].Label) { + // This should not happen. The metrics are + // inconsistent. However, we have to deal with the fact, as + // people might use custom collectors or metric family injection + // to create inconsistent metrics. So let's simply compare the + // number of labels in this case. That will still yield + // reproducible sorting. + return len(s[i].Label) < len(s[j].Label) + } + for n, lp := range s[i].Label { + vi := lp.GetValue() + vj := s[j].Label[n].GetValue() + if vi != vj { + return vi < vj + } + } + + // We should never arrive here. Multiple metrics with the same + // label set in the same scrape will lead to undefined ingestion + // behavior. However, as above, we have to provide stable sorting + // here, even for inconsistent metrics. So sort equal metrics + // by their timestamp, with missing timestamps (implying "now") + // coming last. + if s[i].TimestampMs == nil { + return false + } + if s[j].TimestampMs == nil { + return true + } + return s[i].GetTimestampMs() < s[j].GetTimestampMs() +} + +// NormalizeMetricFamilies returns a MetricFamily slice with empty +// MetricFamilies pruned and the remaining MetricFamilies sorted by name within +// the slice, with the contained Metrics sorted within each MetricFamily. +func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily { + for _, mf := range metricFamiliesByName { + sort.Sort(metricSorter(mf.Metric)) + } + names := make([]string, 0, len(metricFamiliesByName)) + for name, mf := range metricFamiliesByName { + if len(mf.Metric) > 0 { + names = append(names, name) + } + } + sort.Strings(names) + result := make([]*dto.MetricFamily, 0, len(names)) + for _, name := range names { + result = append(result, metricFamiliesByName[name]) + } + return result +} + +func NewCRegistry() *CRegistry { + return &CRegistry{ + Registry: prometheus.NewRegistry(), + } +} + +// only re-write the implementation of Gather() +type CRegistry struct { + *prometheus.Registry + mtx sync.RWMutex +} + +// Gather implements Gatherer. +func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) { + var ( + parser expfmt.TextParser + ) + + r.mtx.RLock() + cMetricsStr := C.GetKnowhereMetrics() + metricsStr := C.GoString(cMetricsStr) + C.free(unsafe.Pointer(cMetricsStr)) + + out, err := parser.TextToMetricFamilies(strings.NewReader(metricsStr)) + if err != nil { + log.Error("fail to parse prometheus metrics") + return + } + res = NormalizeMetricFamilies(out) + return +} diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index bc5f6c039a..f60099a276 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -18,20 +18,18 @@ package metrics import ( "testing" - - "github.com/prometheus/client_golang/prometheus" ) func TestRegisterMetrics(t *testing.T) { - r := prometheus.NewRegistry() + r := NewMilvusRegistry() // Make sure it doesn't panic. - RegisterRootCoord(r) - RegisterDataNode(r) - RegisterDataCoord(r) - RegisterIndexNode(r) - RegisterProxy(r) - RegisterQueryNode(r) - RegisterQueryCoord(r) - RegisterEtcdMetrics(r) - RegisterMq(r) + RegisterRootCoord(r.GoRegistry) + RegisterDataNode(r.GoRegistry) + RegisterDataCoord(r.GoRegistry) + RegisterIndexNode(r.GoRegistry) + RegisterProxy(r.GoRegistry) + RegisterQueryNode(r.GoRegistry) + RegisterQueryCoord(r.GoRegistry) + RegisterEtcdMetrics(r.GoRegistry) + RegisterMq(r.GoRegistry) } diff --git a/pkg/metrics/milvus_registry.go b/pkg/metrics/milvus_registry.go new file mode 100644 index 0000000000..e97fa68058 --- /dev/null +++ b/pkg/metrics/milvus_registry.go @@ -0,0 +1,52 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func NewMilvusRegistry() *MilvusRegistry { + r := &MilvusRegistry{ + GoRegistry: prometheus.NewRegistry(), + CRegistry: NewCRegistry(), + } + + r.GoRegistry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + r.GoRegistry.MustRegister(prometheus.NewGoCollector()) + + return r +} + +// re-write the implementation of Gather() +type MilvusRegistry struct { + GoRegistry *prometheus.Registry + CRegistry *CRegistry +} + +// Gather implements Gatherer. +func (r *MilvusRegistry) Gather() ([]*dto.MetricFamily, error) { + var res []*dto.MetricFamily + res1, err := r.GoRegistry.Gather() + if err != nil { + return res, err + } + res2, err := r.CRegistry.Gather() + if err != nil { + return res, err + } + res = append(res1, res2...) + return res, nil +}