From 3721cbfea150eb1631d03a0f7d7adceda9f42421 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Wed, 1 Sep 2021 17:35:00 +0800 Subject: [PATCH] Proxy metrics integrates with other coordinators (#7400) Signed-off-by: dragondriver --- internal/datacoord/metrics_info.go | 17 +- internal/datacoord/mock_test.go | 55 ++++- internal/indexcoord/metrics_info.go | 11 +- internal/proxy/metrics_info.go | 270 ++++++++++++++++++++- internal/querycoord/metrics_info.go | 3 +- internal/rootcoord/metrics_info.go | 14 +- internal/types/types.go | 6 + internal/util/metricsinfo/topology.go | 27 ++- internal/util/metricsinfo/topology_test.go | 42 +++- 9 files changed, 397 insertions(+), 48 deletions(-) diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index dbe43e693d..6c729563fe 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -41,7 +41,21 @@ func (s *Server) getSystemInfoMetrics( } nodes := s.cluster.GetNodes() + log.Debug("datacoord.getSystemInfoMetrics", + zap.Int("data nodes num", len(nodes))) for _, node := range nodes { + if node == nil { + log.Warn("skip invalid data node", + zap.String("reason", "datanode is nil")) + continue + } + + if node.GetClient() == nil { + log.Warn("skip invalid data node", + zap.String("reason", "datanode client is nil")) + continue + } + metrics, err := node.GetClient().GetMetrics(ctx, req) if err != nil { log.Warn("invalid metrics of query node was found", @@ -92,7 +106,8 @@ func (s *Server) getSystemInfoMetrics( Cluster: clusterTopology, Connections: metricsinfo.ConnTopology{ Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID), - // TODO(dragondriver): connection info + // TODO(dragondriver): fill ConnectedComponents if necessary + ConnectedComponents: []metricsinfo.ConnectionInfo{}, }, } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 39e0e519c0..3c8f99c229 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -120,6 +120,37 @@ func (c *mockDataNodeClient) FlushSegments(ctx context.Context, in *datapb.Flush return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil } +func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + // TODO(dragondriver): change the id, though it's not important in ut + nodeID := UniqueID(20210819) + + nodeInfos := metricsinfo.DataNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + }, + } + resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) + if err != nil { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + }, nil + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + }, nil +} + func (c *mockDataNodeClient) Stop() error { c.state = internalpb.StateCode_Abnormal return nil @@ -315,16 +346,24 @@ func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.Seg panic("not implemented") // TODO: Implement } -func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { +func (m *mockRootCoordService) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { // TODO(dragondriver): change the id, though it's not important in ut - nodeID := UniqueID(20210819) + nodeID := UniqueID(20210901) - nodeInfos := metricsinfo.DataNodeInfos{ - BaseComponentInfos: metricsinfo.BaseComponentInfos{ - Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + rootCoordTopology := metricsinfo.RootCoordTopology{ + Self: metricsinfo.RootCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID), + }, + }, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID), + // TODO(dragondriver): fill ConnectedComponents if necessary + ConnectedComponents: []metricsinfo.ConnectionInfo{}, }, } - resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) + + resp, err := metricsinfo.MarshalTopology(rootCoordTopology) if err != nil { return &milvuspb.GetMetricsResponse{ Status: &commonpb.Status{ @@ -332,7 +371,7 @@ func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMe Reason: err.Error(), }, Response: "", - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID), }, nil } @@ -342,6 +381,6 @@ func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMe Reason: "", }, Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID), + ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, nodeID), }, nil } diff --git a/internal/indexcoord/metrics_info.go b/internal/indexcoord/metrics_info.go index c2e7ea325c..311b61e856 100644 --- a/internal/indexcoord/metrics_info.go +++ b/internal/indexcoord/metrics_info.go @@ -35,7 +35,7 @@ func getSystemInfoMetrics( clusterTopology := metricsinfo.IndexClusterTopology{ Self: metricsinfo.IndexCoordInfos{ BaseComponentInfos: metricsinfo.BaseComponentInfos{ - Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID), }, }, ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0), @@ -91,8 +91,9 @@ func getSystemInfoMetrics( coordTopology := metricsinfo.IndexCoordTopology{ Cluster: clusterTopology, Connections: metricsinfo.ConnTopology{ - Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), - // TODO(dragondriver): connection info + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID), + // TODO(dragondriver): fill ConnectedComponents if necessary + ConnectedComponents: []metricsinfo.ConnectionInfo{}, }, } @@ -104,7 +105,7 @@ func getSystemInfoMetrics( Reason: err.Error(), }, Response: "", - ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID), }, nil } @@ -114,6 +115,6 @@ func getSystemInfoMetrics( Reason: "", }, Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.session.ServerID), }, nil } diff --git a/internal/proxy/metrics_info.go b/internal/proxy/metrics_info.go index 8a508ee64c..962528e0e2 100644 --- a/internal/proxy/metrics_info.go +++ b/internal/proxy/metrics_info.go @@ -52,16 +52,34 @@ func getSystemInfoMetrics( } queryCoordResp, queryCoordErr := node.queryCoord.GetMetrics(ctx, request) - skipQueryCoord := false queryCoordRoleName := "" - if queryCoordErr != nil || queryCoordResp == nil { - skipQueryCoord = true - } else { + if queryCoordErr == nil && queryCoordResp != nil { queryCoordRoleName = queryCoordResp.ComponentName identifierMap[queryCoordRoleName] = getUniqueIntGeneratorIns().get() } - if !skipQueryCoord { + dataCoordResp, dataCoordErr := node.dataCoord.GetMetrics(ctx, request) + dataCoordRoleName := "" + if dataCoordErr == nil && dataCoordResp != nil { + dataCoordRoleName = dataCoordResp.ComponentName + identifierMap[dataCoordRoleName] = getUniqueIntGeneratorIns().get() + } + + indexCoordResp, indexCoordErr := node.indexCoord.GetMetrics(ctx, request) + indexCoordRoleName := "" + if indexCoordErr == nil && indexCoordResp != nil { + indexCoordRoleName = indexCoordResp.ComponentName + identifierMap[indexCoordRoleName] = getUniqueIntGeneratorIns().get() + } + + rootCoordResp, rootCoordErr := node.rootCoord.GetMetrics(ctx, request) + rootCoordRoleName := "" + if rootCoordErr == nil && rootCoordResp != nil { + rootCoordRoleName = rootCoordResp.ComponentName + identifierMap[rootCoordRoleName] = getUniqueIntGeneratorIns().get() + } + + if queryCoordErr == nil && queryCoordResp != nil { proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{ ConnectedIdentifier: identifierMap[queryCoordRoleName], Type: metricsinfo.Forward, @@ -78,6 +96,42 @@ func getSystemInfoMetrics( Infos: &queryCoordTopology.Cluster.Self, } + // fill connection edge, a little trick here + for _, edge := range queryCoordTopology.Connections.ConnectedComponents { + switch edge.TargetType { + case typeutil.RootCoordRole: + if rootCoordErr == nil && rootCoordResp != nil { + queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[rootCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.RootCoordRole, + }) + } + case typeutil.DataCoordRole: + if dataCoordErr == nil && dataCoordResp != nil { + queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[dataCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.DataCoordRole, + }) + } + case typeutil.IndexCoordRole: + if indexCoordErr == nil && indexCoordResp != nil { + queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[indexCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.IndexCoordRole, + }) + } + case typeutil.QueryCoordRole: + queryCoordTopologyNode.Connected = append(queryCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[queryCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.QueryCoordRole, + }) + } + } + // add query nodes to system topology graph for _, queryNode := range queryCoordTopology.Cluster.ConnectedNodes { identifier := getUniqueIntGeneratorIns().get() @@ -100,7 +154,211 @@ func getSystemInfoMetrics( } } - // TODO(dragondriver): integrate other coordinator + if dataCoordErr == nil && dataCoordResp != nil { + proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[dataCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.DataCoordRole, + }) + + dataCoordTopology := metricsinfo.DataCoordTopology{} + err = metricsinfo.UnmarshalTopology(dataCoordResp.Response, &dataCoordTopology) + if err == nil { + // data coord in system topology graph + dataCoordTopologyNode := metricsinfo.SystemTopologyNode{ + Identifier: identifierMap[dataCoordRoleName], + Connected: make([]metricsinfo.ConnectionEdge, 0), + Infos: &dataCoordTopology.Cluster.Self, + } + + // fill connection edge, a little trick here + for _, edge := range dataCoordTopology.Connections.ConnectedComponents { + switch edge.TargetType { + case typeutil.RootCoordRole: + if rootCoordErr == nil && rootCoordResp != nil { + dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[rootCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.RootCoordRole, + }) + } + case typeutil.DataCoordRole: + dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[dataCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.DataCoordRole, + }) + case typeutil.IndexCoordRole: + if indexCoordErr == nil && indexCoordResp != nil { + dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[indexCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.IndexCoordRole, + }) + } + case typeutil.QueryCoordRole: + if queryCoordErr == nil && queryCoordResp != nil { + dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[queryCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.QueryCoordRole, + }) + } + } + } + + // add data nodes to system topology graph + for _, dataNode := range dataCoordTopology.Cluster.ConnectedNodes { + identifier := getUniqueIntGeneratorIns().get() + identifierMap[dataNode.Name] = identifier + dataNodeTopologyNode := metricsinfo.SystemTopologyNode{ + Identifier: identifier, + Connected: nil, + Infos: &dataNode, + } + systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataNodeTopologyNode) + dataCoordTopologyNode.Connected = append(dataCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifier, + Type: metricsinfo.CoordConnectToNode, + TargetType: typeutil.DataNodeRole, + }) + } + + // add data coord to system topology graph + systemTopology.NodesInfo = append(systemTopology.NodesInfo, dataCoordTopologyNode) + } + } + + if indexCoordErr == nil && indexCoordResp != nil { + proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[indexCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.IndexCoordRole, + }) + + indexCoordTopology := metricsinfo.IndexCoordTopology{} + err = metricsinfo.UnmarshalTopology(indexCoordResp.Response, &indexCoordTopology) + if err == nil { + // index coord in system topology graph + indexCoordTopologyNode := metricsinfo.SystemTopologyNode{ + Identifier: identifierMap[indexCoordRoleName], + Connected: make([]metricsinfo.ConnectionEdge, 0), + Infos: &indexCoordTopology.Cluster.Self, + } + + // fill connection edge, a little trick here + for _, edge := range indexCoordTopology.Connections.ConnectedComponents { + switch edge.TargetType { + case typeutil.RootCoordRole: + if rootCoordErr == nil && rootCoordResp != nil { + indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[rootCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.RootCoordRole, + }) + } + case typeutil.DataCoordRole: + if dataCoordErr == nil && dataCoordResp != nil { + indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[dataCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.DataCoordRole, + }) + } + case typeutil.IndexCoordRole: + indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[indexCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.IndexCoordRole, + }) + case typeutil.QueryCoordRole: + if queryCoordErr == nil && queryCoordResp != nil { + indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[queryCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.QueryCoordRole, + }) + } + } + } + + // add index nodes to system topology graph + for _, indexNode := range indexCoordTopology.Cluster.ConnectedNodes { + identifier := getUniqueIntGeneratorIns().get() + identifierMap[indexNode.Name] = identifier + indexNodeTopologyNode := metricsinfo.SystemTopologyNode{ + Identifier: identifier, + Connected: nil, + Infos: &indexNode, + } + systemTopology.NodesInfo = append(systemTopology.NodesInfo, indexNodeTopologyNode) + indexCoordTopologyNode.Connected = append(indexCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifier, + Type: metricsinfo.CoordConnectToNode, + TargetType: typeutil.IndexNodeRole, + }) + } + + // add index coord to system topology graph + systemTopology.NodesInfo = append(systemTopology.NodesInfo, indexCoordTopologyNode) + } + } + + if rootCoordErr == nil && rootCoordResp != nil { + proxyTopologyNode.Connected = append(proxyTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[rootCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.RootCoordRole, + }) + + rootCoordTopology := metricsinfo.RootCoordTopology{} + err = metricsinfo.UnmarshalTopology(rootCoordResp.Response, &rootCoordTopology) + if err == nil { + // root coord in system topology graph + rootCoordTopologyNode := metricsinfo.SystemTopologyNode{ + Identifier: identifierMap[rootCoordRoleName], + Connected: make([]metricsinfo.ConnectionEdge, 0), + Infos: &rootCoordTopology.Self, + } + + // fill connection edge, a little trick here + for _, edge := range rootCoordTopology.Connections.ConnectedComponents { + switch edge.TargetType { + case typeutil.RootCoordRole: + rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[rootCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.RootCoordRole, + }) + case typeutil.DataCoordRole: + if dataCoordErr == nil && dataCoordResp != nil { + rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[dataCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.DataCoordRole, + }) + } + case typeutil.IndexCoordRole: + rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[indexCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.IndexCoordRole, + }) + case typeutil.QueryCoordRole: + if queryCoordErr == nil && queryCoordResp != nil { + rootCoordTopologyNode.Connected = append(rootCoordTopologyNode.Connected, metricsinfo.ConnectionEdge{ + ConnectedIdentifier: identifierMap[queryCoordRoleName], + Type: metricsinfo.Forward, + TargetType: typeutil.QueryCoordRole, + }) + } + } + } + + // add root coord to system topology graph + systemTopology.NodesInfo = append(systemTopology.NodesInfo, rootCoordTopologyNode) + } + } // add proxy to system topology graph systemTopology.NodesInfo = append(systemTopology.NodesInfo, proxyTopologyNode) diff --git a/internal/querycoord/metrics_info.go b/internal/querycoord/metrics_info.go index 4fc0ec9329..390fc57b23 100644 --- a/internal/querycoord/metrics_info.go +++ b/internal/querycoord/metrics_info.go @@ -92,7 +92,8 @@ func getSystemInfoMetrics( Cluster: clusterTopology, Connections: metricsinfo.ConnTopology{ Name: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, Params.QueryCoordID), - // TODO(dragondriver): connection info + // TODO(dragondriver): fill ConnectedComponents if necessary + ConnectedComponents: []metricsinfo.ConnectionInfo{}, }, } diff --git a/internal/rootcoord/metrics_info.go b/internal/rootcoord/metrics_info.go index f8c555e010..eeb957ff02 100644 --- a/internal/rootcoord/metrics_info.go +++ b/internal/rootcoord/metrics_info.go @@ -26,12 +26,20 @@ import ( func (c *Core) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { // TODO(dragondriver): add more metrics - nodeInfos := metricsinfo.RootCoordInfos{ - BaseComponentInfos: metricsinfo.BaseComponentInfos{ + rootCoordTopology := metricsinfo.RootCoordTopology{ + Self: metricsinfo.RootCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID), + }, + }, + Connections: metricsinfo.ConnTopology{ Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, c.session.ServerID), + // TODO(dragondriver): fill ConnectedComponents if necessary + ConnectedComponents: []metricsinfo.ConnectionInfo{}, }, } - resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) + + resp, err := metricsinfo.MarshalTopology(rootCoordTopology) if err != nil { log.Warn("Failed to marshal system info metrics of root coordinator", zap.Error(err)) diff --git a/internal/types/types.go b/internal/types/types.go index bfa6fc55c0..121ee603bd 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -63,6 +63,8 @@ type DataCoord interface { GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) + + GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } type IndexNode interface { @@ -81,6 +83,8 @@ type IndexCoord interface { DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) + + GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } type RootCoord interface { @@ -113,6 +117,8 @@ type RootCoord interface { ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) + + GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } // RootCoordComponent is used by grpc server of RootCoord diff --git a/internal/util/metricsinfo/topology.go b/internal/util/metricsinfo/topology.go index 2265a0bbcd..75bb67f83e 100644 --- a/internal/util/metricsinfo/topology.go +++ b/internal/util/metricsinfo/topology.go @@ -47,14 +47,28 @@ type QueryClusterTopology struct { ConnectedNodes []QueryNodeInfos `json:"connected_nodes"` } +type ConnectionType = string + +const ( + CoordConnectToNode ConnectionType = "manage" + Forward ConnectionType = "forward" +) + +type ConnectionTargetType = string + +type ConnectionInfo struct { + TargetName string `json:"target_name"` + TargetType ConnectionTargetType `json:"target_type"` +} + // TODO(dragondriver) // necessary to show all connection edge in topology graph? // for example, in system, Proxy connects to RootCoord and RootCoord also connects to Proxy, // if we do so, the connection relationship may be confusing. // ConnTopology shows how different components connect to each other. type ConnTopology struct { - Name string `json:"name"` - ConnectedComponents []string `json:"connected_components"` + Name string `json:"name"` + ConnectedComponents []ConnectionInfo `json:"connected_components"` } // QueryCoordTopology shows the whole metrics of query cluster @@ -93,15 +107,6 @@ type RootCoordTopology struct { Connections ConnTopology `json:"connections"` } -type ConnectionType string - -const ( - CoordConnectToNode ConnectionType = "manage" - Forward ConnectionType = "forward" -) - -type ConnectionTargetType string - type ConnectionEdge struct { ConnectedIdentifier int `json:"connected_identifier"` Type ConnectionType `json:"type"` diff --git a/internal/util/metricsinfo/topology_test.go b/internal/util/metricsinfo/topology_test.go index c0d40aeae3..4eecc8df60 100644 --- a/internal/util/metricsinfo/topology_test.go +++ b/internal/util/metricsinfo/topology_test.go @@ -95,8 +95,10 @@ func TestQueryCoordTopology_Codec(t *testing.T) { }, Connections: ConnTopology{ Name: ConstructComponentName(typeutil.QueryCoordRole, 1), - ConnectedComponents: []string{ - ConstructComponentName(typeutil.RootCoordRole, 1), + ConnectedComponents: []ConnectionInfo{ + { + TargetType: typeutil.RootCoordRole, + }, }, }, } @@ -176,8 +178,10 @@ func TestIndexCoordTopology_Codec(t *testing.T) { }, Connections: ConnTopology{ Name: ConstructComponentName(typeutil.IndexCoordRole, 1), - ConnectedComponents: []string{ - ConstructComponentName(typeutil.RootCoordRole, 1), + ConnectedComponents: []ConnectionInfo{ + { + TargetType: typeutil.RootCoordRole, + }, }, }, } @@ -257,8 +261,10 @@ func TestDataCoordTopology_Codec(t *testing.T) { }, Connections: ConnTopology{ Name: ConstructComponentName(typeutil.DataCoordRole, 1), - ConnectedComponents: []string{ - ConstructComponentName(typeutil.RootCoordRole, 1), + ConnectedComponents: []ConnectionInfo{ + { + TargetType: typeutil.RootCoordRole, + }, }, }, } @@ -290,8 +296,10 @@ func TestRootCoordTopology_Codec(t *testing.T) { }, Connections: ConnTopology{ Name: ConstructComponentName(typeutil.RootCoordRole, 1), - ConnectedComponents: []string{ - ConstructComponentName(typeutil.RootCoordRole, 1), + ConnectedComponents: []ConnectionInfo{ + { + TargetType: typeutil.IndexCoordRole, + }, }, }, } @@ -313,11 +321,19 @@ func TestRootCoordTopology_Codec(t *testing.T) { func TestConnTopology_Codec(t *testing.T) { topology1 := ConnTopology{ Name: ConstructComponentName(typeutil.ProxyRole, 1), - ConnectedComponents: []string{ - ConstructComponentName(typeutil.RootCoordRole, 1), - ConstructComponentName(typeutil.QueryCoordRole, 1), - ConstructComponentName(typeutil.DataCoordRole, 1), - ConstructComponentName(typeutil.IndexCoordRole, 1), + ConnectedComponents: []ConnectionInfo{ + { + TargetType: typeutil.IndexCoordRole, + }, + { + TargetType: typeutil.DataCoordRole, + }, + { + TargetType: typeutil.QueryCoordRole, + }, + { + TargetType: typeutil.RootCoordRole, + }, }, } s, err := MarshalTopology(topology1)