Fix indexnode and datanode num metric (#26031)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
xige-16 2023-07-31 19:35:04 +08:00 committed by GitHub
parent 8d942ecd79
commit cb21898ba5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 13 deletions

View File

@ -22,7 +22,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/samber/lo"
@ -60,21 +59,13 @@ func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error {
// Register registers a new node in cluster
func (c *Cluster) Register(node *NodeInfo) error {
c.sessionManager.AddSession(node)
err := c.channelManager.AddNode(node.NodeID)
if err == nil {
metrics.DataCoordNumDataNodes.WithLabelValues().Inc()
}
return err
return c.channelManager.AddNode(node.NodeID)
}
// UnRegister removes a node from cluster
func (c *Cluster) UnRegister(node *NodeInfo) error {
c.sessionManager.DeleteSession(node)
err := c.channelManager.DeleteNode(node.NodeID)
if err == nil {
metrics.DataCoordNumDataNodes.WithLabelValues().Dec()
}
return err
return c.channelManager.DeleteNode(node.NodeID)
}
// Watch tries to add a channel in datanode cluster

View File

@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
@ -83,6 +84,7 @@ func (c *SessionManager) AddSession(node *NodeInfo) {
session := NewSession(node, c.sessionCreator)
c.sessions.data[node.NodeID] = session
metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data)))
}
// DeleteSession removes the node session
@ -94,6 +96,7 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) {
session.Dispose()
delete(c.sessions.data, node.NodeID)
}
metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data)))
}
// getLiveNodeIDs returns IDs of all live DataNodes.

View File

@ -65,6 +65,7 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) {
}
nm.lock.Lock()
nm.nodeClients[nodeID] = client
metrics.IndexCoordIndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients)))
log.Info("IndexNode NodeManager setClient success", zap.Int64("nodeID", nodeID), zap.Int("IndexNode num", len(nm.nodeClients)))
nm.lock.Unlock()
nm.pq.Push(item)
@ -75,10 +76,10 @@ func (nm *NodeManager) RemoveNode(nodeID UniqueID) {
log.Info("IndexCoord", zap.Any("Remove node with ID", nodeID))
nm.lock.Lock()
delete(nm.nodeClients, nodeID)
metrics.IndexCoordIndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients)))
delete(nm.stoppingNodes, nodeID)
nm.lock.Unlock()
nm.pq.Remove(nodeID)
metrics.IndexCoordIndexNodeNum.WithLabelValues().Dec()
}
func (nm *NodeManager) StoppingNode(nodeID UniqueID) {
@ -112,7 +113,6 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
log.Error("IndexCoord NodeManager", zap.Any("Add node err", err))
return err
}
metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
nm.setClient(nodeID, nodeClient)
return nil
}