enhance: [2.4] Remove useless ops when there is no write (#34767) (#34839)

Cherry pick from master
pr: #34767
Related to: #33235

THe querynode pipeline will make map & call ProcessInsert when there is
no write messages. So querynodes will have high CPU usage even when
there is no workload.

This PR check msg length before composing data struct and calling method

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-07-22 10:23:42 +08:00 committed by GitHub
parent 21973a600d
commit c06a0ebef2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 22 deletions

View File

@ -66,14 +66,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec()
nodeMsg := in.(*deleteNodeMsg)
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
if len(nodeMsg.deleteMsgs) > 0 {
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
if len(deleteDatas) > 0 {
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
// do Delete, use ts range max as ts
dNode.delegator.ProcessDelete(lo.Values(deleteDatas), nodeMsg.timeRange.timestampMax)
}

View File

@ -90,24 +90,26 @@ func (iNode *insertNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec()
nodeMsg := in.(*insertNodeMsg)
sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})
if len(nodeMsg.insertMsgs) > 0 {
sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})
insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
}
// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}
iNode.delegator.ProcessInsert(insertDatas)
}
// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}
iNode.delegator.ProcessInsert(insertDatas)
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc()
return &deleteNodeMsg{