diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 86bf2042c8..de36366737 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -22,7 +22,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/samber/lo" @@ -60,21 +59,13 @@ func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error { // Register registers a new node in cluster func (c *Cluster) Register(node *NodeInfo) error { c.sessionManager.AddSession(node) - err := c.channelManager.AddNode(node.NodeID) - if err == nil { - metrics.DataCoordNumDataNodes.WithLabelValues().Inc() - } - return err + return c.channelManager.AddNode(node.NodeID) } // UnRegister removes a node from cluster func (c *Cluster) UnRegister(node *NodeInfo) error { c.sessionManager.DeleteSession(node) - err := c.channelManager.DeleteNode(node.NodeID) - if err == nil { - metrics.DataCoordNumDataNodes.WithLabelValues().Dec() - } - return err + return c.channelManager.DeleteNode(node.NodeID) } // Watch tries to add a channel in datanode cluster diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 32b06efe95..fa13ea3a50 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" @@ -83,6 +84,7 @@ func (c *SessionManager) AddSession(node *NodeInfo) { session := NewSession(node, c.sessionCreator) c.sessions.data[node.NodeID] = session + metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data))) } // DeleteSession removes the node session @@ -94,6 +96,7 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) { session.Dispose() delete(c.sessions.data, node.NodeID) } + metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data))) } // getLiveNodeIDs returns IDs of all live DataNodes. diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 5969d1abbd..f31eabd1a7 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -65,6 +65,7 @@ func (nm *NodeManager) setClient(nodeID UniqueID, client types.IndexNode) { } nm.lock.Lock() nm.nodeClients[nodeID] = client + metrics.IndexCoordIndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients))) log.Info("IndexNode NodeManager setClient success", zap.Int64("nodeID", nodeID), zap.Int("IndexNode num", len(nm.nodeClients))) nm.lock.Unlock() nm.pq.Push(item) @@ -75,10 +76,10 @@ func (nm *NodeManager) RemoveNode(nodeID UniqueID) { log.Info("IndexCoord", zap.Any("Remove node with ID", nodeID)) nm.lock.Lock() delete(nm.nodeClients, nodeID) + metrics.IndexCoordIndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients))) delete(nm.stoppingNodes, nodeID) nm.lock.Unlock() nm.pq.Remove(nodeID) - metrics.IndexCoordIndexNodeNum.WithLabelValues().Dec() } func (nm *NodeManager) StoppingNode(nodeID UniqueID) { @@ -112,7 +113,6 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error { log.Error("IndexCoord NodeManager", zap.Any("Add node err", err)) return err } - metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc() nm.setClient(nodeID, nodeClient) return nil }