Add some msgstream metrics (#20296)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
parent 4a66965df4
commit 709a0a94e9
@@ -24,7 +24,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Mutation Send Latency | The average latency and the 99th percentile of the latency of sending insertion or deletion requests by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, msg_type, pod, node_id) (rate(milvus_proxy_mutation_send_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_mutation_send_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id, msg_type) / sum(increase(milvus_proxy_mutation_send_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id, msg_type) ``` | `milvus_proxy_mutation_send_latency` | The latency of sending insertion or deletion requests. |
| Cache Hit Rate | The average cache hit rate of operations including `GetCollectionID`, `GetCollectionInfo`, and `GetCollectionSchema` per second within the past two minutes. | ``` sum(increase(milvus_proxy_cache_hit_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace", cache_state="hit"}[2m])/120) by(cache_name, pod, node_id) / sum(increase(milvus_proxy_cache_hit_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by(cache_name, pod, node_id) ``` | `milvus_proxy_cache_hit_count` | The statistics of hit and failure rate of each cache reading operation. |
| Cache Update Latency | The average latency and the 99th percentile of cache update latency by proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_cache_update_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_cache_update_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_cache_update_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_cache_update_latency` | The latency of updating cache each time. |
-| Sync Time | The average, maximum, and minimum number of epoch time synced by each proxy in its corresponding physical channel. | ``` avg(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_proxy_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_proxy_sync_epoch_time ` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). <br/> There is a default `ChannelName` apart from the physical channels. |
+| Sync Time | The average, maximum, and minimum number of epoch time synced by each proxy in its corresponding physical channel. | ``` avg(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_proxy_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_proxy_tt_lag_ms ` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). <br/> There is a default `ChannelName` apart from the physical channels. |
| Apply PK Latency | The average latency and the 99th percentile of primary key application latency by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_apply_pk_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_apply_pk_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_apply_pk_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_apply_pk_latency` | The latency of applying primary key. |
| Apply Timestamp Latency | The average latency and the 99th percentile of timestamp application latency by each proxy within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_proxy_apply_timestamp_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_proxy_apply_timestamp_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) / sum(increase(milvus_proxy_apply_timestamp_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (pod, node_id) ``` | `milvus_proxy_apply_timestamp_latency` | The latency of applying timestamp. |
| DQL Request Rate | The status and number of DQL requests received per second by each proxy within the past two minutes. <br/> DQL requests include `DescribeCollection`, `DescribeIndex`, `GetCollectionStatistics`, `HasCollection`, `Search`, `Query`, `ShowPartitions`, etc. This panel specifically shows the total number and the number of successful DQL requests. | ``` sum(increase(milvus_proxy_dql_req_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by(function_name, status, pod, node_id) ``` | `milvus_proxy_dql_req_count` | The number of all types of DQL requests. |
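The p99 and average panels above all read the three series that a client_golang histogram exposes: `_bucket` for `histogram_quantile`, and `_sum`/`_count` for the mean. A minimal sketch of how such a latency histogram is defined and observed; the metric and label names here are illustrative, not taken from this commit:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// sendLatency is an illustrative histogram; Prometheus exposes it as
// example_proxy_mutation_send_latency_bucket/_sum/_count, which is what the
// histogram_quantile(...) and increase(_sum)/increase(_count) panels read.
var sendLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "proxy",
		Name:      "mutation_send_latency",
		Help:      "latency of sending insert/delete requests (ms)",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 12), // 1 ms .. ~2 s
	}, []string{"msg_type", "node_id"})

// recordSend observes the elapsed time of one send, in milliseconds.
func recordSend(msgType, nodeID string, start time.Time) {
	sendLatency.WithLabelValues(msgType, nodeID).
		Observe(float64(time.Since(start).Milliseconds()))
}

func main() {
	prometheus.MustRegister(sendLatency)
	start := time.Now()
	// ... perform the send ...
	recordSend("insert", "1", start)
}
```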
@@ -45,7 +45,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Panel | Panel description | PromQL (Prometheus query language) | The Milvus metrics used | Milvus metrics description |
|---|---|---|---|---|
| Proxy Node Num | The number of proxies created. | ``` sum(milvus_rootcoord_proxy_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_proxy_num` | The number of proxies. |
-| Sync Time | The average, maximum, and minimum number of epoch time synced by each root coord in each physical channel (PChannel). | ``` avg(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_rootcoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_sync_epoch_time` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). |
+| Sync Time | The average, maximum, and minimum number of epoch time synced by each root coord in each physical channel (PChannel). | ``` avg(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_rootcoord_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_rootcoord_produce_tt_lag_ms` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). |
| DDL Request Rate | The status and number of DDL requests per second within the past two minutes. | ``` sum(increase(milvus_rootcoord_ddl_req_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (status, function_name) ``` | `milvus_rootcoord_ddl_req_count` | The total number of DDL requests including `CreateCollection`, `DescribeCollection`, `DescribeSegments`, `HasCollection`, `ShowCollections`, `ShowPartitions`, and `ShowSegments`. |
| DDL Request Latency | The average latency and the 99th percentile of DDL request latency within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, function_name) (rate(milvus_rootcoord_ddl_req_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_rootcoord_ddl_req_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (function_name) / sum(increase(milvus_rootcoord_ddl_req_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by (function_name) ``` | `milvus_rootcoord_ddl_req_latency` | The latency of all types of DDL requests. |
| Sync Timetick Latency | The average latency and the 99th percentile of the time used by root coord to sync all timestamp to PChannel within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le) (rate(milvus_rootcoord_sync_timetick_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_rootcoord_sync_timetick_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) / sum(increase(milvus_rootcoord_sync_timetick_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) ``` | `milvus_rootcoord_sync_timetick_latency` | the time used by root coord to sync all timestamp to pchannel. |
@@ -120,7 +120,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Collection Num | The number of collections recorded in metadata by data coord. | ``` sum(milvus_datacoord_collection_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_collection_num` | The number of collections recorded in metadata by data coord. |
| Stored Rows | The accumulated number of rows of valid and flushed data in data coord. | ``` sum(milvus_datacoord_stored_rows_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_stored_rows_num` | The accumulated number of rows of valid and flushed data in data coord. |
| Stored Rows Rate | The average number of rows flushed per second within the past two minutes. | ``` sum(increase(milvus_datacoord_stored_rows_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (pod, node_id) ``` | `milvus_datacoord_stored_rows_count` | The accumulated number of rows flushed by data coord. |
-| Sync Time | The average, maximum, and minimum number of epoch time synced by data coord in each physical channel. | ``` avg(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_datacoord_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_sync_epoch_time` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). |
+| Sync Time | The average, maximum, and minimum number of epoch time synced by data coord in each physical channel. | ``` avg(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) max(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) min(milvus_datacoord_consumer_datanode_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (app_kubernetes_io_instance) ``` | `milvus_datacoord_consumer_datanode_tt_lag_ms` | Each physical channel's epoch time (Unix time, the milliseconds passed ever since January 1, 1970). |
</details>
@@ -133,7 +133,7 @@ We recommend reading [Milvus monitoring framework overview](monitor_overview.md)
| Flush Data Size Rate | The size of each flushed message recorded per second by each data node within the past two minutes. | ``` sum(increase(milvus_datanode_flushed_data_size{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])/120) by (msg_type, pod, node_id) ``` | `milvus_datanode_flushed_data_size` | The size of each flushed message. <br/> Currently, streaming messages counted by data node only include insertion and deletion messages. |
| Consumer Num | The number of consumers created on each data node. | ``` sum(milvus_datanode_consumer_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_consumer_num` | The number of consumers created on each data node. <br/> Each flowgraph corresponds to a consumer. |
| Producer Num | The number of producers created on each data node. | ``` sum(milvus_datanode_producer_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_producer_num` | The number of producers created on each data node. <br/> Each shard in a collection corresponds to a delta channel producer and a timetick channel producer. |
-| Sync Time | The average, maximum, and minimum number of epoch time synced by each data node in all physical topics. | ``` avg(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_datanode_sync_epoch_time{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_sync_epoch_time` | The epoch time (Unix time, the milliseconds passed ever since January 1, 1970.) of each physical topic on a data node. |
+| Sync Time | The average, maximum, and minimum number of epoch time synced by each data node in all physical topics. | ``` avg(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) max(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) min(milvus_datanode_produce_tt_lag_ms{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_produce_tt_lag_ms` | The epoch time (Unix time, the milliseconds passed ever since January 1, 1970.) of each physical topic on a data node. |
| Unflushed Segment Num | The number of unflushed segments created on each data node. | ``` sum(milvus_datanode_unflushed_segment_num{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}) by (pod, node_id) ``` | `milvus_datanode_unflushed_segment_num` | The number of unflushed segments created on each data node. |
| Encode Buffer Latency | The average latency and the 99th percentile of the time used to encode a buffer by each data node within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_datanode_encode_buffer_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_datanode_encode_buffer_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) / sum(increase(milvus_datanode_encode_buffer_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) ``` | `milvus_datanode_encode_buffer_latency` | The time each data node takes to encode a buffer. |
| Save Data Latency | The average latency and the 99th percentile of the time used to write a buffer into the storage layer by each data node within the past two minutes. | p99: <br/> ``` histogram_quantile(0.99, sum by (le, pod, node_id) (rate(milvus_datanode_save_latency_bucket{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m]))) ``` <br/> avg: <br/> ``` sum(increase(milvus_datanode_save_latency_sum{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) / sum(increase(milvus_datanode_save_latency_count{app_kubernetes_io_instance=~"$instance", app_kubernetes_io_name="$app_name", namespace="$namespace"}[2m])) by(pod, node_id) ``` | `milvus_datanode_save_latency` | The time each data node takes to write a buffer into the storage layer. |
File diff suppressed because it is too large
@@ -555,10 +555,11 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("timetick", physical))
	}

-	utcT, _ := tsoutil.ParseHybridTs(ts)
+	sub := tsoutil.SubByNow(ts)

	pChannelName := funcutil.ToPhysicalChannel(ch)
-	metrics.DataCoordSyncEpoch.WithLabelValues(pChannelName).Set(float64(utcT))
+	metrics.DataCoordConsumeDataNodeTimeTickLag.
+		WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pChannelName).
+		Set(float64(sub))

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())
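The switch from `ParseHybridTs` to `SubByNow` changes what the gauge stores: instead of the absolute physical time of the tick, it now records how many milliseconds the tick lags behind wall-clock time. `SubByNow` itself is not shown in this diff; the sketch below is an assumed equivalent, based on the usual hybrid-timestamp layout (physical milliseconds in the high bits, an 18-bit logical counter in the low bits, both assumptions here):

```go
package main

import (
	"fmt"
	"time"
)

// logicalBits is assumed here; the low bits of a hybrid timestamp hold a
// logical counter and the high bits hold physical milliseconds.
const logicalBits = 18

// parseHybridTs splits a hybrid timestamp into physical milliseconds and the
// logical counter (a sketch of what tsoutil.ParseHybridTs returns).
func parseHybridTs(ts uint64) (int64, uint64) {
	return int64(ts >> logicalBits), ts & ((1 << logicalBits) - 1)
}

// subByNow returns "now - physical(ts)" in milliseconds, i.e. how far the
// time tick lags behind wall-clock time (a sketch of tsoutil.SubByNow).
func subByNow(ts uint64) int64 {
	physical, _ := parseHybridTs(ts)
	return time.Now().UnixMilli() - physical
}

func main() {
	fresh := uint64(time.Now().UnixMilli()) << logicalBits
	fmt.Println("lag ms:", subByNow(fresh)) // ~0 for a fresh tick
}
```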
@@ -20,9 +20,10 @@ import (
	"context"
	"fmt"
	"reflect"
-	"strconv"
	"sync/atomic"

+	"github.com/milvus-io/milvus/internal/util/timerecord"
+
	"github.com/golang/protobuf/proto"
	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
@@ -141,6 +142,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
				log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
				ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName)
				fgMsg.dropCollection = true
+
+				pChan := funcutil.ToPhysicalChannel(ddn.vChannelName)
+				metrics.CleanupDataNodeCollectionMetrics(paramtable.GetNodeID(), ddn.collectionID, pChan)
			}

		case commonpb.MsgType_DropPartition:
@@ -172,7 +176,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
			}

			rateCol.Add(metricsinfo.InsertConsumeThroughput, float64(proto.Size(&imsg.InsertRequest)))
-			metrics.DataNodeConsumeCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(&imsg.InsertRequest)))
+
+			metrics.DataNodeConsumeBytesCount.
+				WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).
+				Add(float64(proto.Size(&imsg.InsertRequest)))
+
+			metrics.DataNodeConsumeMsgCount.
+				WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel, fmt.Sprint(ddn.collectionID)).
+				Inc()

			log.Debug("DDNode receive insert messages",
				zap.Int("numRows", len(imsg.GetRowIDs())),
@@ -195,7 +206,14 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
				continue
			}
			rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
-			metrics.DataNodeConsumeCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(&dmsg.DeleteRequest)))
+
+			metrics.DataNodeConsumeBytesCount.
+				WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
+				Add(float64(proto.Size(&dmsg.DeleteRequest)))
+
+			metrics.DataNodeConsumeMsgCount.
+				WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel, fmt.Sprint(ddn.collectionID)).
+				Inc()
			fgMsg.deleteMessages = append(fgMsg.deleteMessages, dmsg)
		}
	}
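Both the insert and delete branches record the same two things per consumed message: the serialized size via `proto.Size` into a bytes counter, and a per-collection message count. A standalone sketch of that counting pattern; metric and label names are illustrative:

```go
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	consumeBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_consume_bytes_total",
		Help: "bytes consumed from the message stream",
	}, []string{"node_id", "msg_type"})

	consumeMsgs = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_consume_msg_total",
		Help: "messages consumed from the message stream",
	}, []string{"node_id", "msg_type", "collection_id"})
)

// countConsumed records one consumed request: its serialized size in bytes
// and a +1 on the per-collection message counter.
func countConsumed(nodeID int64, msgType string, collectionID int64, req proto.Message) {
	size := float64(proto.Size(req))
	consumeBytes.WithLabelValues(fmt.Sprint(nodeID), msgType).Add(size)
	consumeMsgs.WithLabelValues(fmt.Sprint(nodeID), msgType, fmt.Sprint(collectionID)).Inc()
}

func main() {
	prometheus.MustRegister(consumeBytes, consumeMsgs)
	// countConsumed(1, "insert", 42, &somepb.InsertRequest{}) // with a real proto message
}
```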
@@ -262,6 +280,8 @@ func (ddn *ddNode) isDropped(segID UniqueID) bool {
}

func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, maxTs Timestamp) error {
+	tr := timerecord.NewTimeRecorder("forwardDeleteMsg")
+
	if len(msgs) != 0 {
		var msgPack = msgstream.MsgPack{
			Msgs: msgs,
@@ -275,6 +295,10 @@ func (ddn *ddNode) forwardDeleteMsg(msgs []msgstream.TsMsg, minTs Timestamp, max
		if err := ddn.sendDeltaTimeTick(maxTs); err != nil {
			return err
		}
+
+		metrics.DataNodeForwardDeleteMsgTimeTaken.
+			WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).
+			Observe(float64(tr.ElapseSpan().Milliseconds()))
		return nil
	}

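The forwarding path is wrapped in a `TimeRecorder` and the elapsed milliseconds are observed on the new histogram. The same wrap-and-observe pattern in a self-contained form, with plain `time.Since` standing in for `internal/util/timerecord` (which is not shown in this diff):

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var forwardDeleteMsgTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "example_forward_delete_msg_time_taken_ms",
	Help:    "time taken to forward delete messages",
	Buckets: prometheus.DefBuckets, // replace with ms-oriented buckets as needed
}, []string{"node_id"})

// forwardDeleteMsg stands in for the real forwarding work; it records how
// long the whole call took, in milliseconds, on the node's histogram.
func forwardDeleteMsg(nodeID string, do func() error) error {
	start := time.Now() // timerecord.NewTimeRecorder plays this role in Milvus
	err := do()
	forwardDeleteMsgTime.WithLabelValues(nodeID).
		Observe(float64(time.Since(start).Milliseconds()))
	return err
}

func main() {
	prometheus.MustRegister(forwardDeleteMsgTime)
	_ = forwardDeleteMsg("1", func() error { return nil })
}
```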
@@ -31,6 +31,7 @@ import (
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
)

// DmInputNode receives messages from message streams, packs messages between two timeticks, and passes all
@@ -79,6 +80,7 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
	log.Info("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))

	name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
-	node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism)
+	node := flowgraph.NewInputNode(insertStream, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism,
+		typeutil.DataNodeRole, paramtable.GetNodeID(), dmNodeConfig.collectionID, metrics.AllLabel)
	return node, nil
}
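The extra arguments thread the consumer's identity (role, node ID, collection ID, data type) into the input node so the node can label the metrics it emits while it runs, rather than being handed the IDs on every call. A self-contained sketch of that pattern; the types and names below are local stand-ins, not the Milvus flowgraph API:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var consumeMsgCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "example_consume_msg_count",
	Help: "messages consumed, labelled by the node's stored identity",
}, []string{"node_id", "msg_type", "collection_id"})

// inputNode stores its identity once at construction time...
type inputNode struct {
	role         string
	nodeID       int64
	collectionID int64
	dataType     string
}

func newInputNode(role string, nodeID, collectionID int64, dataType string) *inputNode {
	return &inputNode{role: role, nodeID: nodeID, collectionID: collectionID, dataType: dataType}
}

// ...so the hot path can label metrics without being passed the IDs again.
func (n *inputNode) operate() {
	consumeMsgCount.
		WithLabelValues(fmt.Sprint(n.nodeID), n.dataType, fmt.Sprint(n.collectionID)).
		Inc()
}

func main() {
	prometheus.MustRegister(consumeMsgCount)
	newInputNode("datanode", 1, 42, "all").operate()
}
```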
@@ -648,9 +648,11 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
			},
		}
		msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
-		pt, _ := tsoutil.ParseHybridTs(ts)
+		sub := tsoutil.SubByNow(ts)
		pChan := funcutil.ToPhysicalChannel(config.vChannelName)
-		metrics.DataNodeTimeSync.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pChan).Set(float64(pt))
+		metrics.DataNodeProduceTimeTickLag.
+			WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(collID), pChan).
+			Set(float64(sub))
		return wTtMsgStream.Produce(&msgPack)
	})

@@ -76,13 +76,16 @@ var (
			Help: "count of all stored rows ever",
		}, []string{})

-	DataCoordSyncEpoch = prometheus.NewGaugeVec(
+	DataCoordConsumeDataNodeTimeTickLag = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.DataCoordRole,
-			Name: "sync_epoch_time",
-			Help: "synchronized unix epoch per physical channel",
-		}, []string{channelNameLabelName})
+			Name: "consume_datanode_tt_lag_ms",
+			Help: "now time minus tt per physical channel",
+		}, []string{
+			nodeIDLabelName,
+			channelNameLabelName,
+		})

	DataCoordStoredBinlogSize = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
@@ -146,6 +149,6 @@ func RegisterDataCoord(registry *prometheus.Registry) {
	registry.MustRegister(DataCoordNumCollections)
	registry.MustRegister(DataCoordNumStoredRows)
	registry.MustRegister(DataCoordNumStoredRowsCounter)
-	registry.MustRegister(DataCoordSyncEpoch)
+	registry.MustRegister(DataCoordConsumeDataNodeTimeTickLag)
	registry.MustRegister(DataCoordStoredBinlogSize)
}
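Renaming a collector renames the exposed series, so dashboards and tests keyed to `milvus_datacoord_sync_epoch_time` have to move to `milvus_datacoord_consume_datanode_tt_lag_ms`. One way to check the value behind the renamed gauge in a test, using client_golang's `testutil`; the label strings below are illustrative, not the constants used in the Milvus code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var consumeLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "milvus",
	Subsystem: "datacoord",
	Name:      "consume_datanode_tt_lag_ms",
	Help:      "now time minus tt per physical channel",
}, []string{"node_id", "channel_name"}) // label names here are assumed

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(consumeLag)

	consumeLag.WithLabelValues("1", "by-dev-rootcoord-dml_0").Set(250)

	// ToFloat64 reads back the single sample of a collector; handy for
	// asserting that the renamed series carries the expected value.
	got := testutil.ToFloat64(consumeLag.WithLabelValues("1", "by-dev-rootcoord-dml_0"))
	fmt.Println(got == 250) // true
}
```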
@@ -17,6 +17,8 @@
package metrics

import (
+	"fmt"
+
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/prometheus/client_golang/prometheus"
)
@@ -74,17 +76,42 @@ var (
			nodeIDLabelName,
		})

-	DataNodeTimeSync = prometheus.NewGaugeVec(
+	DataNodeConsumeTimeTickLag = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.DataNodeRole,
-			Name: "sync_epoch_time",
-			Help: "synchronized unix epoch per physical channel",
+			Name: "consume_tt_lag_ms",
+			Help: "now time minus tt per physical channel",
		}, []string{
			nodeIDLabelName,
+			msgTypeLabelName,
+			collectionIDLabelName,
+		})
+
+	DataNodeProduceTimeTickLag = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataNodeRole,
+			Name: "produce_tt_lag_ms",
+			Help: "now time minus tt pts per physical channel",
+		}, []string{
+			nodeIDLabelName,
+			collectionIDLabelName,
			channelNameLabelName,
		})

+	DataNodeConsumeMsgCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataNodeRole,
+			Name: "consume_msg_count",
+			Help: "count of consumed msg",
+		}, []string{
+			nodeIDLabelName,
+			msgTypeLabelName,
+			collectionIDLabelName,
+		})
+
	DataNodeNumUnflushedSegments = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
@@ -163,14 +190,23 @@ var (
			statusLabelName,
		})

-	// DataNodeConsumeCounter counts the bytes DataNode consumed from message storage.
-	DataNodeConsumeCounter = prometheus.NewCounterVec(
+	// DataNodeConsumeBytesCount counts the bytes DataNode consumed from message storage.
+	DataNodeConsumeBytesCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.DataNodeRole,
			Name: "consume_counter",
			Help: "",
		}, []string{nodeIDLabelName, msgTypeLabelName})
+
+	DataNodeForwardDeleteMsgTimeTaken = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataNodeRole,
+			Name: "forward_delete_msg_time_taken_ms",
+			Help: "forward delete message time taken",
+			Buckets: buckets, // unit: ms
+		}, []string{nodeIDLabelName})
)

//RegisterDataNode registers DataNode metrics
@@ -180,7 +216,7 @@ func RegisterDataNode(registry *prometheus.Registry) {
	registry.MustRegister(DataNodeFlushedSize)
	registry.MustRegister(DataNodeNumConsumers)
	registry.MustRegister(DataNodeNumProducers)
-	registry.MustRegister(DataNodeTimeSync)
+	registry.MustRegister(DataNodeConsumeTimeTickLag)
	registry.MustRegister(DataNodeNumUnflushedSegments)
	registry.MustRegister(DataNodeEncodeBufferLatency)
	registry.MustRegister(DataNodeSave2StorageLatency)
@@ -188,5 +224,36 @@ func RegisterDataNode(registry *prometheus.Registry) {
	registry.MustRegister(DataNodeAutoFlushBufferCount)
	registry.MustRegister(DataNodeCompactionLatency)
	registry.MustRegister(DataNodeFlushReqCounter)
-	registry.MustRegister(DataNodeConsumeCounter)
+	registry.MustRegister(DataNodeConsumeMsgCount)
+	registry.MustRegister(DataNodeProduceTimeTickLag)
+	registry.MustRegister(DataNodeConsumeBytesCount)
+	registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
+}
+
+func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
+	DataNodeConsumeTimeTickLag.
+		Delete(
+			prometheus.Labels{
+				nodeIDLabelName: fmt.Sprint(nodeID),
+				msgTypeLabelName: AllLabel,
+				collectionIDLabelName: fmt.Sprint(collectionID),
+			})
+
+	DataNodeProduceTimeTickLag.
+		Delete(
+			prometheus.Labels{
+				nodeIDLabelName: fmt.Sprint(nodeID),
+				collectionIDLabelName: fmt.Sprint(collectionID),
+				channelNameLabelName: channel,
+			})
+
+	for _, label := range []string{AllLabel, DeleteLabel, InsertLabel} {
+		DataNodeConsumeMsgCount.
+			Delete(
+				prometheus.Labels{
+					nodeIDLabelName: fmt.Sprint(nodeID),
+					msgTypeLabelName: label,
+					collectionIDLabelName: fmt.Sprint(collectionID),
+				})
+	}
}
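`CleanupDataNodeCollectionMetrics` relies on `Delete`, which removes a child only when the label set matches exactly; that is why the consume counter is deleted once per message-type label in the loop above. A small standalone illustration of that behaviour:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var lag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "example_consume_tt_lag_ms",
	Help: "now time minus tt",
}, []string{"node_id", "msg_type", "collection_id"})

func main() {
	lag.WithLabelValues("1", "all", "42").Set(5)
	lag.WithLabelValues("1", "insert", "42").Set(7)

	// Delete removes only the child whose labels match exactly and reports
	// whether anything was removed; the "insert" child is left untouched.
	fmt.Println(lag.Delete(prometheus.Labels{
		"node_id": "1", "msg_type": "all", "collection_id": "42",
	})) // true
	fmt.Println(lag.Delete(prometheus.Labels{
		"node_id": "1", "msg_type": "all", "collection_id": "42",
	})) // false, already gone
}
```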
@@ -34,13 +34,14 @@ const (
	FailLabel = "fail"
	TotalLabel = "total"

	InsertLabel = "insert"
	DeleteLabel = "delete"
	SearchLabel = "search"
	QueryLabel = "query"

	CacheHitLabel = "hit"
	CacheMissLabel = "miss"
+	TimetickLabel = "timetick"
+	AllLabel = "all"

	UnissuedIndexTaskLabel = "unissued"
	InProgressIndexTaskLabel = "in-progress"
@@ -150,13 +150,13 @@ var (
			Buckets: buckets, // unit: ms
		}, []string{nodeIDLabelName})

-	// ProxySyncTimeTick record Proxy synchronization timestamp statistics, differentiated by Channel.
-	ProxySyncTimeTick = prometheus.NewGaugeVec(
+	// ProxySyncTimeTickLag record Proxy synchronization timestamp statistics, differentiated by Channel.
+	ProxySyncTimeTickLag = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.ProxyRole,
-			Name: "sync_epoch_time",
-			Help: "synchronized unix epoch per physical channel and default channel",
+			Name: "tt_lag_ms",
+			Help: "now time minus tt per physical channel",
		}, []string{nodeIDLabelName, channelNameLabelName})

	// ProxyApplyPrimaryKeyLatency record the latency that apply primary key.
@@ -247,7 +247,7 @@ func RegisterProxy(registry *prometheus.Registry) {
	registry.MustRegister(ProxyCacheStatsCounter)
	registry.MustRegister(ProxyUpdateCacheLatency)

-	registry.MustRegister(ProxySyncTimeTick)
+	registry.MustRegister(ProxySyncTimeTickLag)
	registry.MustRegister(ProxyApplyPrimaryKeyLatency)
	registry.MustRegister(ProxyApplyTimestampLatency)

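The exposed series name is `Namespace_Subsystem_Name` joined with underscores, so changing `Name` from `sync_epoch_time` to `tt_lag_ms` is exactly what turns `milvus_proxy_sync_epoch_time` into the `milvus_proxy_tt_lag_ms` queried by the updated dashboard. A quick, illustrative way to see the fully-qualified name client_golang will build:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// BuildFQName joins namespace, subsystem and name the same way the
	// GaugeOpts fields are joined when the collector is created.
	fmt.Println(prometheus.BuildFQName("milvus", "proxy", "sync_epoch_time")) // milvus_proxy_sync_epoch_time
	fmt.Println(prometheus.BuildFQName("milvus", "proxy", "tt_lag_ms"))       // milvus_proxy_tt_lag_ms
}
```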
@@ -17,6 +17,8 @@
package metrics

import (
+	"fmt"
+
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/prometheus/client_golang/prometheus"
)
@@ -32,6 +34,30 @@ var (
			nodeIDLabelName,
		})

+	QueryNodeConsumeTimeTickLag = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name: "consume_tt_lag_ms",
+			Help: "now time minus tt per physical channel",
+		}, []string{
+			nodeIDLabelName,
+			msgTypeLabelName,
+			collectionIDLabelName,
+		})
+
+	QueryNodeConsumerMsgCount = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name: "consume_msg_count",
+			Help: "count of consumed msg",
+		}, []string{
+			nodeIDLabelName,
+			msgTypeLabelName,
+			collectionIDLabelName,
+		})
+
	QueryNodeNumPartitions = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
@@ -339,4 +365,27 @@ func RegisterQueryNode(registry *prometheus.Registry) {
	registry.MustRegister(QueryNodeNumEntities)
	registry.MustRegister(QueryNodeConsumeCounter)
	registry.MustRegister(QueryNodeExecuteCounter)
+	registry.MustRegister(QueryNodeConsumerMsgCount)
+	registry.MustRegister(QueryNodeConsumeTimeTickLag)
+}
+
+func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {
+	for _, label := range []string{DeleteLabel, InsertLabel} {
+		QueryNodeConsumerMsgCount.
+			Delete(
+				prometheus.Labels{
+					nodeIDLabelName: fmt.Sprint(nodeID),
+					msgTypeLabelName: label,
+					collectionIDLabelName: fmt.Sprint(collectionID),
+				})
+
+		QueryNodeConsumeTimeTickLag.
+			Delete(
+				prometheus.Labels{
+					nodeIDLabelName: fmt.Sprint(nodeID),
+					msgTypeLabelName: label,
+					collectionIDLabelName: fmt.Sprint(collectionID),
+				})
+	}
}
@@ -23,8 +23,8 @@ var (
		prometheus.GaugeOpts{
			Namespace: milvusNamespace,
			Subsystem: typeutil.RootCoordRole,
-			Name: "sync_epoch_time",
-			Help: "synchronized unix epoch per physical channel",
+			Name: "produce_tt_lag_ms",
+			Help: "now time minus tt per physical channel",
		}, []string{channelNameLabelName})

	RootCoordDDLReqCounter = prometheus.NewCounterVec(
@@ -291,8 +291,6 @@ func (node *Proxy) sendChannelsTimeTickLoop() {

			maxTs := ts
			for channel, ts := range stats {
-				physicalTs, _ := tsoutil.ParseHybridTs(ts)
-				metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), channel).Set(float64(physicalTs))
				channels = append(channels, channel)
				tss = append(tss, ts)
				if ts > maxTs {
@@ -311,8 +309,8 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
				Timestamps: tss,
				DefaultTimestamp: maxTs,
			}
-			maxPhysicalTs, _ := tsoutil.ParseHybridTs(maxTs)
-			metrics.ProxySyncTimeTick.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "default").Set(float64(maxPhysicalTs))
+			sub := tsoutil.SubByNow(maxTs)
+			metrics.ProxySyncTimeTickLag.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "default").Set(float64(sub))
			status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req)
			if err != nil {
				log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))
@@ -22,6 +22,8 @@ import (
	"fmt"
	"time"

+	"github.com/milvus-io/milvus/internal/util/typeutil"
+
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
@@ -71,7 +73,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
		flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
	}

-	dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
+	dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.InsertLabel)
	if err != nil {
		return nil, err
	}
@@ -137,7 +139,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
		flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
	}

-	dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel)
+	dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.DeleteLabel)
	if err != nil {
		return nil, err
	}
@@ -186,7 +188,8 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
}

// newDmInputNode returns a new inputNode
-func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, vchannel Channel) (*flowgraph.InputNode, error) {
+func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory, collectionID UniqueID, vchannel Channel, dataType string) (*flowgraph.InputNode, error) {
	insertStream, err := factory.NewTtMsgStream(ctx)
	if err != nil {
		return nil, err
@@ -197,7 +200,8 @@ func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstre
	maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
	maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
	name := fmt.Sprintf("dmInputNode-query-%d-%s", collectionID, vchannel)
-	node := flowgraph.NewInputNode(insertStream, name, maxQueueLength, maxParallelism)
+	node := flowgraph.NewInputNode(insertStream, name, maxQueueLength, maxParallelism, typeutil.QueryNodeRole,
+		paramtable.GetNodeID(), collectionID, dataType)
	return node, nil
}

@@ -267,4 +271,6 @@ func (q *queryNodeFlowGraph) close() {
 		zap.Int64("collectionID", q.collectionID),
 		zap.String("vchannel", q.vchannel),
 	)
+
+	metrics.CleanupQueryNodeCollectionMetrics(paramtable.GetNodeID(), q.collectionID)
 }
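The `CleanupQueryNodeCollectionMetrics` call added above drops the per-collection series when a flow graph is closed, so a released collection stops exporting stale gauge values. A rough sketch of what such a helper plausibly does with prometheus/client_golang; the vector and label names below are assumptions for illustration, not the actual definitions in `internal/metrics`:

```
package metricsdemo

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// cleanupCollectionSeries deletes every series of the given vectors that
// carries this node/collection label pair. Counter vectors would be handled
// the same way, since DeletePartialMatch is defined on the embedded MetricVec.
func cleanupCollectionSeries(nodeID, collectionID int64, vecs ...*prometheus.GaugeVec) {
	labels := prometheus.Labels{
		"node_id":       strconv.FormatInt(nodeID, 10),       // assumed label key
		"collection_id": strconv.FormatInt(collectionID, 10), // assumed label key
	}
	for _, vec := range vecs {
		vec.DeletePartialMatch(labels) // removes all series matching these labels
	}
}
```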
@@ -318,9 +318,9 @@ func (t *timetickSync) sendTimeTickToChannel(chanNames []string, ts typeutil.Tim
 		return err
 	}
 
-	physicalTs, _ := tsoutil.ParseHybridTs(ts)
+	sub := tsoutil.SubByNow(ts)
 	for _, chanName := range chanNames {
-		metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(physicalTs))
+		metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(sub))
 	}
 	return nil
 }
@@ -17,9 +17,15 @@
 package flowgraph
 
 import (
+	"fmt"
 	"sync"
 
+	"github.com/milvus-io/milvus/internal/util/tsoutil"
+
+	"github.com/milvus-io/milvus/internal/util/typeutil"
+
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/mq/msgstream"
 	"github.com/milvus-io/milvus/internal/util/trace"
 	"github.com/opentracing/opentracing-go"
@@ -30,9 +36,13 @@ import (
 // InputNode is the entry point of flowgragh
 type InputNode struct {
 	BaseNode
 	inStream msgstream.MsgStream
 	name     string
-	closeOnce sync.Once
+	role         string
+	nodeID       int64
+	collectionID int64
+	dataType     string
+	closeOnce    sync.Once
 }
 
 // IsInputNode returns whether Node is InputNode
@@ -76,6 +86,28 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
 	if msgPack == nil {
 		return []Msg{}
 	}
+
+	sub := tsoutil.SubByNow(msgPack.EndTs)
+	if inNode.role == typeutil.QueryNodeRole {
+		metrics.QueryNodeConsumerMsgCount.
+			WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+			Inc()
+
+		metrics.QueryNodeConsumeTimeTickLag.
+			WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+			Set(float64(sub))
+	}
+
+	if inNode.role == typeutil.DataNodeRole {
+		metrics.DataNodeConsumeMsgCount.
+			WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+			Inc()
+
+		metrics.DataNodeConsumeTimeTickLag.
+			WithLabelValues(fmt.Sprint(inNode.nodeID), inNode.dataType, fmt.Sprint(inNode.collectionID)).
+			Set(float64(sub))
+	}
+
 	var spans []opentracing.Span
 	for _, msg := range msgPack.Msgs {
 		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
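The `Operate` path above keys each counter and gauge by node ID, message type (`dataType`), and collection ID, matching the three `WithLabelValues` arguments. For readers unfamiliar with the metrics package, a minimal sketch of how such vectors are typically declared with prometheus/client_golang; the metric names, namespace, and label keys here are illustrative assumptions, not the definitions used in `internal/metrics`:

```
package metricsdemo

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical stand-ins for the vectors used above; the real definitions may
// use different names, namespaces, and label keys.
var (
	queryNodeConsumeTimeTickLag = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "milvus",
			Subsystem: "querynode",
			Name:      "consume_tt_lag_ms",
			Help:      "Lag between a consumed time tick and wall-clock time, in milliseconds.",
		},
		[]string{"node_id", "msg_type", "collection_id"}, // matches the three WithLabelValues args
	)

	queryNodeConsumerMsgCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "milvus",
			Subsystem: "querynode",
			Name:      "consume_msg_count",
			Help:      "Number of msg packs consumed from the msgstream.",
		},
		[]string{"node_id", "msg_type", "collection_id"},
	)
)

func init() {
	// Register once at startup so the vectors are exposed on the /metrics endpoint.
	prometheus.MustRegister(queryNodeConsumeTimeTickLag, queryNodeConsumerMsgCount)
}
```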
@@ -101,14 +133,18 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
 }
 
 // NewInputNode composes an InputNode with provided MsgStream, name and parameters
-func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
+func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32, role string, nodeID int64, collectionID int64, dataType string) *InputNode {
 	baseNode := BaseNode{}
 	baseNode.SetMaxQueueLength(maxQueueLength)
 	baseNode.SetMaxParallelism(maxParallelism)
 
 	return &InputNode{
 		BaseNode: baseNode,
 		inStream: inStream,
 		name:     nodeName,
+		role:         role,
+		nodeID:       nodeID,
+		collectionID: collectionID,
+		dataType:     dataType,
 	}
 }
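The new constructor arguments feed only the metrics path: when `role` is empty, neither the query-node nor the data-node branch in `Operate` runs, which is why the updated tests below pass `"", 0, 0, ""`. A minimal same-package sketch of that behaviour (the test name and assertions are illustrative, not part of this commit):

```
package flowgraph

import "testing"

// With an empty role the metric branches in Operate are skipped entirely,
// so callers that do not report metrics can pass zero values.
func TestInputNode_EmptyRoleSkipsMetrics(t *testing.T) {
	node := NewInputNode(nil, "input_node", 100, 100, "", 0, 0, "")
	if node.role != "" || node.dataType != "" || node.nodeID != 0 || node.collectionID != 0 {
		t.Fatal("expected metric-related fields to keep their zero values")
	}
}
```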
@@ -41,7 +41,7 @@ func TestInputNode(t *testing.T) {
 	produceStream.Produce(&msgPack)
 
 	nodeName := "input_node"
-	inputNode := NewInputNode(msgStream, nodeName, 100, 100)
+	inputNode := NewInputNode(msgStream, nodeName, 100, 100, "", 0, 0, "")
 	defer inputNode.Close()
 
 	isInputNode := inputNode.IsInputNode()
@@ -64,7 +64,7 @@ func Test_NewInputNode(t *testing.T) {
 	nodeName := "input_node"
 	var maxQueueLength int32
 	var maxParallelism int32 = 100
-	node := NewInputNode(nil, nodeName, maxQueueLength, maxParallelism)
+	node := NewInputNode(nil, nodeName, maxQueueLength, maxParallelism, "", 0, 0, "")
 	assert.NotNil(t, node)
 	assert.Equal(t, node.name, nodeName)
 	assert.Equal(t, node.maxQueueLength, maxQueueLength)
@@ -74,7 +74,7 @@ func TestNodeCtx_Start(t *testing.T) {
 	produceStream.Produce(&msgPack)
 
 	nodeName := "input_node"
-	inputNode := NewInputNode(msgStream, nodeName, 100, 100)
+	inputNode := NewInputNode(msgStream, nodeName, 100, 100, "", 0, 0, "")
 
 	node := &nodeCtx{
 		node: inputNode,
@@ -97,3 +97,10 @@ func AddPhysicalDurationOnTs(ts uint64, duration time.Duration) uint64 {
 func NewTSOKVBase(client *clientv3.Client, tsoRoot, subPath string) *etcdkv.EtcdKV {
 	return etcdkv.NewEtcdKV(client, path.Join(tsoRoot, subPath))
 }
+
+// SubByNow ts is a hybrid
+func SubByNow(ts uint64) int64 {
+	utcT, _ := ParseHybridTs(ts)
+	now := time.Now().UnixMilli()
+	return now - utcT
+}
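`SubByNow` is the helper behind the new lag gauges in this diff: `ParseHybridTs` yields the physical (millisecond) part of the hybrid timestamp, which is subtracted from `time.Now().UnixMilli()`, so the result is a lag in milliseconds. A self-contained sketch of the same computation, assuming the usual Milvus layout of an 18-bit logical counter in the low bits (the constant is an assumption here, not taken from this diff):

```
package main

import (
	"fmt"
	"time"
)

// logicalBits is assumed: physical Unix milliseconds in the high bits,
// an 18-bit logical counter in the low bits of a hybrid timestamp.
const logicalBits = 18

// lagMs mirrors SubByNow for illustration: wall-clock now minus the
// physical (millisecond) part of the hybrid timestamp.
func lagMs(hybridTs uint64) int64 {
	physical := int64(hybridTs >> logicalBits)
	return time.Now().UnixMilli() - physical
}

func main() {
	// A hybrid timestamp minted two seconds ago should report roughly 2000 ms of lag.
	ts := uint64(time.Now().Add(-2*time.Second).UnixMilli()) << logicalBits
	fmt.Printf("time-tick lag ≈ %d ms\n", lagMs(ts))
}
```

The proxy and root-coord gauges earlier in this diff previously exported the absolute physical timestamp; with this helper they export how far that timestamp trails wall-clock time instead.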