mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Get indexnode metrics when building index tasks (#13107)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
983ee423f3
commit
2c6c74669f
@ -50,23 +50,6 @@ func NewNodeManager() *NodeManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error {
|
func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error {
|
||||||
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("create metrics request failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
metrics, err := client.GetMetrics(context.Background(), req)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("get indexnode metrics failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
infos := &metricsinfo.IndexNodeInfos{}
|
|
||||||
err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("get indexnode metrics info failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
nm.lock.Lock()
|
nm.lock.Lock()
|
||||||
defer nm.lock.Unlock()
|
defer nm.lock.Unlock()
|
||||||
|
|
||||||
@ -76,7 +59,7 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) error
|
|||||||
key: nodeID,
|
key: nodeID,
|
||||||
priority: 0,
|
priority: 0,
|
||||||
weight: 0,
|
weight: 0,
|
||||||
totalMem: infos.HardwareInfos.Memory,
|
totalMem: 0,
|
||||||
}
|
}
|
||||||
nm.nodeClients[nodeID] = client
|
nm.nodeClients[nodeID] = client
|
||||||
nm.pq.Push(item)
|
nm.pq.Push(item)
|
||||||
@ -147,6 +130,27 @@ func (nm *NodeManager) ListNode() []UniqueID {
|
|||||||
clients := []UniqueID{}
|
clients := []UniqueID{}
|
||||||
for id := range nm.nodeClients {
|
for id := range nm.nodeClients {
|
||||||
clients = append(clients, id)
|
clients = append(clients, id)
|
||||||
|
if item := (nm.pq.getItemByKey(id)).(*PQItem); item.totalMem == 0 {
|
||||||
|
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("create metrics request failed", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
metrics, err := nm.nodeClients[id].GetMetrics(context.Background(), req)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("get indexnode metrics failed", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
infos := &metricsinfo.IndexNodeInfos{}
|
||||||
|
err = metricsinfo.UnmarshalComponentInfos(metrics.Response, infos)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("get indexnode metrics info failed", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nm.pq.SetMemory(id, infos.HardwareInfos.Memory)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return clients
|
return clients
|
||||||
}
|
}
|
||||||
|
|||||||
@ -166,3 +166,16 @@ func (pq *PriorityQueue) PeekAll() []UniqueID {
|
|||||||
|
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetMemory sets the memory info for IndexNode.
|
||||||
|
func (pq *PriorityQueue) SetMemory(key UniqueID, memorySize uint64) {
|
||||||
|
pq.lock.Lock()
|
||||||
|
defer pq.lock.Unlock()
|
||||||
|
|
||||||
|
for i := range pq.items {
|
||||||
|
if pq.items[i].key == key {
|
||||||
|
pq.items[i].totalMem = memorySize
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -93,6 +93,25 @@ func TestPriorityQueue_IncPriority(t *testing.T) {
|
|||||||
assert.Equal(t, key, peekKey)
|
assert.Equal(t, key, peekKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPriorityQueue_SetMemory(t *testing.T) {
|
||||||
|
ret := &PriorityQueue{
|
||||||
|
policy: PeekClientV1,
|
||||||
|
}
|
||||||
|
for i := 0; i < QueueLen; i++ {
|
||||||
|
item := &PQItem{
|
||||||
|
key: UniqueID(i),
|
||||||
|
priority: i,
|
||||||
|
index: i,
|
||||||
|
totalMem: 1000,
|
||||||
|
}
|
||||||
|
ret.items = append(ret.items, item)
|
||||||
|
}
|
||||||
|
heap.Init(ret)
|
||||||
|
ret.SetMemory(1, 100000)
|
||||||
|
peeKey := ret.Peek(99999, []*commonpb.KeyValuePair{}, []*commonpb.KeyValuePair{})
|
||||||
|
assert.Equal(t, int64(1), peeKey)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPriorityQueue(t *testing.T) {
|
func TestPriorityQueue(t *testing.T) {
|
||||||
ret := &PriorityQueue{
|
ret := &PriorityQueue{
|
||||||
policy: PeekClientV0,
|
policy: PeekClientV0,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user