mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
This commit is contained in:
parent
708c724ccc
commit
b09e7aeaf7
@ -243,11 +243,46 @@ func (scheduler *taskScheduler) Add(task Task) error {
|
||||
scheduler.channelTasks[index] = task
|
||||
}
|
||||
|
||||
metrics.QueryCoordTaskNum.WithLabelValues().Set(float64(scheduler.tasks.Len()))
|
||||
scheduler.updateTaskMetrics()
|
||||
log.Info("task added", zap.String("task", task.String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scheduler *taskScheduler) updateTaskMetrics() {
|
||||
segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
|
||||
channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
|
||||
for _, task := range scheduler.segmentTasks {
|
||||
taskType := GetTaskType(task)
|
||||
switch taskType {
|
||||
case TaskTypeGrow:
|
||||
segmentGrowNum++
|
||||
case TaskTypeReduce:
|
||||
segmentReduceNum++
|
||||
case TaskTypeMove:
|
||||
segmentMoveNum++
|
||||
}
|
||||
}
|
||||
|
||||
for _, task := range scheduler.channelTasks {
|
||||
taskType := GetTaskType(task)
|
||||
switch taskType {
|
||||
case TaskTypeGrow:
|
||||
channelGrowNum++
|
||||
case TaskTypeReduce:
|
||||
channelReduceNum++
|
||||
case TaskTypeMove:
|
||||
channelMoveNum++
|
||||
}
|
||||
}
|
||||
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
|
||||
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
|
||||
}
|
||||
|
||||
// check whether the task is valid to add,
|
||||
// must hold lock
|
||||
func (scheduler *taskScheduler) preAdd(task Task) error {
|
||||
@ -656,7 +691,7 @@ func (scheduler *taskScheduler) remove(task Task) {
|
||||
log = log.With(zap.String("channel", task.Channel()))
|
||||
}
|
||||
|
||||
metrics.QueryCoordTaskNum.WithLabelValues().Set(float64(scheduler.tasks.Len()))
|
||||
scheduler.updateTaskMetrics()
|
||||
log.Debug("task removed", zap.Stack("stack"))
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,18 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
SegmentGrowTaskLabel = "segment_grow"
|
||||
SegmentReduceTaskLabel = "segment_reduce"
|
||||
SegmentMoveTaskLabel = "segment_move"
|
||||
|
||||
ChannelGrowTaskLabel = "channel_grow"
|
||||
ChannelReduceTaskLabel = "channel_reduce"
|
||||
ChannelMoveTaskLabel = "channel_move"
|
||||
|
||||
QueryCoordTaskType = "querycoord_task_type"
|
||||
)
|
||||
|
||||
var (
|
||||
QueryCoordNumCollections = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
@ -83,7 +95,7 @@ var (
|
||||
Subsystem: typeutil.QueryCoordRole,
|
||||
Name: "task_num",
|
||||
Help: "the number of tasks in QueryCoord's scheduler",
|
||||
}, []string{})
|
||||
}, []string{QueryCoordTaskType})
|
||||
|
||||
QueryCoordNumQueryNodes = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user