Add disk metric info (#25678)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2023-08-22 16:58:22 +08:00 committed by GitHub
parent 0999fb233e
commit 1ea477151e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 48 additions and 15 deletions

View File

@ -24,11 +24,11 @@
#endif
CStatus
GetLocalUsedSize(int64_t* size) {
GetLocalUsedSize(const char* c_dir, int64_t* size) {
try {
#ifdef BUILD_DISK_ANN
auto& local_chunk_manager = milvus::storage::LocalChunkManager::GetInstance();
auto dir = milvus::ChunkMangerConfig::GetLocalRootPath();
std::string dir(c_dir);
if (local_chunk_manager.DirExist(dir)) {
*size = local_chunk_manager.GetSizeOfDir(dir);
} else {

View File

@ -22,7 +22,7 @@ extern "C" {
#include "common/type_c.h"
CStatus
GetLocalUsedSize(int64_t* size);
GetLocalUsedSize(const char* c_path, int64_t* size);
CStatus
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);

View File

@ -260,7 +260,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
}
// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize()
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(Params.LocalStorageCfg.Path)
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
return err

View File

@ -381,6 +381,16 @@ var (
Name: "execute_bytes_counter",
Help: "",
}, []string{nodeIDLabelName, msgTypeLabelName})
QueryNodeDiskUsedSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "disk_used_size",
Help: "disk used size(MB)",
}, []string{
nodeIDLabelName,
})
)
// RegisterQueryNode registers QueryNode metrics
@ -417,6 +427,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeConsumeTimeTickLag)
registry.MustRegister(QueryNodeSegmentSearchLatencyPerVector)
registry.MustRegister(QueryNodeWatchDmlChannelLatency)
registry.MustRegister(QueryNodeDiskUsedSize)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {

View File

@ -82,11 +82,13 @@ func GetCProtoBlob(cProto *C.CProto) []byte {
return blob
}
func GetLocalUsedSize() (int64, error) {
func GetLocalUsedSize(path string) (int64, error) {
var availableSize int64
cSize := (*C.int64_t)(&availableSize)
cSize := C.int64_t(availableSize)
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.GetLocalUsedSize(cSize)
status := C.GetLocalUsedSize(cPath, &cSize)
err := HandleCStatus(&status, "get local used size failed")
if err != nil {
return 0, err

View File

@ -290,6 +290,16 @@ func (node *QueryNode) InitSegcore() error {
return initcore.InitRemoteChunkManager(&Params)
}
func (node *QueryNode) InitMetrics() error {
localUsedSize, err := GetLocalUsedSize(Params.LocalStorageCfg.Path)
if err != nil {
return err
}
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))
return nil
}
// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {
var initError error = nil
@ -359,6 +369,13 @@ func (node *QueryNode) Init() error {
gc.NewTuner(Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage, uint32(Params.QueryNodeCfg.MinimumGOGCConfig), uint32(Params.QueryNodeCfg.MaximumGOGCConfig), action)
}
err = node.InitMetrics()
if err != nil {
log.Warn("QueryNode init metrics failed", zap.Error(err))
initError = err
return
}
log.Info("query node init successfully",
zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()),
zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP),

View File

@ -807,13 +807,18 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
return fmt.Errorf("get memory failed when checkSegmentSize, collectionID = %d", collectionID)
}
toMB := func(mem uint64) uint64 {
return mem / 1024 / 1024
}
predictMemUsage := memUsage
maxSegmentSize := uint64(0)
localDiskUsage, err := GetLocalUsedSize()
localDiskUsage, err := GetLocalUsedSize(Params.LocalStorageCfg.Path)
if err != nil {
return fmt.Errorf("get local used size failed, collectionID = %d", collectionID)
}
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(toMB(uint64(localDiskUsage))))
predictDiskUsage := uint64(localDiskUsage)
for _, loadInfo := range segmentLoadInfos {
@ -869,10 +874,6 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
}
}
toMB := func(mem uint64) uint64 {
return mem / 1024 / 1024
}
// when load segment, data will be copied from go memory to c++ memory
predictPeakMemUsage := predictMemUsage + uint64(float64(maxSegmentSize)*float64(concurrency))

View File

@ -77,11 +77,13 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
return errors.New(finalMsg)
}
func GetLocalUsedSize() (int64, error) {
func GetLocalUsedSize(path string) (int64, error) {
var availableSize int64
cSize := (*C.int64_t)(&availableSize)
cSize := C.int64_t(availableSize)
cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.GetLocalUsedSize(cSize)
status := C.GetLocalUsedSize(cPath, &cSize)
err := HandleCStatus(&status, "get local used size failed")
if err != nil {
return 0, err