diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp
index 56ca91a0a4..80d8d2c7ea 100644
--- a/internal/core/src/storage/storage_c.cpp
+++ b/internal/core/src/storage/storage_c.cpp
@@ -24,11 +24,11 @@
 #endif
 
 CStatus
-GetLocalUsedSize(int64_t* size) {
+GetLocalUsedSize(const char* c_path, int64_t* size) {
     try {
 #ifdef BUILD_DISK_ANN
         auto& local_chunk_manager = milvus::storage::LocalChunkManager::GetInstance();
-        auto dir = milvus::ChunkMangerConfig::GetLocalRootPath();
+        std::string dir(c_path);
         if (local_chunk_manager.DirExist(dir)) {
             *size = local_chunk_manager.GetSizeOfDir(dir);
         } else {
diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h
index 5e85f4f1c2..dffa3da2e0 100644
--- a/internal/core/src/storage/storage_c.h
+++ b/internal/core/src/storage/storage_c.h
@@ -22,7 +22,7 @@ extern "C" {
 #include "common/type_c.h"
 
 CStatus
-GetLocalUsedSize(int64_t* size);
+GetLocalUsedSize(const char* c_path, int64_t* size);
 
 CStatus
 InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config);
diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go
index 67670cb9b6..1fbbcaaff3 100644
--- a/internal/indexnode/task.go
+++ b/internal/indexnode/task.go
@@ -260,7 +260,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
 	}
 
 	// check load size and size of field data
-	localUsedSize, err := indexcgowrapper.GetLocalUsedSize()
+	localUsedSize, err := indexcgowrapper.GetLocalUsedSize(Params.LocalStorageCfg.Path)
 	if err != nil {
 		log.Ctx(ctx).Warn("IndexNode get local used size failed")
 		return err
diff --git a/internal/metrics/querynode_metrics.go b/internal/metrics/querynode_metrics.go
index 02ab5f5aaf..bfc9e3e413 100644
--- a/internal/metrics/querynode_metrics.go
+++ b/internal/metrics/querynode_metrics.go
@@ -381,6 +381,17 @@ var (
 			Name:      "execute_bytes_counter",
 			Help:      "",
 		}, []string{nodeIDLabelName, msgTypeLabelName})
+
+	// QueryNodeDiskUsedSize is a gauge, labelled by node ID, of the local disk usage in MB.
+	QueryNodeDiskUsedSize = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "disk_used_size",
+			Help:      "disk used size(MB)",
+		}, []string{
+			nodeIDLabelName,
+		})
 )
 
 // RegisterQueryNode registers QueryNode metrics
@@ -417,6 +428,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
 	registry.MustRegister(QueryNodeConsumeTimeTickLag)
 	registry.MustRegister(QueryNodeSegmentSearchLatencyPerVector)
 	registry.MustRegister(QueryNodeWatchDmlChannelLatency)
+	registry.MustRegister(QueryNodeDiskUsedSize)
 }
 
 func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
diff --git a/internal/querynode/cgo_helper.go b/internal/querynode/cgo_helper.go
index 4f494dd5f8..e2aad11fa3 100644
--- a/internal/querynode/cgo_helper.go
+++ b/internal/querynode/cgo_helper.go
@@ -82,11 +82,15 @@ func GetCProtoBlob(cProto *C.CProto) []byte {
 	return blob
 }
 
-func GetLocalUsedSize() (int64, error) {
+// GetLocalUsedSize queries the C storage layer for the used size of the local directory at path.
+func GetLocalUsedSize(path string) (int64, error) {
 	var availableSize int64
+	// cSize must alias availableSize: the C side writes its result through this pointer.
 	cSize := (*C.int64_t)(&availableSize)
+	cPath := C.CString(path)
+	defer C.free(unsafe.Pointer(cPath))
 
-	status := C.GetLocalUsedSize(cSize)
+	status := C.GetLocalUsedSize(cPath, cSize)
 	err := HandleCStatus(&status, "get local used size failed")
 	if err != nil {
 		return 0, err
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 897bd6c793..bc662905ac 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -290,6 +290,17 @@ func (node *QueryNode) InitSegcore() error {
 	return initcore.InitRemoteChunkManager(&Params)
 }
 
+// InitMetrics sets the QueryNodeDiskUsedSize gauge from the used size of the local storage path, scaled to MB.
+func (node *QueryNode) InitMetrics() error {
+	localUsedSize, err := GetLocalUsedSize(Params.LocalStorageCfg.Path)
+	if err != nil {
+		return err
+	}
+	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))
+
+	return nil
+}
+
 // Init function init historical and streaming module to manage segments
 func (node *QueryNode) Init() error {
 	var initError error = nil
@@ -359,6 +370,13 @@ func (node *QueryNode) Init() error {
 			gc.NewTuner(Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage, uint32(Params.QueryNodeCfg.MinimumGOGCConfig), uint32(Params.QueryNodeCfg.MaximumGOGCConfig), action)
 		}
 
+		err = node.InitMetrics()
+		if err != nil {
+			log.Warn("QueryNode init metrics failed", zap.Error(err))
+			initError = err
+			return
+		}
+
 		log.Info("query node init successfully",
 			zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()),
 			zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP),
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 7e4235824f..133f7ab1eb 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -807,13 +807,18 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
 		return fmt.Errorf("get memory failed when checkSegmentSize, collectionID = %d", collectionID)
 	}
 
+	toMB := func(mem uint64) uint64 {
+		return mem / 1024 / 1024
+	}
+
 	predictMemUsage := memUsage
 	maxSegmentSize := uint64(0)
 
-	localDiskUsage, err := GetLocalUsedSize()
+	localDiskUsage, err := GetLocalUsedSize(Params.LocalStorageCfg.Path)
 	if err != nil {
 		return fmt.Errorf("get local used size failed, collectionID = %d", collectionID)
 	}
+	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(toMB(uint64(localDiskUsage))))
 	predictDiskUsage := uint64(localDiskUsage)
 
 	for _, loadInfo := range segmentLoadInfos {
@@ -869,10 +874,6 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
 		}
 	}
 
-	toMB := func(mem uint64) uint64 {
-		return mem / 1024 / 1024
-	}
-
 	// when load segment, data will be copied from go memory to c++ memory
 	predictPeakMemUsage := predictMemUsage + uint64(float64(maxSegmentSize)*float64(concurrency))
 
diff --git a/internal/util/indexcgowrapper/helper.go b/internal/util/indexcgowrapper/helper.go
index 1d166f98c4..065662fbcd 100644
--- a/internal/util/indexcgowrapper/helper.go
+++ b/internal/util/indexcgowrapper/helper.go
@@ -77,11 +77,15 @@ func HandleCStatus(status *C.CStatus, extraInfo string) error {
 	return errors.New(finalMsg)
 }
 
-func GetLocalUsedSize() (int64, error) {
+// GetLocalUsedSize queries the C storage layer for the used size of the local directory at path.
+func GetLocalUsedSize(path string) (int64, error) {
 	var availableSize int64
+	// cSize must alias availableSize: the C side writes its result through this pointer.
 	cSize := (*C.int64_t)(&availableSize)
+	cPath := C.CString(path)
+	defer C.free(unsafe.Pointer(cPath))
 
-	status := C.GetLocalUsedSize(cSize)
+	status := C.GetLocalUsedSize(cPath, cSize)
 	err := HandleCStatus(&status, "get local used size failed")
 	if err != nil {
 		return 0, err