From 8ed019735ce8603f2ee77d00fffe9f589e492ab7 Mon Sep 17 00:00:00 2001 From: jaime Date: Fri, 6 Dec 2024 16:32:41 +0800 Subject: [PATCH] enhance: add disk stats within system metrics (#38033) issue: ##36621 Signed-off-by: jaime --- internal/datacoord/metrics_info.go | 25 +++++-- internal/datanode/metrics_info.go | 31 +++++--- internal/indexnode/metrics_info.go | 30 ++++++-- internal/proxy/metrics_info.go | 95 ++++++++++++------------- internal/querycoordv2/handlers.go | 25 +++++-- internal/querynodev2/metrics_info.go | 26 +++++-- internal/rootcoord/metrics_info.go | 28 ++++++-- pkg/util/hardware/hardware_info.go | 28 +++++--- pkg/util/hardware/hardware_info_test.go | 15 ++-- pkg/util/metricsinfo/metrics_info.go | 6 +- 10 files changed, 202 insertions(+), 107 deletions(-) diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 9e83e7d289..0dd5c8cd72 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -261,17 +261,28 @@ func (s *Server) getSystemInfoMetrics( // getDataCoordMetrics composes datacoord infos func (s *Server) getDataCoordMetrics(ctx context.Context) metricsinfo.DataCoordInfos { + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + ret := metricsinfo.DataCoordInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), HardwareInfos: metricsinfo.HardwareMetrics{ - IP: s.session.GetAddress(), - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: hardware.GetMemoryCount(), - MemoryUsage: hardware.GetUsedMemoryCount(), - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: s.session.GetAddress(), + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: hardware.GetMemoryCount(), + MemoryUsage: hardware.GetUsedMemoryCount(), + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, }, SystemInfo: metricsinfo.DeployMetrics{}, CreatedTime: paramtable.GetCreateTime().String(), diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 7e59297c0a..4872b6f6c5 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -19,8 +19,11 @@ package datanode import ( "context" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/flushcommon/util" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -65,7 +68,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro }, nil } -func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMetricsRequest) (string, error) { +func (node *DataNode) getSystemInfoMetrics(ctx context.Context, _ *milvuspb.GetMetricsRequest) (string, error) { // TODO(dragondriver): add more metrics usedMem := hardware.GetUsedMemoryCount() totalMem := hardware.GetMemoryCount() @@ -74,14 +77,26 @@ func (node *DataNode) getSystemInfoMetrics(_ context.Context, _ *milvuspb.GetMet if err != nil { return "", err } + + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + hardwareMetrics := metricsinfo.HardwareMetrics{ - IP: node.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: totalMem, - MemoryUsage: usedMem, - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: node.session.Address, + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: totalMem, + MemoryUsage: usedMem, + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, } quotaMetrics.Hms = hardwareMetrics diff --git a/internal/indexnode/metrics_info.go b/internal/indexnode/metrics_info.go index 2eb6eef049..55ae1ff498 100644 --- a/internal/indexnode/metrics_info.go +++ b/internal/indexnode/metrics_info.go @@ -19,7 +19,10 @@ package indexnode import ( "context" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -34,17 +37,30 @@ func getSystemInfoMetrics( node *IndexNode, ) (*milvuspb.GetMetricsResponse, error) { // TODO(dragondriver): add more metrics + + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + iowait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + nodeInfos := metricsinfo.IndexNodeInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, paramtable.GetNodeID()), + HardwareInfos: metricsinfo.HardwareMetrics{ - IP: node.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: hardware.GetMemoryCount(), - MemoryUsage: hardware.GetUsedMemoryCount(), - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: node.session.Address, + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: hardware.GetMemoryCount(), + MemoryUsage: hardware.GetUsedMemoryCount(), + Disk: total, + DiskUsage: used, + IOWaitPercentage: iowait, }, SystemInfo: metricsinfo.DeployMetrics{}, CreatedTime: paramtable.GetCreateTime().String(), diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index ba1bad9f15..3c31f33617 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -20,8 +20,11 @@ import ( "context" "sync" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -82,25 +85,54 @@ func getQuotaMetrics() (*metricsinfo.ProxyQuotaMetrics, error) { // getProxyMetrics get metrics of Proxy, not including the topological metrics of Query cluster and Data cluster. func getProxyMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest, node *Proxy) (*milvuspb.GetMetricsResponse, error) { - totalMem := hardware.GetMemoryCount() - usedMem := hardware.GetUsedMemoryCount() quotaMetrics, err := getQuotaMetrics() if err != nil { return nil, err } - hardwareMetrics := metricsinfo.HardwareMetrics{ - IP: node.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: totalMem, - MemoryUsage: usedMem, - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + proxyMetricInfo := getProxyMetricInfo(ctx, node, quotaMetrics) + proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()) + + resp, err := metricsinfo.MarshalComponentInfos(proxyMetricInfo) + if err != nil { + return nil, err + } + + return &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: resp, + ComponentName: proxyRoleName, + }, nil +} + +func getProxyMetricInfo(ctx context.Context, node *Proxy, quotaMetrics *metricsinfo.ProxyQuotaMetrics) *metricsinfo.ProxyInfos { + totalMem := hardware.GetMemoryCount() + usedMem := hardware.GetUsedMemoryCount() + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + hardwareMetrics := metricsinfo.HardwareMetrics{ + IP: node.session.Address, + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: totalMem, + MemoryUsage: usedMem, + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, + } + + if quotaMetrics != nil { + quotaMetrics.Hms = hardwareMetrics } - quotaMetrics.Hms = hardwareMetrics proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()) - proxyMetricInfo := metricsinfo.ProxyInfos{ + return &metricsinfo.ProxyInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ HasError: false, Name: proxyRoleName, @@ -117,17 +149,6 @@ func getProxyMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest, n }, QuotaMetrics: quotaMetrics, } - - resp, err := metricsinfo.MarshalComponentInfos(proxyMetricInfo) - if err != nil { - return nil, err - } - - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()), - }, nil } // getSystemInfoMetrics returns the system information metrics. @@ -146,34 +167,12 @@ func getSystemInfoMetrics( proxyRoleName := metricsinfo.ConstructComponentName(typeutil.ProxyRole, paramtable.GetNodeID()) identifierMap[proxyRoleName] = int(node.session.ServerID) + // FIXME:All proxy metrics are retrieved from RootCoord, while single proxy metrics are obtained from the proxy itself. + proxyInfo := getProxyMetricInfo(ctx, node, nil) proxyTopologyNode := metricsinfo.SystemTopologyNode{ Identifier: int(node.session.ServerID), Connected: make([]metricsinfo.ConnectionEdge, 0), - Infos: &metricsinfo.ProxyInfos{ - BaseComponentInfos: metricsinfo.BaseComponentInfos{ - HasError: false, - ErrorReason: "", - Name: proxyRoleName, - HardwareInfos: metricsinfo.HardwareMetrics{ - IP: node.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: hardware.GetMemoryCount(), - MemoryUsage: hardware.GetUsedMemoryCount(), - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), - }, - SystemInfo: metricsinfo.DeployMetrics{}, - CreatedTime: paramtable.GetCreateTime().String(), - UpdatedTime: paramtable.GetUpdateTime().String(), - Type: typeutil.ProxyRole, - ID: node.session.ServerID, - }, - SystemConfigurations: metricsinfo.ProxyConfiguration{ - DefaultPartitionName: Params.CommonCfg.DefaultPartitionName.GetValue(), - DefaultIndexName: Params.CommonCfg.DefaultIndexName.GetValue(), - }, - }, + Infos: proxyInfo, } metricsinfo.FillDeployMetricsWithEnv(&(proxyTopologyNode.Infos.(*metricsinfo.ProxyInfos).SystemInfo)) diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index d770edc1e6..c6b2e92dfe 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -328,18 +328,29 @@ func (s *Server) getSystemInfoMetrics( ctx context.Context, req *milvuspb.GetMetricsRequest, ) (string, error) { + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + clusterTopology := metricsinfo.QueryClusterTopology{ Self: metricsinfo.QueryCoordInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, paramtable.GetNodeID()), HardwareInfos: metricsinfo.HardwareMetrics{ - IP: s.session.GetAddress(), - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: hardware.GetMemoryCount(), - MemoryUsage: hardware.GetUsedMemoryCount(), - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: s.session.GetAddress(), + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: hardware.GetMemoryCount(), + MemoryUsage: hardware.GetUsedMemoryCount(), + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, }, SystemInfo: metricsinfo.DeployMetrics{}, CreatedTime: paramtable.GetCreateTime().String(), diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index e2ca77f512..ef974517f3 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -230,14 +230,26 @@ func getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest, if err != nil { return "", err } + + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + hardwareInfos := metricsinfo.HardwareMetrics{ - IP: node.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: totalMem, - MemoryUsage: usedMem, - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: node.session.Address, + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: totalMem, + MemoryUsage: usedMem, + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, } quotaMetrics.Hms = hardwareInfos diff --git a/internal/rootcoord/metrics_info.go b/internal/rootcoord/metrics_info.go index c7721ce7c3..0a9e8deebf 100644 --- a/internal/rootcoord/metrics_info.go +++ b/internal/rootcoord/metrics_info.go @@ -19,7 +19,10 @@ package rootcoord import ( "context" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -27,18 +30,29 @@ import ( ) func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (string, error) { + used, total, err := hardware.GetDiskUsage(paramtable.Get().LocalStorageCfg.Path.GetValue()) + if err != nil { + log.Ctx(ctx).Warn("get disk usage failed", zap.Error(err)) + } + + ioWait, err := hardware.GetIOWait() + if err != nil { + log.Ctx(ctx).Warn("get iowait failed", zap.Error(err)) + } + rootCoordTopology := metricsinfo.RootCoordTopology{ Self: metricsinfo.RootCoordInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID), HardwareInfos: metricsinfo.HardwareMetrics{ - IP: c.session.Address, - CPUCoreCount: hardware.GetCPUNum(), - CPUCoreUsage: hardware.GetCPUUsage(), - Memory: hardware.GetMemoryCount(), - MemoryUsage: hardware.GetUsedMemoryCount(), - Disk: hardware.GetDiskCount(), - DiskUsage: hardware.GetDiskUsage(), + IP: c.session.Address, + CPUCoreCount: hardware.GetCPUNum(), + CPUCoreUsage: hardware.GetCPUUsage(), + Memory: hardware.GetMemoryCount(), + MemoryUsage: hardware.GetUsedMemoryCount(), + Disk: total, + DiskUsage: used, + IOWaitPercentage: ioWait, }, SystemInfo: metricsinfo.DeployMetrics{}, CreatedTime: paramtable.GetCreateTime().String(), diff --git a/pkg/util/hardware/hardware_info.go b/pkg/util/hardware/hardware_info.go index 9b7c5c947c..38fb6e220e 100644 --- a/pkg/util/hardware/hardware_info.go +++ b/pkg/util/hardware/hardware_info.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" "go.uber.org/automaxprocs/maxprocs" "go.uber.org/zap" @@ -102,16 +103,27 @@ func GetFreeMemoryCount() uint64 { return GetMemoryCount() - GetUsedMemoryCount() } -// TODO(dragondriver): not accurate to calculate disk usage when we use distributed storage - -// GetDiskCount returns the disk count in bytes. -func GetDiskCount() uint64 { - return 100 * 1024 * 1024 +// GetDiskUsage Get Disk Usage in GB +func GetDiskUsage(path string) (float64, float64, error) { + diskStats, err := disk.Usage(path) + if err != nil { + return 0, 0, err + } + usedGB := float64(diskStats.Used) / 1e9 + totalGB := float64(diskStats.Total) / 1e9 + return usedGB, totalGB, nil } -// GetDiskUsage returns the disk usage in bytes. -func GetDiskUsage() uint64 { - return 2 * 1024 * 1024 +// GetIOWait Get IO Wait Percentage +func GetIOWait() (float64, error) { + cpuTimes, err := cpu.Times(false) + if err != nil { + return 0, err + } + if len(cpuTimes) > 0 { + return cpuTimes[0].Iowait, nil + } + return 0, nil } func GetMemoryUseRatio() float64 { diff --git a/pkg/util/hardware/hardware_info_test.go b/pkg/util/hardware/hardware_info_test.go index 28113faf83..037e1c3a58 100644 --- a/pkg/util/hardware/hardware_info_test.go +++ b/pkg/util/hardware/hardware_info_test.go @@ -42,14 +42,17 @@ func Test_GetUsedMemoryCount(t *testing.T) { zap.Uint64("UsedMemoryCount", GetUsedMemoryCount())) } -func Test_GetDiskCount(t *testing.T) { - log.Info("TestGetDiskCount", - zap.Uint64("DiskCount", GetDiskCount())) +func TestGetDiskUsage(t *testing.T) { + used, total, err := GetDiskUsage("/") + assert.NoError(t, err) + assert.GreaterOrEqual(t, used, 0.0) + assert.GreaterOrEqual(t, total, 0.0) } -func Test_GetDiskUsage(t *testing.T) { - log.Info("TestGetDiskUsage", - zap.Uint64("DiskUsage", GetDiskUsage())) +func TestGetIOWait(t *testing.T) { + iowait, err := GetIOWait() + assert.NoError(t, err) + assert.GreaterOrEqual(t, iowait, 0.0) } func Test_GetMemoryUsageRatio(t *testing.T) { diff --git a/pkg/util/metricsinfo/metrics_info.go b/pkg/util/metricsinfo/metrics_info.go index b9a0178501..be6b468b2b 100644 --- a/pkg/util/metricsinfo/metrics_info.go +++ b/pkg/util/metricsinfo/metrics_info.go @@ -40,8 +40,10 @@ type HardwareMetrics struct { MemoryUsage uint64 `json:"memory_usage"` // how to metric disk & disk usage in distributed storage - Disk uint64 `json:"disk"` - DiskUsage uint64 `json:"disk_usage"` + Disk float64 `json:"disk,string"` + DiskUsage float64 `json:"disk_usage,string"` + + IOWaitPercentage float64 `json:"io_wait_percentage,string"` // IO Wait in % } const (