diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 8cfe2cf514..96455651af 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -228,6 +228,18 @@ type ddTaskQueue struct { lock sync.Mutex } +func (queue *ddTaskQueue) updateMetrics() { + queue.utLock.RLock() + unissuedTasksNum := queue.unissuedTasks.Len() + queue.utLock.RUnlock() + queue.atLock.RLock() + activateTaskNum := len(queue.activeTasks) + queue.atLock.RUnlock() + + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum)) + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "ddl", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum)) +} + type pChanStatInfo struct { pChanStatistics tsSet map[Timestamp]struct{} @@ -241,6 +253,18 @@ type dmTaskQueue struct { pChanStatisticsInfos map[pChan]*pChanStatInfo } +func (queue *dmTaskQueue) updateMetrics() { + queue.utLock.RLock() + unissuedTasksNum := queue.unissuedTasks.Len() + queue.utLock.RUnlock() + queue.atLock.RLock() + activateTaskNum := len(queue.activeTasks) + queue.atLock.RUnlock() + + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum)) + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dml", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum)) +} + func (queue *dmTaskQueue) Enqueue(t task) error { // This statsLock has two functions: // 1) Protect member pChanStatisticsInfos @@ -361,6 +385,18 @@ type dqTaskQueue struct { *baseTaskQueue } +func (queue *dqTaskQueue) updateMetrics() { + queue.utLock.RLock() + unissuedTasksNum := queue.unissuedTasks.Len() + queue.utLock.RUnlock() + queue.atLock.RLock() + activateTaskNum := len(queue.activeTasks) + queue.atLock.RUnlock() + + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.UnissuedIndexTaskLabel).Set(float64(unissuedTasksNum)) + metrics.ProxyQueueTaskNum.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), "dql", metrics.InProgressIndexTaskLabel).Set(float64(activateTaskNum)) +} + func (queue *ddTaskQueue) Enqueue(t task) error { queue.lock.Lock() defer queue.lock.Unlock() @@ -507,6 +543,7 @@ func (sched *taskScheduler) definitionLoop() { return struct{}{}, nil }) } + sched.ddQueue.updateMetrics() } } } @@ -528,6 +565,7 @@ func (sched *taskScheduler) controlLoop() { return struct{}{}, nil }) } + sched.dcQueue.updateMetrics() } } } @@ -547,6 +585,7 @@ func (sched *taskScheduler) manipulationLoop() { return struct{}{}, nil }) } + sched.dmQueue.updateMetrics() } } } @@ -577,6 +616,7 @@ func (sched *taskScheduler) queryLoop() { } else { log.Ctx(context.TODO()).Debug("query queue is empty ...") } + sched.dqQueue.updateMetrics() } } } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index b6025b8bd9..ae93947236 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -124,6 +124,7 @@ const ( pathLabelName = "path" cgoNameLabelName = `cgo_name` cgoTypeLabelName = `cgo_type` + queueTypeLabelName = `queue_type` // entities label LoadedLabel = "loaded" diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index f44e566677..2af25c89ac 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -427,6 +427,15 @@ var ( Help: "the number of non-zeros in each sparse search task", Buckets: buckets, }, []string{nodeIDLabelName, collectionName}) + + // ProxyQueueTaskNum records task number of queue in Proxy. + ProxyQueueTaskNum = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "queue_task_num", + Help: "", + }, []string{nodeIDLabelName, queueTypeLabelName, taskStateLabel}) ) // RegisterProxy registers Proxy metrics @@ -490,6 +499,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyRecallSearchCount) registry.MustRegister(ProxySearchSparseNumNonZeros) + registry.MustRegister(ProxyQueueTaskNum) RegisterStreamingServiceClient(registry) }