enhance: add jemalloc cached monitor (#46041)

#46133

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-12-09 19:53:13 +08:00 committed by GitHub
parent f9ff0e8402
commit 8f0b7983ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 434 additions and 2 deletions

View File

@ -33,10 +33,15 @@ if ( USE_DYNAMIC_SIMD )
add_definitions(-DUSE_DYNAMIC_SIMD)
endif()
if (USE_OPENDAL)
if (USE_OPENDAL)
add_definitions(-DUSE_OPENDAL)
endif()
# Define MILVUS_JEMALLOC_ENABLED on Linux so that sources guarded by
# `#ifdef MILVUS_JEMALLOC_ENABLED` compile their jemalloc statistics code path.
# NOTE(review): this check is evaluated before project(core). CMake's built-in
# LINUX variable (3.25+) appears to be populated only once project() has run,
# so confirm LINUX is already defined at this point (e.g. passed in by the
# build scripts); otherwise the macro is silently never defined.
if (LINUX)
add_definitions(-DMILVUS_JEMALLOC_ENABLED)
endif()
project(core)
include(CheckCXXCompilerFlag)
if ( APPLE )

View File

@ -96,6 +96,11 @@ if (ENABLE_GCP_NATIVE)
set(LINK_TARGETS ${LINK_TARGETS} gcp-native-storage)
endif()
# On Linux, put jemalloc on the link line so calls to its mallctl API resolve.
if (LINUX)
    list(APPEND LINK_TARGETS jemalloc)
endif()
target_link_libraries(milvus_core ${LINK_TARGETS})
install(TARGETS milvus_core DESTINATION "${CMAKE_INSTALL_LIBDIR}")

View File

@ -10,4 +10,10 @@
# or implied. See the License for the specific language governing permissions and limitations under the License
add_source_at_current_directory_recursively()
add_library(milvus_monitor OBJECT ${SOURCE_FILES})
add_library(milvus_monitor OBJECT ${SOURCE_FILES})
# On Linux, jemalloc_stats_c.cpp includes jemalloc headers: search for them
# under the install prefix and order the jemalloc target ahead of this one so
# the headers exist by the time milvus_monitor is compiled.
if (LINUX)
    target_include_directories(milvus_monitor PRIVATE ${CMAKE_INSTALL_PREFIX}/include)
    add_dependencies(milvus_monitor jemalloc)
endif()

View File

@ -0,0 +1,108 @@
// Copyright 2025 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "monitor/jemalloc_stats_c.h"
#ifdef MILVUS_JEMALLOC_ENABLED
#include <jemalloc/jemalloc.h>
#include <cstddef>
#include <cstring>
#endif
// Takes a snapshot of jemalloc's global memory counters.
//
// Returns a JemallocStats whose `success` flag is true only when every counter
// was read. On any mallctl failure, or when MILVUS_JEMALLOC_ENABLED is not
// defined, the returned struct has all counters at zero and success == false.
JemallocStats
GetJemallocStats() {
    // Start from an all-zero snapshot marked as "not retrieved"; every early
    // exit below returns it unchanged.
    JemallocStats stats{};
    stats.success = false;

#ifdef MILVUS_JEMALLOC_ENABLED
    // Writing to "epoch" makes jemalloc refresh the statistics it reports, so
    // the reads below observe current values.
    uint64_t epoch = 1;
    size_t epoch_len = sizeof(epoch);
    if (mallctl("epoch", &epoch, &epoch_len, &epoch, epoch_len) != 0) {
        return stats;
    }

    // Reads a single size_t-valued counter; yields true when mallctl succeeds.
    const auto read_counter = [](const char* key, size_t& out) -> bool {
        size_t len = sizeof(size_t);
        return mallctl(key, &out, &len, nullptr, 0) == 0;
    };

    size_t allocated = 0;  // bytes allocated by the application
    size_t active = 0;     // bytes in active pages (includes fragmentation)
    size_t metadata = 0;   // bytes used by jemalloc's own metadata
    size_t resident = 0;   // bytes in physically resident pages
    size_t mapped = 0;     // bytes in virtual memory mappings
    size_t retained = 0;   // bytes of retained virtual memory

    // Short-circuit evaluation keeps the original read order and stops at the
    // first counter that cannot be read.
    const bool all_read = read_counter("stats.allocated", allocated) &&
                          read_counter("stats.active", active) &&
                          read_counter("stats.metadata", metadata) &&
                          read_counter("stats.resident", resident) &&
                          read_counter("stats.mapped", mapped) &&
                          read_counter("stats.retained", retained);
    if (!all_read) {
        return stats;
    }

    // Core counters, widened to the fixed-width fields of the C struct.
    stats.allocated = static_cast<uint64_t>(allocated);
    stats.active = static_cast<uint64_t>(active);
    stats.metadata = static_cast<uint64_t>(metadata);
    stats.resident = static_cast<uint64_t>(resident);
    stats.mapped = static_cast<uint64_t>(mapped);
    stats.retained = static_cast<uint64_t>(retained);

    // Derived values are clamped at zero so the unsigned subtraction can never
    // wrap around.
    // fragmentation: space inside active pages not handed to the application
    if (active > allocated) {
        stats.fragmentation = active - allocated;
    } else {
        stats.fragmentation = 0;
    }
    // overhead: resident memory beyond the active pages
    if (resident > active) {
        stats.overhead = resident - active;
    } else {
        stats.overhead = 0;
    }

    stats.success = true;
#endif
    // Without jemalloc support (e.g. macOS) this is the zeroed snapshot with
    // success == false, signalling that no statistics are available.
    return stats;
}

View File

@ -0,0 +1,49 @@
// Copyright 2025 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <stdbool.h>
#ifdef __cplusplus
extern "C" {
#endif
// JemallocStats is a plain C snapshot of jemalloc memory statistics, returned
// by value from GetJemallocStats().
// All sizes are in bytes. When `success` is false the counters carry no
// information and are reported as zero.
typedef struct {
// Core memory metrics, read directly from jemalloc's "stats.*" counters
uint64_t allocated; // Total bytes allocated by the application
uint64_t active; // Total bytes in active pages (includes fragmentation)
uint64_t metadata; // Total bytes dedicated to jemalloc metadata
uint64_t resident; // Total bytes in physically resident data pages (RSS)
uint64_t mapped; // Total bytes in virtual memory mappings
uint64_t
retained; // Total bytes in retained virtual memory (could be returned to OS)
// Derived metrics for convenience; each is clamped to 0 rather than going negative
uint64_t fragmentation; // Internal fragmentation (active - allocated)
uint64_t overhead; // Memory overhead (resident - active)
// Status flags
bool success; // True only when every counter above was successfully retrieved
} JemallocStats;
// GetJemallocStats returns the current jemalloc memory statistics by value.
// Callers must check `success` first: it is false when any statistic could not
// be read or when the library was built without MILVUS_JEMALLOC_ENABLED, in
// which case all counters are zero.
JemallocStats
GetJemallocStats();
#ifdef __cplusplus
}
#endif

View File

@ -268,6 +268,9 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
log.Ctx(ctx).Warn("get iowait failed", zap.Error(err))
}
// Get jemalloc memory statistics
jemallocStats := hardware.GetJemallocStats()
hardwareInfos := metricsinfo.HardwareMetrics{
IP: node.session.Address,
CPUCoreCount: hardware.GetCPUNum(),
@ -277,6 +280,16 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest,
Disk: totalDiskGB,
DiskUsage: usedDiskGB,
IOWaitPercentage: ioWait,
// Jemalloc memory statistics (comprehensive metrics)
JemallocAllocated: jemallocStats.Allocated,
JemallocActive: jemallocStats.Active,
JemallocMetadata: jemallocStats.Metadata,
JemallocResident: jemallocStats.Resident,
JemallocMapped: jemallocStats.Mapped,
JemallocRetained: jemallocStats.Retained,
JemallocFragmentation: jemallocStats.Fragmentation,
JemallocOverhead: jemallocStats.Overhead,
JemallocSuccess: jemallocStats.Success,
}
quotaMetrics, err := getQuotaMetrics(node)

View File

@ -24,6 +24,7 @@ package metrics
#include <stdlib.h>
#include "segcore/metrics_c.h"
#include "monitor/monitor_c.h"
#include "monitor/jemalloc_stats_c.h"
*/
import "C"
@ -32,6 +33,7 @@ import (
"sort"
"strings"
"sync"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
@ -39,6 +41,7 @@ import (
"github.com/prometheus/common/expfmt"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/log"
)
@ -108,6 +111,18 @@ func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily)
return result
}
// jemallocMetricsCache holds the most recently gathered jemalloc metric
// families together with the time they were collected, so that scrapes do not
// have to cross into C every time. The embedded RWMutex guards both fields.
var jemallocMetricsCache struct {
	sync.RWMutex
	metrics   map[string]*dto.MetricFamily
	timestamp time.Time
}

// jemallocMetricsCacheTTL is how long a cached snapshot is served before it is
// refreshed. Ten seconds balances performance against data freshness: it
// reduces mallctl("epoch") calls from every scrape to once per 10s.
var jemallocMetricsCacheTTL = 10 * time.Second
func NewCRegistry() *CRegistry {
return &CRegistry{
Registry: prometheus.NewRegistry(),
@ -125,6 +140,8 @@ func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) {
var parser expfmt.TextParser
r.mtx.RLock()
defer r.mtx.RUnlock()
cMetricsStr := C.GetKnowhereMetrics()
metricsStr := C.GoString(cMetricsStr)
C.free(unsafe.Pointer(cMetricsStr))
@ -146,6 +163,89 @@ func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) {
}
maps.Copy(out, out1)
// Add jemalloc stats metrics
jemallocMetrics := gatherJemallocMetrics()
for name, mf := range jemallocMetrics {
out[name] = mf
}
res = NormalizeMetricFamilies(out)
return
}
// gatherJemallocMetrics collects comprehensive jemalloc stats and returns them
// as Prometheus metric families keyed by metric name.
//
// Results are cached for jemallocMetricsCacheTTL (10 seconds) to avoid the
// expensive mallctl("epoch") call on every Prometheus scrape. When jemalloc
// stats are unavailable (e.g. macOS, or jemalloc disabled) an empty map is
// returned and nothing is cached.
//
// The returned map may be the shared cached instance; callers must treat it as
// read-only.
func gatherJemallocMetrics() map[string]*dto.MetricFamily {
	// Fast path: serve the cached snapshot while it is still fresh.
	// Both the map and its age are captured while the read lock is held, so no
	// cache field is touched after RUnlock (reading the timestamp afterwards
	// would race with a concurrent refresh writing it under the write lock).
	jemallocMetricsCache.RLock()
	cached := jemallocMetricsCache.metrics
	age := time.Since(jemallocMetricsCache.timestamp)
	jemallocMetricsCache.RUnlock()

	if cached != nil && age < jemallocMetricsCacheTTL {
		log.Debug("using cached jemalloc metrics",
			zap.Duration("age", age))
		return cached
	}

	// Slow path: cache expired, collect fresh metrics from C.
	// This involves expensive mallctl("epoch") call which can take 100-5000μs
	result := make(map[string]*dto.MetricFamily)

	cStats := C.GetJemallocStats()
	if !bool(cStats.success) {
		log.Debug("jemalloc stats not available (may be running on macOS or jemalloc is disabled)")
		return result
	}

	gaugeType := dto.MetricType_GAUGE

	// createGaugeFamily wraps a single unlabeled gauge sample in a metric family.
	createGaugeFamily := func(name, help string, value float64) *dto.MetricFamily {
		return &dto.MetricFamily{
			Name: proto.String(name),
			Help: proto.String(help),
			Type: &gaugeType,
			Metric: []*dto.Metric{
				{
					Gauge: &dto.Gauge{
						Value: proto.Float64(value),
					},
				},
			},
		}
	}

	// All exported jemalloc metrics (8 in total).
	metrics := []struct {
		name  string
		help  string
		value uint64
	}{
		// Core metrics from jemalloc
		{"milvus_jemalloc_allocated_bytes", "Total number of bytes allocated by the application", uint64(cStats.allocated)},
		{"milvus_jemalloc_active_bytes", "Total number of bytes in active pages allocated by the application (includes fragmentation)", uint64(cStats.active)},
		{"milvus_jemalloc_metadata_bytes", "Total number of bytes dedicated to jemalloc metadata", uint64(cStats.metadata)},
		{"milvus_jemalloc_resident_bytes", "Total number of bytes in physically resident data pages mapped by the allocator", uint64(cStats.resident)},
		{"milvus_jemalloc_mapped_bytes", "Total number of bytes in virtual memory mappings", uint64(cStats.mapped)},
		{"milvus_jemalloc_retained_bytes", "Total number of bytes in retained virtual memory mappings (could be returned to OS)", uint64(cStats.retained)},
		// Derived metrics (calculated in C code)
		{"milvus_jemalloc_fragmentation_bytes", "Internal fragmentation in bytes (active - allocated)", uint64(cStats.fragmentation)},
		{"milvus_jemalloc_overhead_bytes", "Memory overhead in bytes (resident - active)", uint64(cStats.overhead)},
	}

	for _, m := range metrics {
		result[m.name] = createGaugeFamily(m.name, m.help, float64(m.value))
	}

	// Publish the fresh snapshot; map and timestamp are swapped together under
	// the write lock so readers always see a consistent pair.
	jemallocMetricsCache.Lock()
	jemallocMetricsCache.metrics = result
	jemallocMetricsCache.timestamp = time.Now()
	jemallocMetricsCache.Unlock()

	log.Debug("refreshed jemalloc metrics cache",
		zap.Int("num_metrics", len(result)))

	return result
}

View File

@ -0,0 +1,60 @@
// Copyright (C) 2019-2025 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
//go:build linux || darwin
package hardware
/*
#cgo pkg-config: milvus_core
#include <stdlib.h>
#include "monitor/jemalloc_stats_c.h"
*/
import "C"
// JemallocStats is the Go view of the C JemallocStats struct returned by
// C.GetJemallocStats; GetJemallocStats copies it field by field.
// All sizes are in bytes. When Success is false the counters are zero and
// carry no information.
type JemallocStats struct {
// Core memory metrics (from jemalloc)
Allocated uint64 // Total bytes allocated by the application
Active uint64 // Total bytes in active pages (includes fragmentation)
Metadata uint64 // Total bytes dedicated to jemalloc metadata
Resident uint64 // Total bytes in physically resident data pages (RSS)
Mapped uint64 // Total bytes in virtual memory mappings
Retained uint64 // Total bytes in retained virtual memory (could be returned to OS)
// Derived metrics (calculated by C code, never negative)
Fragmentation uint64 // Internal fragmentation (active - allocated)
Overhead uint64 // Memory overhead (resident - active)
// Status
Success bool // Whether stats were successfully retrieved
}
// GetJemallocStats asks the C core for a jemalloc memory snapshot and converts
// it into the Go JemallocStats type.
// On platforms without jemalloc support every counter is 0 and Success is
// false.
func GetJemallocStats() JemallocStats {
	raw := C.GetJemallocStats()

	var stats JemallocStats

	// Counters reported by jemalloc itself.
	stats.Allocated = uint64(raw.allocated)
	stats.Active = uint64(raw.active)
	stats.Metadata = uint64(raw.metadata)
	stats.Resident = uint64(raw.resident)
	stats.Mapped = uint64(raw.mapped)
	stats.Retained = uint64(raw.retained)

	// Values derived on the C side.
	stats.Fragmentation = uint64(raw.fragmentation)
	stats.Overhead = uint64(raw.overhead)

	stats.Success = bool(raw.success)
	return stats
}

View File

@ -0,0 +1,70 @@
// Copyright (C) 2019-2025 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package hardware
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestGetJemallocStats checks GetJemallocStats on both kinds of platform:
// with jemalloc available the counters must satisfy the orderings jemalloc
// documents and the derived values must match their definitions; without it
// every counter must be zero.
func TestGetJemallocStats(t *testing.T) {
	stats := GetJemallocStats()

	// Every counter by display name, used for both the zero checks and logging.
	counters := []struct {
		name  string
		value uint64
	}{
		{"Allocated", stats.Allocated},
		{"Active", stats.Active},
		{"Metadata", stats.Metadata},
		{"Resident", stats.Resident},
		{"Mapped", stats.Mapped},
		{"Retained", stats.Retained},
		{"Fragmentation", stats.Fragmentation},
		{"Overhead", stats.Overhead},
	}

	// The test should always succeed, whether jemalloc is available or not.
	if !stats.Success {
		// If jemalloc is not available, all values should be zero
		for _, c := range counters {
			assert.Equal(t, uint64(0), c.value,
				"%s should be 0 when jemalloc is unavailable", c.name)
		}
		t.Log("Jemalloc is not available on this system (e.g., macOS)")
		return
	}

	// If jemalloc is available, verify the stats make sense.
	assert.GreaterOrEqual(t, stats.Active, stats.Allocated,
		"Active memory should be >= allocated memory (includes fragmentation)")
	assert.GreaterOrEqual(t, stats.Resident, stats.Active,
		"Resident memory should be >= active memory")
	// jemalloc documents mapped as larger than active but explicitly gives no
	// strict ordering between mapped and resident, so only mapped >= active is
	// checked here.
	assert.GreaterOrEqual(t, stats.Mapped, stats.Active,
		"Mapped memory should be >= active memory")

	// Verify derived metrics against their (clamped) definitions.
	expectedFragmentation := uint64(0)
	if stats.Active > stats.Allocated {
		expectedFragmentation = stats.Active - stats.Allocated
	}
	assert.Equal(t, expectedFragmentation, stats.Fragmentation,
		"Fragmentation should equal active - allocated")

	expectedOverhead := uint64(0)
	if stats.Resident > stats.Active {
		expectedOverhead = stats.Resident - stats.Active
	}
	assert.Equal(t, expectedOverhead, stats.Overhead,
		"Overhead should equal resident - active")

	t.Logf("Jemalloc stats (all 8 metrics):")
	for _, c := range counters {
		t.Logf("  %s: %d bytes (%.2f MB)", c.name, c.value, float64(c.value)/1024/1024)
	}
}

View File

@ -44,6 +44,22 @@ type HardwareMetrics struct {
DiskUsage float64 `json:"disk_usage"`
IOWaitPercentage float64 `json:"io_wait_percentage"` // IO Wait in %
// Jemalloc memory statistics (all sizes in bytes)
// Core metrics from jemalloc
JemallocAllocated uint64 `json:"jemalloc_allocated,omitempty"` // Total bytes allocated by the application
JemallocActive uint64 `json:"jemalloc_active,omitempty"` // Total bytes in active pages (includes fragmentation)
JemallocMetadata uint64 `json:"jemalloc_metadata,omitempty"` // Total bytes dedicated to jemalloc metadata
JemallocResident uint64 `json:"jemalloc_resident,omitempty"` // Total bytes in physically resident data pages (RSS)
JemallocMapped uint64 `json:"jemalloc_mapped,omitempty"` // Total bytes in virtual memory mappings
JemallocRetained uint64 `json:"jemalloc_retained,omitempty"` // Total bytes in retained virtual memory (could be returned to OS)
// Derived metrics (calculated)
JemallocFragmentation uint64 `json:"jemalloc_fragmentation,omitempty"` // Internal fragmentation (active - allocated)
JemallocOverhead uint64 `json:"jemalloc_overhead,omitempty"` // Memory overhead (resident - active)
// Status
JemallocSuccess bool `json:"jemalloc_success,omitempty"` // Whether jemalloc stats were successfully retrieved
}
type TaskQueueMetrics struct {