add metrics for delegator insert/delete cost (#25733)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-07-21 15:30:59 +08:00 committed by GitHub
parent b2125bc490
commit 32827f538a
5 changed files with 44 additions and 0 deletions

View File

@@ -36,6 +36,7 @@ import (
	"github.com/milvus-io/milvus/internal/querynodev2/segments"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -43,6 +44,7 @@ import (
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/retry"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -97,6 +99,8 @@ func (sd *shardDelegator) newGrowing(segmentID int64, insertData *InsertData) se
// ProcessInsert handles insert data in delegator.
func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
+	method := "ProcessInsert"
+	tr := timerecord.NewTimeRecorder(method)
	log := sd.getLogger(context.Background())
	for segmentID, insertData := range insertRecords {
		growing := sd.segmentManager.GetGrowing(segmentID)
@@ -126,12 +130,16 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
			zap.Uint64("maxTimestamp", insertData.Timestamps[len(insertData.Timestamps)-1]),
		)
	}
+	metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).
+		Observe(float64(tr.ElapseSpan()))
}

// ProcessDelete handles delete data in delegator.
// delegator puts deleteData into buffer first,
// then dispatch data to segments acoording to the result of pkOracle.
func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
+	method := "ProcessDelete"
+	tr := timerecord.NewTimeRecorder(method)
	// block load segment handle delete buffer
	sd.deleteMut.Lock()
	defer sd.deleteMut.Unlock()
@@ -223,6 +231,9 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
		log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
		sd.markSegmentOffline(offlineSegIDs...)
	}
+	metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
+		Observe(float64(tr.ElapseSpan()))
}

// applyDelete handles delete record and apply them to corresponding workers.
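The delegator change above wraps ProcessInsert and ProcessDelete with a time recorder and reports the elapsed time to the new QueryNodeProcessCost histogram, labeled by node ID and message type. Below is a minimal standalone sketch of that pattern, not the Milvus code itself: it uses plain time.Since in place of the timerecord helper, illustrative metric and label names, and assumes the observation is reported in milliseconds to match the metric name.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// processCost mimics the shape of QueryNodeProcessCost: a histogram
// partitioned by node ID and message type ("insert" / "delete").
var processCost = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "milvus",
		Subsystem: "querynode",
		Name:      "process_insert_or_delete_ms",
		Help:      "process insert or delete cost in ms",
	}, []string{"node_id", "msg_type"})

// processInsert stands in for shardDelegator.ProcessInsert: do the work,
// then observe how long it took under the "insert" label.
func processInsert(nodeID int64) {
	start := time.Now() // plays the role of timerecord.NewTimeRecorder
	// ... apply insert records to growing segments ...
	processCost.WithLabelValues(fmt.Sprint(nodeID), "insert").
		Observe(float64(time.Since(start).Milliseconds()))
}

func main() {
	prometheus.MustRegister(processCost)
	processInsert(1)
}

The delete path is symmetric: the same recorder is started at the top of ProcessDelete and observed under metrics.DeleteLabel after the buffered deletes have been dispatched.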

View File

@@ -26,6 +26,8 @@ import (
	"github.com/milvus-io/milvus/internal/storage"
	base "github.com/milvus-io/milvus/internal/util/pipeline"
	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

type deleteNode struct {
@@ -61,6 +63,7 @@ func (dNode *deleteNode) addDeleteData(deleteDatas map[UniqueID]*delegator.Delet
}

func (dNode *deleteNode) Operate(in Msg) Msg {
+	metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec()
	nodeMsg := in.(*deleteNodeMsg)

	// partition id = > DeleteData

View File

@@ -100,6 +100,8 @@ func (fNode *filterNode) Operate(in Msg) Msg {
			out.append(msg)
		}
	}
+	metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc()
	return out
}

View File

@@ -28,6 +28,8 @@ import (
	"github.com/milvus-io/milvus/internal/storage"
	base "github.com/milvus-io/milvus/internal/util/pipeline"
	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -85,6 +87,7 @@ func (iNode *insertNode) addInsertData(insertDatas map[UniqueID]*delegator.Inser
// Insert task
func (iNode *insertNode) Operate(in Msg) Msg {
+	metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec()
	nodeMsg := in.(*insertNodeMsg)

	sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
@@ -105,6 +108,8 @@
	iNode.delegator.ProcessInsert(insertDatas)

+	metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc()
	return &deleteNodeMsg{
		deleteMsgs: nodeMsg.deleteMsgs,
		timeRange:  nodeMsg.timeRange,
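The three pipeline changes work as a pair of counters around each hand-off: filterNode increments QueryNodeWaitProcessingMsgCount when it forwards a message, insertNode and deleteNode decrement it as soon as their Operate method picks the message up, and insertNode increments the delete-labeled series again before forwarding to deleteNode. The gauge therefore tracks messages queued between pipeline nodes. A rough sketch of that producer/consumer bookkeeping, with illustrative function and label names rather than the actual pipeline types:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// waitProcessing mimics QueryNodeWaitProcessingMsgCount: a gauge of
// messages handed to the next pipeline node but not yet consumed.
var waitProcessing = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "milvus",
		Subsystem: "querynode",
		Name:      "wait_processing_msg_count",
		Help:      "count of wait processing msg",
	}, []string{"node_id", "msg_type"})

// forward is the producer side (filterNode / insertNode): bump the gauge
// when a message is emitted downstream.
func forward(nodeID int64, msgType string) {
	waitProcessing.WithLabelValues(fmt.Sprint(nodeID), msgType).Inc()
}

// operate is the consumer side (insertNode / deleteNode): drop the gauge
// as soon as the node starts handling the message.
func operate(nodeID int64, msgType string) {
	waitProcessing.WithLabelValues(fmt.Sprint(nodeID), msgType).Dec()
	// ... process the message ...
}

func main() {
	prometheus.MustRegister(waitProcessing)
	forward(1, "insert") // filterNode forwards an insert message
	operate(1, "insert") // insertNode picks it up
	forward(1, "delete") // insertNode forwards the delete part
	operate(1, "delete") // deleteNode picks it up
}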

View File

@@ -47,6 +47,29 @@ var (
			collectionIDLabelName,
		})

+	QueryNodeProcessCost = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "process_insert_or_delete_ms",
+			Help:      "process insert or delete cost in ms",
+			Buckets:   buckets,
+		}, []string{
+			nodeIDLabelName,
+			msgTypeLabelName,
+		})
+
+	QueryNodeWaitProcessingMsgCount = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.QueryNodeRole,
+			Name:      "wait_processing_msg_count",
+			Help:      "count of wait processing msg",
+		}, []string{
+			nodeIDLabelName,
+			msgTypeLabelName,
+		})
+
	QueryNodeConsumerMsgCount = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: milvusNamespace,
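The two new vectors are declared alongside the existing QueryNode metrics and registered through the milvus metrics package, so they appear on the node's metrics endpoint once the QueryNode is running. The snippet below is a generic client_golang sketch of how vectors of this shape end up on a /metrics page and how they might be queried; the registry wiring, port, and PromQL examples are illustrative assumptions, not the Milvus wiring.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Stand-in registry; Milvus registers its QueryNode collectors through
	// its own metrics package rather than this way.
	reg := prometheus.NewRegistry()

	processCost := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "milvus", Subsystem: "querynode",
		Name: "process_insert_or_delete_ms",
		Help: "process insert or delete cost in ms",
	}, []string{"node_id", "msg_type"})

	waitCount := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "milvus", Subsystem: "querynode",
		Name: "wait_processing_msg_count",
		Help: "count of wait processing msg",
	}, []string{"node_id", "msg_type"})

	reg.MustRegister(processCost, waitCount)

	// Once scraped, the series can be inspected with PromQL, e.g.:
	//   histogram_quantile(0.99, rate(milvus_querynode_process_insert_or_delete_ms_bucket[5m]))
	//   milvus_querynode_wait_processing_msg_count
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}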