enhance: collect all cgo calling into metric and log slow cgo call (#43035)

issue: #42833

- also fix the error metric for async cgo.
- also make sure the roles can be seen when node startup, #43041.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-07-03 15:00:44 +08:00 committed by GitHub
parent f6b2a71c95
commit bbbc7d4517
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 327 additions and 7 deletions

View File

@ -407,34 +407,34 @@ func (mr *MilvusRoles) Run() {
var proxy, dataNode, queryNode, streamingNode component
if (mr.EnableRootCoord && mr.EnableDataCoord && mr.EnableQueryCoord) || mr.EnableMixCoord {
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
mixCoord = mr.runMixCoord(ctx, local, &wg)
componentMap[typeutil.MixCoordRole] = mixCoord
paramtable.SetLocalComponentEnabled(typeutil.MixCoordRole)
}
if mr.EnableQueryNode {
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
}
if mr.EnableDataNode {
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
}
if mr.EnableProxy {
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
}
if mr.EnableStreamingNode {
// Before initializing the local streaming node, make sure the local registry is ready.
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
streamingNode = mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
}
wg.Wait()

View File

@ -23,6 +23,7 @@
#include "index/Meta.h"
#include "storage/Util.h"
#include "pb/clustering.pb.h"
#include "monitor/scope_metric.h"
#include "clustering/KmeansClustering.h"
using namespace milvus;
@ -54,6 +55,8 @@ CStatus
Analyze(CAnalyze* res_analyze,
const uint8_t* serialized_analyze_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto analyze_info =
std::make_unique<milvus::proto::clustering::AnalyzeInfo>();
@ -110,6 +113,8 @@ Analyze(CAnalyze* res_analyze,
CStatus
DeleteAnalyze(CAnalyze analyze) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(analyze, "failed to delete analyze, passed index was null");
@ -131,6 +136,8 @@ GetAnalyzeResultMeta(CAnalyze analyze,
int64_t* centroid_file_size,
void* id_mapping_paths,
int64_t* id_mapping_sizes) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(analyze,

View File

@ -17,9 +17,12 @@
#include "common/EasyAssert.h"
#include "knowhere/binaryset.h"
#include "common/binary_set_c.h"
#include "monitor/scope_metric.h"
CStatus
NewBinarySet(CBinarySet* c_binary_set) {
SCOPE_CGO_CALL_METRIC();
try {
auto binary_set = std::make_unique<knowhere::BinarySet>();
*c_binary_set = binary_set.release();
@ -37,6 +40,8 @@ NewBinarySet(CBinarySet* c_binary_set) {
void
DeleteBinarySet(CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
auto binary_set = (knowhere::BinarySet*)c_binary_set;
delete binary_set;
}
@ -46,6 +51,8 @@ AppendIndexBinary(CBinarySet c_binary_set,
void* index_binary,
int64_t index_size,
const char* c_index_key) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
auto binary_set = (knowhere::BinarySet*)c_binary_set;
@ -67,12 +74,16 @@ AppendIndexBinary(CBinarySet c_binary_set,
int
GetBinarySetSize(CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
auto binary_set = (knowhere::BinarySet*)c_binary_set;
return binary_set->binary_map_.size();
}
void
GetBinarySetKeys(CBinarySet c_binary_set, void* data) {
SCOPE_CGO_CALL_METRIC();
auto binary_set = (knowhere::BinarySet*)c_binary_set;
auto& map_ = binary_set->binary_map_;
const char** data_ = (const char**)data;
@ -84,6 +95,8 @@ GetBinarySetKeys(CBinarySet c_binary_set, void* data) {
int
GetBinarySetValueSize(CBinarySet c_binary_set, const char* key) {
SCOPE_CGO_CALL_METRIC();
auto binary_set = (knowhere::BinarySet*)c_binary_set;
int64_t ret_ = 0;
try {
@ -97,6 +110,8 @@ GetBinarySetValueSize(CBinarySet c_binary_set, const char* key) {
CStatus
CopyBinarySetValue(void* data, const char* key, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
auto binary_set = (knowhere::BinarySet*)c_binary_set;
try {

View File

@ -75,7 +75,6 @@ class Metrics {
milvus::monitor::internal_cgo_cancel_before_execute_total_all
.Increment();
} else {
milvus::monitor::internal_cgo_executing_task_total_all.Decrement();
milvus::monitor::internal_cgo_execute_duration_seconds_all.Observe(
std::chrono::duration<double>(execute_duration_).count());
}
@ -102,6 +101,7 @@ class Metrics {
auto now = std::chrono::steady_clock::now();
execute_duration_ =
std::chrono::duration_cast<Duration>(now - time_point_);
milvus::monitor::internal_cgo_executing_task_total_all.Decrement();
}
private:

View File

@ -37,6 +37,7 @@
#include "index/Meta.h"
#include "index/JsonKeyStatsInvertedIndex.h"
#include "milvus-storage/filesystem/fs.h"
#include "monitor/scope_metric.h"
using namespace milvus;
CStatus
@ -44,6 +45,8 @@ CreateIndexV0(enum CDataType dtype,
const char* serialized_type_params,
const char* serialized_index_params,
CIndex* res_index) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(res_index, "failed to create index, passed index was null");
@ -183,6 +186,8 @@ CStatus
CreateIndex(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
@ -265,6 +270,8 @@ CStatus
BuildJsonKeyIndex(ProtoLayoutInterface result,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
@ -347,6 +354,8 @@ CStatus
BuildTextIndex(ProtoLayoutInterface result,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
@ -426,6 +435,8 @@ BuildTextIndex(ProtoLayoutInterface result,
CStatus
DeleteIndex(CIndex index) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(index, "failed to delete index, passed index was null");
@ -445,6 +456,8 @@ CStatus
BuildFloatVecIndex(CIndex index,
int64_t float_value_num,
const float* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(index,
@ -470,6 +483,8 @@ CStatus
BuildFloat16VecIndex(CIndex index,
int64_t float16_value_num,
const uint8_t* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -496,6 +511,8 @@ CStatus
BuildBFloat16VecIndex(CIndex index,
int64_t bfloat16_value_num,
const uint8_t* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -520,6 +537,8 @@ BuildBFloat16VecIndex(CIndex index,
CStatus
BuildBinaryVecIndex(CIndex index, int64_t data_size, const uint8_t* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -547,6 +566,8 @@ BuildSparseFloatVecIndex(CIndex index,
int64_t row_num,
int64_t dim,
const uint8_t* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -570,6 +591,8 @@ BuildSparseFloatVecIndex(CIndex index,
CStatus
BuildInt8VecIndex(CIndex index, int64_t int8_value_num, const int8_t* vectors) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(index,
@ -598,6 +621,8 @@ BuildInt8VecIndex(CIndex index, int64_t int8_value_num, const int8_t* vectors) {
// TODO: optimize here if necessary.
CStatus
BuildScalarIndex(CIndex c_index, int64_t size, const void* field_data) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(c_index,
@ -620,6 +645,8 @@ BuildScalarIndex(CIndex c_index, int64_t size, const void* field_data) {
CStatus
SerializeIndexToBinarySet(CIndex index, CBinarySet* c_binary_set) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -641,6 +668,8 @@ SerializeIndexToBinarySet(CIndex index, CBinarySet* c_binary_set) {
CStatus
LoadIndexFromBinarySet(CIndex index, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(
@ -661,6 +690,8 @@ LoadIndexFromBinarySet(CIndex index, CBinarySet c_binary_set) {
CStatus
CleanLocalData(CIndex index) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(index,
@ -681,6 +712,8 @@ CleanLocalData(CIndex index) {
CStatus
SerializeIndexAndUpLoad(CIndex index, ProtoLayoutInterface result) {
SCOPE_CGO_CALL_METRIC();
auto status = CStatus();
try {
AssertInfo(

View File

@ -0,0 +1,56 @@
#include <chrono>
#include <string>
#include <iostream>
#include "log/Log.h"
#include "prometheus_client.h"
#include "scope_metric.h"
namespace milvus::monitor {
const prometheus::Histogram::BucketBoundaries cgoCallDurationbuckets = {
std::chrono::duration<float>(std::chrono::microseconds(10)).count(),
std::chrono::duration<float>(std::chrono::microseconds(50)).count(),
std::chrono::duration<float>(std::chrono::microseconds(100)).count(),
std::chrono::duration<float>(std::chrono::microseconds(500)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(1)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(5)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(10)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(50)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(100)).count(),
std::chrono::duration<float>(std::chrono::milliseconds(500)).count(),
std::chrono::duration<float>(std::chrono::seconds(1)).count(),
std::chrono::duration<float>(std::chrono::seconds(2)).count(),
std::chrono::duration<float>(std::chrono::seconds(5)).count(),
std::chrono::duration<float>(std::chrono::seconds(10)).count(),
};
// One histogram per function name (label)
static inline prometheus::Histogram&
GetHistogram(std::string&& func) {
static auto& hist_family = prometheus::BuildHistogram()
.Name("milvus_cgocall_duration_seconds")
.Help("Duration of cgo-exposed functions")
.Register(prometheusClient->GetRegistry());
// default buckets: [0.005, 0.01, ..., 1.0]
return hist_family.Add({{"func", func}}, cgoCallDurationbuckets);
}
FuncScopeMetric::FuncScopeMetric(const char* f)
: func_(f), start_(std::chrono::high_resolution_clock::now()) {
}
FuncScopeMetric::~FuncScopeMetric() {
auto end = std::chrono::high_resolution_clock::now();
double duration_sec = std::chrono::duration<double>(end - start_).count();
if (duration_sec > 1.0) {
LOG_INFO("[CGO Call] slow function {} done with duration {}s",
func_,
duration_sec);
}
// record prometheus metric
auto& hist = GetHistogram(std::move(func_));
hist.Observe(duration_sec);
}
} // namespace milvus::monitor

View File

@ -0,0 +1,22 @@
#pragma once
#include <chrono>
#include <string>
#define SCOPE_CGO_CALL_METRIC() \
::milvus::monitor::FuncScopeMetric _scope_metric(__func__)
namespace milvus::monitor {
class FuncScopeMetric {
public:
FuncScopeMetric(const char* f);
~FuncScopeMetric();
private:
std::string func_;
std::chrono::high_resolution_clock::time_point start_;
};
} // namespace milvus::monitor

View File

@ -43,6 +43,7 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "common/resource_c.h"
#include "monitor/scope_metric.h"
#include "google/protobuf/message_lite.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
@ -339,6 +340,8 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
void
ChunkedSegmentSealedImpl::load_field_data_internal(
const LoadFieldDataInfo& load_info) {
SCOPE_CGO_CALL_METRIC();
size_t num_rows = storage::GetNumRowsForLoadInfo(load_info);
AssertInfo(
!num_rows_.has_value() || num_rows_ == num_rows,
@ -412,6 +415,8 @@ ChunkedSegmentSealedImpl::load_field_data_internal(
void
ChunkedSegmentSealedImpl::load_system_field_internal(FieldId field_id,
FieldDataInfo& data) {
SCOPE_CGO_CALL_METRIC();
auto num_rows = data.row_count;
AssertInfo(SystemProperty::Instance().IsSystem(field_id),
"system field is not system field");
@ -452,6 +457,8 @@ ChunkedSegmentSealedImpl::load_system_field_internal(FieldId field_id,
void
ChunkedSegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
SCOPE_CGO_CALL_METRIC();
AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
AssertInfo(info.primary_keys, "Deleted primary keys is null");
AssertInfo(info.timestamps, "Deleted timestamps is null");

View File

@ -18,12 +18,15 @@
#include <iostream>
#include "segcore/collection_c.h"
#include "monitor/scope_metric.h"
#include "segcore/Collection.h"
CStatus
NewCollection(const void* schema_proto_blob,
const int64_t length,
CCollection* newCollection) {
SCOPE_CGO_CALL_METRIC();
try {
auto collection = std::make_unique<milvus::segcore::Collection>(
schema_proto_blob, length);
@ -39,6 +42,8 @@ UpdateSchema(CCollection collection,
const void* proto_blob,
const int64_t length,
const uint64_t version) {
SCOPE_CGO_CALL_METRIC();
try {
auto col = static_cast<milvus::segcore::Collection*>(collection);
@ -68,6 +73,8 @@ CStatus
SetIndexMeta(CCollection collection,
const void* proto_blob,
const int64_t length) {
SCOPE_CGO_CALL_METRIC();
try {
auto col = static_cast<milvus::segcore::Collection*>(collection);
col->parseIndexMeta(proto_blob, length);
@ -79,12 +86,16 @@ SetIndexMeta(CCollection collection,
void
DeleteCollection(CCollection collection) {
SCOPE_CGO_CALL_METRIC();
auto col = static_cast<milvus::segcore::Collection*>(collection);
delete col;
}
const char*
GetCollectionName(CCollection collection) {
SCOPE_CGO_CALL_METRIC();
auto col = static_cast<milvus::segcore::Collection*>(collection);
return strdup(col->get_collection_name().data());
}

View File

@ -12,23 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "segcore/column_groups_c.h"
#include <vector>
#include <string>
#include <memory>
#include "segcore/column_groups_c.h"
#include "monitor/scope_metric.h"
using VecVecInt = std::vector<std::vector<int>>;
extern "C" {
CColumnGroups
NewCColumnGroups() {
SCOPE_CGO_CALL_METRIC();
auto vv = std::make_unique<VecVecInt>();
return vv.release();
}
void
AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
SCOPE_CGO_CALL_METRIC();
if (!cgs || !group)
return;
@ -39,6 +45,8 @@ AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) {
int
CColumnGroupsSize(CColumnGroups cgs) {
SCOPE_CGO_CALL_METRIC();
if (!cgs)
return 0;
@ -48,6 +56,8 @@ CColumnGroupsSize(CColumnGroups cgs) {
void
FreeCColumnGroups(CColumnGroups cgs) {
SCOPE_CGO_CALL_METRIC();
delete static_cast<VecVecInt*>(cgs);
}
}

View File

@ -17,10 +17,13 @@
#include "common/EasyAssert.h"
#include "common/LoadInfo.h"
#include "segcore/load_field_data_c.h"
#include "monitor/scope_metric.h"
CStatus
NewLoadFieldDataInfo(CLoadFieldDataInfo* c_load_field_data_info,
int64_t storage_version) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_field_data_info = std::make_unique<LoadFieldDataInfo>();
load_field_data_info->storage_version = storage_version;
@ -33,6 +36,8 @@ NewLoadFieldDataInfo(CLoadFieldDataInfo* c_load_field_data_info,
void
DeleteLoadFieldDataInfo(CLoadFieldDataInfo c_load_field_data_info) {
SCOPE_CGO_CALL_METRIC();
auto info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
delete info;
}
@ -41,6 +46,8 @@ CStatus
AppendLoadFieldInfo(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
int64_t row_count) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_field_data_info =
static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
@ -64,6 +71,8 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
int64_t entries_num,
const char* c_file_path) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_field_data_info =
static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
@ -86,6 +95,8 @@ AppendLoadFieldDataPath(CLoadFieldDataInfo c_load_field_data_info,
void
AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info,
const char* c_dir_path) {
SCOPE_CGO_CALL_METRIC();
auto load_field_data_info =
static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
load_field_data_info->mmap_dir_path = std::string(c_dir_path);
@ -94,6 +105,8 @@ AppendMMapDirPath(CLoadFieldDataInfo c_load_field_data_info,
void
SetStorageVersion(CLoadFieldDataInfo c_load_field_data_info,
int64_t storage_version) {
SCOPE_CGO_CALL_METRIC();
auto load_field_data_info = (LoadFieldDataInfo*)c_load_field_data_info;
load_field_data_info->storage_version = storage_version;
}
@ -102,12 +115,16 @@ void
EnableMmap(CLoadFieldDataInfo c_load_field_data_info,
int64_t field_id,
bool enabled) {
SCOPE_CGO_CALL_METRIC();
auto info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
info->field_infos[field_id].enable_mmap = enabled;
}
void
SetLoadPriority(CLoadFieldDataInfo c_load_field_data_info, int32_t priority) {
SCOPE_CGO_CALL_METRIC();
auto info = static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
info->load_priority = milvus::proto::common::LoadPriority(priority);
}

View File

@ -34,15 +34,20 @@
#include "cachinglayer/Manager.h"
#include "segcore/storagev1translator/SealedIndexTranslator.h"
#include "segcore/storagev1translator/V1SealedIndexTranslator.h"
#include "monitor/scope_metric.h"
bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
SCOPE_CGO_CALL_METRIC();
return knowhere::UseDiskLoad(index_type, index_engine_version) ||
strcmp(index_type, milvus::index::INVERTED_INDEX_TYPE) == 0;
}
CStatus
NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
std::make_unique<milvus::segcore::LoadIndexInfo>();
@ -62,6 +67,8 @@ NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) {
void
DeleteLoadIndexInfo(CLoadIndexInfo c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
auto info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
delete info;
}
@ -70,6 +77,8 @@ CStatus
AppendIndexParam(CLoadIndexInfo c_load_index_info,
const char* c_index_key,
const char* c_index_value) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -98,6 +107,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
enum CDataType field_type,
bool enable_mmap,
const char* mmap_dir_path) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -123,6 +134,8 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
CStatus
appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -183,6 +196,8 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
CStatus
appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -231,6 +246,8 @@ appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
LoadResourceRequest
EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -260,6 +277,8 @@ EstimateLoadIndexResource(CLoadIndexInfo c_load_index_info) {
CStatus
AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -290,6 +309,8 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
CStatus
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
@ -432,6 +453,8 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
CStatus
AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -455,6 +478,8 @@ AppendIndexInfo(CLoadIndexInfo c_load_index_info,
int64_t index_id,
int64_t build_id,
int64_t version) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -477,6 +502,8 @@ AppendIndexInfo(CLoadIndexInfo c_load_index_info,
CStatus
AppendIndexEngineVersionToLoadInfo(CLoadIndexInfo c_load_index_info,
int32_t index_engine_version) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -496,6 +523,8 @@ AppendIndexEngineVersionToLoadInfo(CLoadIndexInfo c_load_index_info,
CStatus
CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
@ -526,6 +555,8 @@ void
AppendStorageInfo(CLoadIndexInfo c_load_index_info,
const char* uri,
int64_t version) {
SCOPE_CGO_CALL_METRIC();
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
load_index_info->uri = uri;
load_index_info->index_store_version = version;
@ -535,6 +566,8 @@ CStatus
FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info,
const uint8_t* serialized_load_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto info_proto = std::make_unique<milvus::proto::cgo::LoadIndexInfo>();
info_proto->ParseFromArray(serialized_load_index_info, len);

View File

@ -24,6 +24,7 @@
#include <memory>
#include "common/EasyAssert.h"
#include "common/type_c.h"
#include "monitor/scope_metric.h"
CStatus
NewPackedReaderWithStorageConfig(char** paths,
@ -32,6 +33,8 @@ NewPackedReaderWithStorageConfig(char** paths,
const int64_t buffer_size,
CStorageConfig c_storage_config,
CPackedReader* c_packed_reader) {
SCOPE_CGO_CALL_METRIC();
try {
auto truePaths = std::vector<std::string>(paths, paths + num_paths);
@ -77,6 +80,8 @@ NewPackedReader(char** paths,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedReader* c_packed_reader) {
SCOPE_CGO_CALL_METRIC();
try {
auto truePaths = std::vector<std::string>(paths, paths + num_paths);
auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
@ -99,6 +104,8 @@ CStatus
ReadNext(CPackedReader c_packed_reader,
CArrowArray* out_array,
CArrowSchema* out_schema) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_reader =
static_cast<milvus_storage::PackedRecordBatchReader*>(
@ -134,6 +141,8 @@ ReadNext(CPackedReader c_packed_reader,
CStatus
CloseReader(CPackedReader c_packed_reader) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_reader =
static_cast<milvus_storage::PackedRecordBatchReader*>(

View File

@ -23,6 +23,7 @@
#include <arrow/filesystem/filesystem.h>
#include "common/EasyAssert.h"
#include "common/type_c.h"
#include "monitor/scope_metric.h"
CStatus
NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
@ -33,6 +34,8 @@ NewPackedWriterWithStorageConfig(struct ArrowSchema* schema,
CColumnGroups column_groups,
CStorageConfig c_storage_config,
CPackedWriter* c_packed_writer) {
SCOPE_CGO_CALL_METRIC();
try {
auto truePaths = std::vector<std::string>(paths, paths + num_paths);
@ -95,6 +98,8 @@ NewPackedWriter(struct ArrowSchema* schema,
int64_t part_upload_size,
CColumnGroups column_groups,
CPackedWriter* c_packed_writer) {
SCOPE_CGO_CALL_METRIC();
try {
auto truePaths = std::vector<std::string>(paths, paths + num_paths);
@ -128,6 +133,8 @@ CStatus
WriteRecordBatch(CPackedWriter c_packed_writer,
struct ArrowArray* array,
struct ArrowSchema* schema) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(
@ -147,6 +154,8 @@ WriteRecordBatch(CPackedWriter c_packed_writer,
CStatus
CloseWriter(CPackedWriter c_packed_writer) {
SCOPE_CGO_CALL_METRIC();
try {
auto packed_writer =
static_cast<milvus_storage::PackedRecordBatchWriter*>(

View File

@ -18,6 +18,7 @@
#include "segcore/reduce_c.h"
#include "segcore/reduce/StreamReduce.h"
#include "segcore/Utils.h"
#include "monitor/scope_metric.h"
using SearchResult = milvus::SearchResult;
@ -27,6 +28,8 @@ NewStreamReducer(CSearchPlan c_plan,
int64_t* slice_topKs,
int64_t num_slices,
CSearchStreamReducer* stream_reducer) {
SCOPE_CGO_CALL_METRIC();
try {
//convert search results and search plan
auto plan = static_cast<milvus::query::Plan*>(c_plan);
@ -44,6 +47,8 @@ CStatus
StreamReduce(CSearchStreamReducer c_stream_reducer,
CSearchResult* c_search_results,
int64_t num_segments) {
SCOPE_CGO_CALL_METRIC();
try {
auto stream_reducer =
static_cast<milvus::segcore::StreamReducerHelper*>(
@ -63,6 +68,8 @@ StreamReduce(CSearchStreamReducer c_stream_reducer,
CStatus
GetStreamReduceResult(CSearchStreamReducer c_stream_reducer,
CSearchResultDataBlobs* c_search_result_data_blobs) {
SCOPE_CGO_CALL_METRIC();
try {
auto stream_reducer =
static_cast<milvus::segcore::StreamReducerHelper*>(
@ -83,6 +90,8 @@ ReduceSearchResultsAndFillData(CTraceContext c_trace,
int64_t* slice_nqs,
int64_t* slice_topKs,
int64_t num_slices) {
SCOPE_CGO_CALL_METRIC();
try {
// get SearchResult and SearchPlan
auto plan = static_cast<milvus::query::Plan*>(c_plan);
@ -130,6 +139,8 @@ CStatus
GetSearchResultDataBlob(CProto* searchResultDataBlob,
CSearchResultDataBlobs cSearchResultDataBlobs,
int32_t blob_index) {
SCOPE_CGO_CALL_METRIC();
try {
auto search_result_data_blobs =
reinterpret_cast<milvus::segcore::SearchResultDataBlobs*>(
@ -150,6 +161,8 @@ GetSearchResultDataBlob(CProto* searchResultDataBlob,
void
DeleteSearchResultDataBlobs(CSearchResultDataBlobs cSearchResultDataBlobs) {
SCOPE_CGO_CALL_METRIC();
if (cSearchResultDataBlobs == nullptr) {
return;
}
@ -161,6 +174,8 @@ DeleteSearchResultDataBlobs(CSearchResultDataBlobs cSearchResultDataBlobs) {
void
DeleteStreamSearchReducer(CSearchStreamReducer c_stream_reducer) {
SCOPE_CGO_CALL_METRIC();
if (c_stream_reducer == nullptr) {
return;
}

View File

@ -25,6 +25,7 @@
#include "google/protobuf/text_format.h"
#include "log/Log.h"
#include "mmap/Types.h"
#include "monitor/scope_metric.h"
#include "segcore/Collection.h"
#include "segcore/SegcoreConfig.h"
#include "segcore/SegmentGrowingImpl.h"
@ -45,6 +46,8 @@ NewSegment(CCollection collection,
int64_t segment_id,
CSegmentInterface* newSegment,
bool is_sorted_by_pk) {
SCOPE_CGO_CALL_METRIC();
try {
auto col = static_cast<milvus::segcore::Collection*>(collection);
@ -81,18 +84,24 @@ NewSegment(CCollection collection,
void
DeleteSegment(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
auto s = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
delete s;
}
void
ClearSegmentData(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
auto s = static_cast<milvus::segcore::SegmentSealed*>(c_segment);
s->ClearData();
}
void
DeleteSearchResult(CSearchResult search_result) {
SCOPE_CGO_CALL_METRIC();
auto res = static_cast<milvus::SearchResult*>(search_result);
delete res;
}
@ -248,6 +257,8 @@ AsyncRetrieveByOffsets(CTraceContext c_trace,
int64_t
GetMemoryUsageInBytes(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto mem_size = segment->GetMemoryUsageInBytes();
return mem_size;
@ -255,6 +266,8 @@ GetMemoryUsageInBytes(CSegmentInterface c_segment) {
int64_t
GetRowCount(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto row_count = segment->get_row_count();
return row_count;
@ -263,6 +276,8 @@ GetRowCount(CSegmentInterface c_segment) {
// TODO: segmentInterface implement get_deleted_count()
int64_t
GetDeletedCount(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
auto segment =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto deleted_count = segment->get_deleted_count();
@ -271,6 +286,8 @@ GetDeletedCount(CSegmentInterface c_segment) {
int64_t
GetRealCount(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
// not accurate, pk may exist in deleted record and not in insert record.
// return GetRowCount(c_segment) - GetDeletedCount(c_segment);
auto segment =
@ -280,6 +297,8 @@ GetRealCount(CSegmentInterface c_segment) {
bool
HasRawData(CSegmentInterface c_segment, int64_t field_id) {
SCOPE_CGO_CALL_METRIC();
auto segment =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
return segment->HasRawData(field_id);
@ -294,6 +313,8 @@ Insert(CSegmentInterface c_segment,
const uint64_t* timestamps,
const uint8_t* data_info,
const uint64_t data_info_len) {
SCOPE_CGO_CALL_METRIC();
try {
AssertInfo(data_info_len < std::numeric_limits<int>::max(),
"insert data length ({}) exceeds max int",
@ -318,6 +339,8 @@ Insert(CSegmentInterface c_segment,
CStatus
PreInsert(CSegmentInterface c_segment, int64_t size, int64_t* offset) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment = static_cast<milvus::segcore::SegmentGrowing*>(c_segment);
*offset = segment->PreInsert(size);
@ -333,6 +356,8 @@ Delete(CSegmentInterface c_segment,
const uint8_t* ids,
const uint64_t ids_size,
const uint64_t* timestamps) {
SCOPE_CGO_CALL_METRIC();
auto segment = static_cast<milvus::segcore::SegmentInterface*>(c_segment);
auto pks = std::make_unique<milvus::proto::schema::IDs>();
auto suc = pks->ParseFromArray(ids, ids_size);
@ -349,6 +374,8 @@ Delete(CSegmentInterface c_segment,
CStatus
LoadFieldData(CSegmentInterface c_segment,
CLoadFieldDataInfo c_load_field_data_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -364,6 +391,8 @@ LoadFieldData(CSegmentInterface c_segment,
CStatus
LoadDeletedRecord(CSegmentInterface c_segment,
CLoadDeletedRecordInfo deleted_record_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -385,6 +414,8 @@ LoadDeletedRecord(CSegmentInterface c_segment,
CStatus
UpdateSealedSegmentIndex(CSegmentInterface c_segment,
CLoadIndexInfo c_load_index_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -404,6 +435,8 @@ CStatus
LoadTextIndex(CSegmentInterface c_segment,
const uint8_t* serialized_load_text_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -456,6 +489,8 @@ LoadJsonKeyIndex(CTraceContext c_trace,
CSegmentInterface c_segment,
const uint8_t* serialized_load_json_key_index_info,
const uint64_t len) {
SCOPE_CGO_CALL_METRIC();
try {
auto ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
@ -511,6 +546,8 @@ UpdateFieldRawDataSize(CSegmentInterface c_segment,
int64_t field_id,
int64_t num_rows,
int64_t field_data_size) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -525,6 +562,8 @@ UpdateFieldRawDataSize(CSegmentInterface c_segment,
CStatus
DropFieldData(CSegmentInterface c_segment, int64_t field_id) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -540,6 +579,8 @@ DropFieldData(CSegmentInterface c_segment, int64_t field_id) {
CStatus
DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -556,6 +597,8 @@ DropSealedSegmentIndex(CSegmentInterface c_segment, int64_t field_id) {
CStatus
AddFieldDataInfoForSealed(CSegmentInterface c_segment,
CLoadFieldDataInfo c_load_field_data_info) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -573,12 +616,16 @@ AddFieldDataInfoForSealed(CSegmentInterface c_segment,
void
RemoveFieldFile(CSegmentInterface c_segment, int64_t field_id) {
SCOPE_CGO_CALL_METRIC();
auto segment = reinterpret_cast<milvus::segcore::SegmentSealed*>(c_segment);
segment->RemoveFieldFile(milvus::FieldId(field_id));
}
CStatus
CreateTextIndex(CSegmentInterface c_segment, int64_t field_id) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
@ -591,6 +638,8 @@ CreateTextIndex(CSegmentInterface c_segment, int64_t field_id) {
CStatus
FinishLoad(CSegmentInterface c_segment) {
SCOPE_CGO_CALL_METRIC();
try {
auto segment_interface =
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);

View File

@ -10,26 +10,35 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "segcore/token_stream_c.h"
#include "token-stream.h"
#include "monitor/scope_metric.h"
void
free_token_stream(CTokenStream token_stream) {
SCOPE_CGO_CALL_METRIC();
delete static_cast<milvus::tantivy::TokenStream*>(token_stream);
}
bool
token_stream_advance(CTokenStream token_stream) {
SCOPE_CGO_CALL_METRIC();
return static_cast<milvus::tantivy::TokenStream*>(token_stream)->advance();
}
// Note: returned token must be freed by the caller using `free_token`.
const char*
token_stream_get_token(CTokenStream token_stream) {
SCOPE_CGO_CALL_METRIC();
return static_cast<milvus::tantivy::TokenStream*>(token_stream)
->get_token_no_copy();
}
CToken
token_stream_get_detailed_token(CTokenStream token_stream) {
SCOPE_CGO_CALL_METRIC();
auto token = static_cast<milvus::tantivy::TokenStream*>(token_stream)
->get_detailed_token();
return CToken{token.token,
@ -41,5 +50,7 @@ token_stream_get_detailed_token(CTokenStream token_stream) {
void
free_token(void* token) {
SCOPE_CGO_CALL_METRIC();
free_rust_string(static_cast<const char*>(token));
}

View File

@ -13,6 +13,7 @@
#include <memory>
#include "common/FieldMeta.h"
#include "common/protobuf_utils.h"
#include "monitor/scope_metric.h"
#include "pb/schema.pb.h"
#include "common/EasyAssert.h"
#include "tokenizer.h"
@ -21,6 +22,8 @@ using Map = std::map<std::string, std::string>;
CStatus
create_tokenizer(const char* params, CTokenizer* tokenizer) {
SCOPE_CGO_CALL_METRIC();
try {
auto impl = std::make_unique<milvus::tantivy::Tokenizer>(params);
*tokenizer = impl.release();
@ -32,6 +35,8 @@ create_tokenizer(const char* params, CTokenizer* tokenizer) {
CStatus
clone_tokenizer(CTokenizer* tokenizer, CTokenizer* rst) {
SCOPE_CGO_CALL_METRIC();
try {
auto impl = reinterpret_cast<milvus::tantivy::Tokenizer*>(*tokenizer);
*rst = impl->Clone().release();
@ -43,18 +48,24 @@ clone_tokenizer(CTokenizer* tokenizer, CTokenizer* rst) {
void
free_tokenizer(CTokenizer tokenizer) {
SCOPE_CGO_CALL_METRIC();
auto impl = reinterpret_cast<milvus::tantivy::Tokenizer*>(tokenizer);
delete impl;
}
CTokenStream
create_token_stream(CTokenizer tokenizer, const char* text, uint32_t text_len) {
SCOPE_CGO_CALL_METRIC();
auto impl = reinterpret_cast<milvus::tantivy::Tokenizer*>(tokenizer);
return impl->CreateTokenStream(std::string(text, text_len)).release();
}
CStatus
validate_tokenizer(const char* params) {
SCOPE_CGO_CALL_METRIC();
try {
auto impl = std::make_unique<milvus::tantivy::Tokenizer>(params);
return milvus::SuccessCStatus();
@ -65,6 +76,8 @@ validate_tokenizer(const char* params) {
CStatus
validate_text_schema(const uint8_t* field_schema, uint64_t length) {
SCOPE_CGO_CALL_METRIC();
try {
auto schema = std::make_unique<milvus::proto::schema::FieldSchema>();
AssertInfo(schema->ParseFromArray(field_schema, length),

View File

@ -21,9 +21,12 @@
#include "storage/LocalChunkManagerSingleton.h"
#include "storage/MmapManager.h"
#include "storage/ThreadPools.h"
#include "monitor/scope_metric.h"
CStatus
GetLocalUsedSize(const char* c_dir, int64_t* size) {
SCOPE_CGO_CALL_METRIC();
try {
auto local_chunk_manager =
milvus::storage::LocalChunkManagerSingleton::GetInstance()